Code Monkey home page Code Monkey logo

blog's Introduction

  • I’m the author of Apache Fury(incubating): A blazingly fast multi-language serialization framework powered by JIT and zero-copy. I created Fury in 2019.07, open sourced it in 2023.07, donated it to ASF in 2023.12.
  • I'm the PMC member of Mars: a distributed scientific computing framework.
  • I'm experienced at:
    • distributed frameworks: Spark/Flink/Hadoop/Ray/Dask/Mars/TensorFlow
    • computing patterns: streaming/batch computing, tensor computation, online learning, batch inferrence.
    • computing engines: Arrow/Pandas/Numpy/Polars/Jax/XLA.
  • Currrently I'm focusing on XLA/TVM/TensorRT/Media engine, multimodal computing and inferrence Framework
  • 📫 How to reach me:

blog's People

Stargazers

 avatar

Watchers

 avatar  avatar

blog's Issues

《The Dataflow Model》论文阅读

The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in MassiveScale, Unbounded, OutofOrder Data Processing

最近看了Google发表的dataflow论文,其中两点颇有感触:

  1. dataflow提供了一个思考维度:从什么结果被计算、在事件时间的哪里计算、处理时间的什么时候观察到结果、以及早先的结果如何与之后的修正相关。这种思维方式的转变是很重要的。
  2. dataflow分析了lambda架构的**,即实时计算结果不准确但是低延迟,然后在批处理中修正结果,达到最终正确性。它对其推广,在流处理中使用触发器低延时产生结果,这个结果不一定准确,然后在后续计算中,利用晚到的数据产生新的结果来修正之前的结果,从而实现低延迟和正确性,而不是依赖等待数据完整来实现正确性(这会造成延时)

下面是它的主要内容:

A single unified model

  • 允许在无界乱序数据源上,使用各种correctness、latency、cost的combinations和tradeoff,进行事件时间有序结果、按数据自身特征划分窗口进行计算
  • 分解data pipeline为四个维度,提供清晰性、可组合性和灵活性
    • What results are being computed
    • Where in event time they are being computed.
    • When in processing time they are materialized.
    • How earlier results relate to later re nements.
  • 分离数据处理的逻辑与下层的实现,允许基于correctness, latency, and cost选择batch, micro-batch, or streaming engine

Concrete contribution

  • A windowing model which supports unaligned event-time windows, and a simple API for their creation and use.
  • A triggering model that binds the output times of results to runtime characteristics of the pipeline, with a powerful and flexible declarative API for describing desired triggering semantics.
  • An incremental processing model that integrates retractions and updates into the windowing and triggering models described above

Dataflow Model

Core Primitives

  • ParDo
  • GroupByKey

Windowing

  • unaligned windows

  • windowing can be broken apart:

    • AssignWindows
      • Elements are initially assigned to a default global window, covering all of event time, providing semantics that match the defaults in the standard batch model.
      • Since windows are associated directly with the elements to which they belong, this means window assignment can happen anywhere in the pipeline before grouping is applied. This is important, as the grouping operation may be buried somewhere downstream inside a composite transformation
    • MergeWindows
      • All are initially placed in a default global window by the system.
      • Then implementation of AssignWindows puts each element into a single window.
      • DropTimestamps
      • GroupByKey
      • MergeWindows
      • GroupAlsoByWindow
      • ExpandToElements

Triggers & Incremental Processing

  • Triggers are complementary to the windowing model, in that they each affect system behaviour along a different axis of time:

  • Windowing determines where in event time data are grouped together for processing.

  • Triggering determines when in processing time the results of groupings are emitted as panes.

  • Triggers system provides a way to control how multiple panes for the same window relate to each other

  • Discarding

  • Accumulating

  • Accumulating & Retracting

Design Principles

  • Never rely on any notion of completeness.
  • Be flexible, to accommodate the diversity of known use cases, and those to come in the future.
  • Not only make sense, but also add value, in the context of each of the envisioned execution engines.
  • Encourage clarity of implementation.
  • Support robust analysis of data in the context in which they occurred.

Ray Streaming Cross-Language API Example

public class HybridStreamTest {
  public static class Foo {
    int f1; String f2;
    List<String> f3;
    Foo(String s) {}
  }

  public static class Bar {
    int[] arr; String str;
  }

}

public void testHybridDataStream() {
    StreamingContext context = StreamingContext.buildContext();
    context.fromValues("a", "b", "c")
        .map(x -> new Foo(x))
        .asPythonStream()
        .flatMap("demo",  "flat_map_func")
        .asJavaStream(Bar.class)
        .sink(x -> System.out.println("HybridJob: " + x));
    context.execute("HybridJob");
}
def test_hybrid_stream():
    ctx = StreamingContext.Builder().build()
    ctx.from_values("a", "b", "c") \
    	.map(lambda x: Foo(x)) \
        .as_java_stream() \  
        .flat_map("demo.FlatMapper") \
        .as_python_stream() \
        .sink(sink_func)
    ctx.submit("HybridJob")

Apache Flink - Stateful Computations over Data Streams

Apache Flink - Stateful Computations over Data Streams

Document

Apache Flink Documentation

Dataflow Model

The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in MassiveScale, Unbounded, OutofOrder Data Processing

State

Paper

  • Chandy-Lamport algorithm:Distributed Snapshots: Determining Global States of Distributed Systems
  • Lightweight Asynchronous Snapshots for Distributed Dataflows
    • an asynchronous snapshotting algorithm that achieves minimal snapshots on acyclic execution graphs
    • a generalisation of our algorithm that works on cyclic execution graphs
  • State Management in Apache Flink
    • a complete end-to-end design for continuous stateful processing, from the conceptual view of state in the programming model to its physical counterpart implemented in various backends.
    • how to naturally pipeline consistent snapshots in weakly connected dataflow graphs and capture minimal state, skipping in-transit records when possible, without impacting general system progress.
    • how snapshots can be utilized for a large variety of operational needs beyond failure recovery such as software patches, testing, system upgrades and exactly-once delivery.
    • encapsulate different processing guarantees and isolation levels for externally accessing partitioned operator state and output, using snapshots.
    • large-scale pipeline deployments that operate 24/7 in production and rely heavily on stateful processing coupled with runtime metrics and performance insights.

Article

Note

  1. Key-Groups
    • Key-Groups是flink重分发状态的原子单元。
    • 使用一致性哈希算法减少seek和增加磁盘顺序读。
    • 实现:先对key做hash,然后对最大并行度取模。以任务并行度等分哈希环,每个task instance处理[ i * max_parallelism/parallelism, (i + 1) * max_parallelism/parallelism)内的key,管理keyed-state。如果并行度改变,则加载新的范围的状态,新进来的记录也会哈希到持有对应key的状态的task instance
    • 为什么使用一致性哈希:如果不使用一致性哈希,比如使用并行度来取模。那么在并行度改变时,要么任务扫描整个快照,获取指派给它的所有键的状态,这会导致大量不必要的I/O;或者在快照中保存对每个key-state的引用,即索引,这样每个任务来选择性地读取指派给它的状态,这会增加索引成本(与key的数量成比例,而key的数量可能会非常大)和通信负载。使用一致性哈希,则每个task实例只读需要的数据,而且通常Key-Groups足够大以致能够粗粒度地顺序磁盘读。
    • 注意哈希环即最大并行度不能够设置的太大。因为根据上面定义,Key-Groups数量与最大并行度相同,如果最大并行度设置得太大,则每个Key-Groups可能只由很少的key的状态组成,从而无法进行粗粒度磁盘读写,导致性能降低。
  2. Rollback
    • During a full restart or rescaling, all tasks are being redeployed, while after a failure only the tasks belonging to the affected connected component (of the execution graph) are reconfigured.
    • In essence, known incremental recovery techniques from micro-batch processing are orthogonal to this approach and can also be employed. A snapshot epoch acts as synchronization point, similarly to a micro-batch or an input-split. On recovery, new task instances are being scheduled and, upon initialization, retrieve their allocated shard of state. In the case of IterationHead recovery, all records logged during the snapshot are recovered and flushed to output channels prior to their regular record forwarding logic.
  3. Asynchronous and Incremental Snapshots
    • the pipelined snapshotting protocol only governs “when” but not “how” snapshots are internally executed.
    • The out-of-core state backend based on RocksDB [13]exploits the LSM (log-structured merge) tree, internal representation of data in RocksDB. Updates are not made in-place, but are appended and compacted asynchronously. Upon taking a snapshot, the synchronous triggerSnapshot() call simply marks the current version, which prevents all state as of that version to be overwritten during compactions. The operator can then continue processing and make modifications to the state. An asynchronous thread iterates over the marked version, materializes it to the snapshot store, and finally releases the snapshot so that future compactions can overwrite that state. Furthermore, the LSM-based data structure also lends itself to incremental snapshots, which write only parts to the snapshot store that changed since the previous snapshots.
    • Flink’s in-memory local state backend implementation is based on hash tables that employ chain hashing. During a snapshot, it copies the current table array synchronously and then starts the external materialization of the snapshot, in a background thread. The operator’s regular stream processing thread lazily copies the state entries and overflow chains upon modification, if the materialization thread still holds onto the snapshot. Incremental snapshots for the in-memory local backend are possible and conceptually trivial (using delta maps), yet not implemented at the current point.
  4. ExactlyOnce Delivery Sinks
    • Idempotent Sinks
    • Transactional Sinks

Pandas Internals

Deploy java jar to central repository

重点参考 https://central.sonatype.org/pages/apache-maven.html

发布

  1. 注册账号和open ticket
  2. 添加OSSRH-XXXXX到域名文本记录
  3. 创建token
  4. PGP Signatures
    a. install gpg https://gpgtools.org/
    b. create your own key pair
    c. and distribute it to a key server so that users can validate it
  5. change version in a multi module setup: mvn versions:set -DnewVersion=1.2.3
  6. Deploy: mvn -T8 clean deploy -Dmaven.test.skip=true -Dcheckstyle.skip -Prelease

Note

SNAPSHOT versions are not synchronized to the Central Repository. If you wish your users to consume your SNAPSHOT versions,
they would need to add the snapshot repository to their Nexus Repository Manager, settings.xml, or pom.xml.
Successfully deployed SNAPSHOT versions will be found in https://oss.sonatype.org/content/repositories/snapshots/

#参考

  1. http://tutorials.jenkov.com/maven/publish-to-central-maven-repository.html
  2. https://medium.com/@nmauti/publishing-a-project-on-maven-central-8106393db2c3
  3. https://central.sonatype.org/pages/producers.html
  4. maven 发布:https://central.sonatype.org/pages/apache-maven.html
  5. https://github.com/simpligility/ossrh-demo
  6. Publishing a project on Maven Central https://medium.com/@nmauti/publishing-a-project-on-maven-central-8106393db2c3
  7. Requirements https://central.sonatype.org/pages/requirements.html
  8. PGP Signatures https://central.sonatype.org/pages/working-with-pgp-signatures.html
  9. Apache Beam Release Guide https://beam.apache.org/contribute/release-guide/
  10. Creating a Flink Release https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release

Arrow Compute

基本材料

In March of 2021, when major work on the C++ query execution machinery
in Arrow was beginning, Wes sent a message [1] to the dev list and
linked to a doc [2] with some details about the planned design. A few
months later Neal sent an update [3] about this work. However those
documents are now somewhat out of date. More recently, Wes shared
another update [4] and linked to a doc [5] regarding task execution /
control flow / scheduling. However I think the best source of
information is the doc you linked to. The query execution work has
proceeded organically with many contributors, and efforts to document
the overall design in sufficient detail have not kept pace.
[1] https://lists.apache.org/thread/n632pmjnb85o49lyxy45f7sgh4cshoc0
[2] https://docs.google.com/document/d/1AyTdLU-RxA-Gsb9EsYnrQrmqPMOYMfPlWwxRi1Is1tQ/
[3] https://lists.apache.org/thread/3pmb592zmonz86nmmbjcw08j5tcrfzm1
[4] https://lists.apache.org/thread/ltllzpt1r2ch06mv1ngfgdl7wv2tm8xc
[5] https://docs.google.com/document/d/1216CUQZ7u4acZvC2jX7juqqQCXtdXMellk3lRrgP_WY/
[6] https://conbench.ursa.dev/
[7] https://lists.apache.org/thread/7v7vkc005v9343n49b3shvrdn19wdpj1

执行模型

  • (some query engines use a "pull"-based model, in which the data flow is inverted — there are pros and cons to both approaches, see Everything You Always Wanted to Know About Compiled and Vectorized Queries But Were Afraid to Ask)
  • "My personal feeling is that the pull model was good in the early query execution engines, based on processing of a single row at a time and using virtual function calls to switch between relational operators within the query. In my experience, the push model is easier to work with in both modern worlds of query execution: JIT compiled query processing and vectorized query processing." - apache/arrow#9621

Fury Example

import pyfury
import pyarrow as pa

class Bar:
  __slots__= ("f1", "f2")

class Foo:
  __slots__= ("f1", "f2", "f3", "f4, "f5")

bar_schema = pa.schema([
    ("f1", pa.int32()),
    ("f2", pa.utf8()),
])
foo_schema = pa.schema([
    ("f1", pa.int32()),
    ("f2", pa.utf8()),
    ("f3", pa.list_(pa.utf8())),
    ("f4", pa.map_(pa.utf8(), pa.int32())),
    pa.field('f5', pa.struct(bar_schema), metadata={"cls": fury.cls(Bar)}),
], metadata={"cls": fury.cls(Foo)})
encoder = pyfury.encoder(foo_schema)

def write(foo):
  return encoder.to_row(foo).to_bytes()

def read_bytes(data_bytes):
  return encoder.from_row(data_bytes)

Building alpa as an unified tensor engine

Motivation

Deep learning, scientific computing, strategy inference are all built on tensor computing.
For small-scale computing, it can be finished by numpy/jax/pytorch on a single process and the usability and performance won’t be an issue.

But when it comes to large-scale tensor computing such as LLM big model, big strategy and huge matrix, there is no solution which can meet usability, performance, auto parallel simultaneously:

  • Dask/Mars support auto parallel numpy tensor computation, but the performance is poor for following reasons:

    • the planner doesn’t support algebra simplification and operator reorder, there will be many extra communication;
    • The execution use numpy which doesn’t loop fusion and other jit optimization, execution will be slow;
    • GPU support is poor, collective communication is not supported or no computation can be push to collective communication;
  • Dask/Mars doesn’t support auto diff, the perf optimization on those framework can’t be used for speed up model training;

  • Magtron-LM has pretty good performance but it’s constrained only for LLM, and not suitable for general tensor computation such as linear algebra;

  • Pytorch/Jax has tensor API, but doesn’t support auto parallel;

If we take a deep look into those tensor computation, we can find they all embody an common pattern requirements:

  • Usability: Support numpy-style API and single device programming idioms, so the users won’t have any learning and migration cost;
  • Auto parallel: with just a few lines of code changes such as alpa.parallelize, dask.compute, mars.execute, the existing can be running on a distributed cluster automatically;
  • High performance requirements:
  • strategy/dl inference both need to be finished in seconds for single sample;
  • Huge tensor computing are issued by users in the notebook and needs to be finished in seconds too;
  • High performance techs: to achieve better performance, we should use
    • Plan: fusion on plan to reduce communication; operator reorder and algebra simplification to reduce computation and communication;
    • Commutation: nccl/gloo communication, rdma
    • Execution: GPU & Vectorization to speedup execution; JIT to reduce kernel calls and loop fusion for better cache hit.

Only a framework that meets above all requirements can be selected as the engine. And since those computations are so similar, we should build an unified tensor engine for those DL/Strategy/Tensor cases instead of building one for each individually. So the optimization can be shared across all cases.

Alpa is designed initially for training and serving large models automatically, but since its abstraction are built on jax computational graph and XLA HLO, all jax computation can be parallelized by alpa automatically, it give alpa same usability as jax/numpy. And alpa uses XLA for execution speedup and supports heterogeneous GPU schedules, which give it pretty good performance. We think it has the potential to grow as an unified distributed engine for any tensor computation:

lQLPJx4a6BahqGLNAv7NBkCwMHrWtiVk4TIENX1Rm0AKAA_1600_766

GCC upgrade

sudo yum install -y alios7u-2_32-gcc-10-repo.noarch
# gcc 9
sudo yum install -y alios7u-2_30-gcc-9-repo.noarch

sudo yum update -y gcc
sudo yum install -y gcc-c++
# 这个glibc正常安装有报错,--skip-broken 加了后能解决,但是具体原因不知。
sudo yum install -y glibc --skip-broken    
这些依赖记得装一下,一把装了莫名其妙的问题少点。
sudo yum install -y binutils zlib-devel bzip2 bzip2-devel readline-devel sqlite sqlite-devel openssl-devel tk-devel libffi-devel xz-devel glibc-locale-source glibc-langpack-en patch


sudo yum -y install glibc-locale-source glibc-langpack-en

# 常用yum命令
yum search gcc
yum --showduplicates list glibc | expand
sudo yum update glibc
ldd --version


# 从GCC6的机器升级到GCC9需要重装python, 如原来的python为3.6.7
# pyenv install 3.6.7

PySpark Notebook Setup Guide

PySpark Notebook Setup Guide

Install Python

Install Anaconda

Configure environment

conda create -n bigdata python=3.6 anaconda
source activate bigdata
# useful to have nice tables of contents in the notebooks, but they are not required.
conda install -n bigdata -c conda-forge jupyter_contrib_nbextensions
# If you want to use the Jupyter extensions (optional, they are mainly useful to have nice tables of contents), you first need to install them:
jupyter contrib nbextension install --user
# Then you can activate an extension, such as the Table of Contents (2) extension:
jupyter nbextension enable toc2/main
# Okay! You can now start Jupyter, simply type:
jupyter notebook

Note: you can also visit http://localhost:8888/nbextensions to activate and configure Jupyter extensions.

Configure jupyter

# configure jupyter and prompt for password
jupyter notebook --generate-config
jupass=`python -c "from notebook.auth import passwd; print(passwd())"`
echo "c.NotebookApp.password = u'"$jupass"'" >> $HOME/.jupyter/jupyter_notebook_config.py
echo "c.NotebookApp.ip = '*'
c.NotebookApp.open_browser = False" >> $HOME/.jupyter/jupyter_notebook_config.py

PySpark

There are two ways to get PySpark available in a Jupyter Notebook:

  1. Configure PySpark driver to use Jupyter Notebook: running pyspark will automatically open a Jupyter Notebook

    source activate bigdata
    export PYSPARK_DRIVER_PYTHON=jupyter
    export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
    # specify python used by spark cluster
    export PYSPARK_PYTHON=`which python`
    pyspark --master yarn
  2. Load a regular Jupyter Notebook and load PySpark using findSpark package

    use findSpark package to make a Spark Context available in your code.

    • Install findspark
    pip install findspark
    • Start jupyter notebook

      jupyter notebook
    • Run following code in notebook

      import findspark
      findspark.init() # or findspark.init('/path/to/spark_home')
      
      import pyspark
      sc = pyspark.SparkContext(appName="myAppName")

Reference

https://blog.sicara.com/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f

Tensor computation

image

import alpa
import numpy as np
import jax.numpy as jnp

@alpa.parallelize
def tensor_compute(tensor):
    t_transposed = jnp.transpose(tensor)
    dot_matrix = jnp.dot(tensor, t_transposed)
    v1_row_norm = jnp.linalg.norm(tensor, axis=1).reshape(-1, 1)
    v2_col_norm = jnp.linalg.norm(t_transposed, axis=0).reshape(1, -1)
    norm_matrix = jnp.dot(v1_row_norm, v2_col_norm)
    res = dot_matrix / norm_matrix
    res = jnp.where(jnp.isneginf(res), 0, res)
    return res

tensor_compute(np.random.rand(1000000, 100)).block_until_ready()

Photon: A Fast Query Engine for Lakehouse Systems

论文地址:https://www-cs.stanford.edu/~matei/papers/2022/sigmod_photon.pdf

摘要

Photon 是一个针对Lakehouse场景的向量化查询引擎。性能比云数据仓库好,同时本身也能够高效处理原始数据,以及支持Apache Spark API。对于某些work load性能提升十倍,以及新的100TB TPC-DS benchmark记录

设计选择:vectorization vs. code generation) 这个也可以参考论文:https://www.vldb.org/pvldb/vol11/p2209-kersten.pdf 这篇论文也在Photon的Reference里面提到了

1 介绍

Designing Photon required tackling two key challenges.

  • First, unlike a traditional data warehouse engine, Photon needed to perform well on raw, uncurated data, which can include highly irregular datasets, poor physical layout, and large fields, all with no useful clustering or data statistics.
  • Second, we wanted Photon to support, and be semantically compatible with, the existing Apache Spark DataFrame API that is widely used for data lake workloads.

Challenge 1: Supporting raw, uncurated data.
Photon使用向量化的设计,主要有运行时自适应执行的好处:
Vectorized execution enabled us to support runtime adaptivity, wherein Photon discovers, maintains, and exploits micro-batch data characteristics with specialized code paths to adapt to the properties of Lakehouse datasets for optimal performance. For example, Photon runs optimized per-batch code for columns that rarely have NULL values, or mostly-ASCII string data.

Challenge 2: Supporting existing Spark APIs.

2 BACKGROUND

2.1 Databricks’ Lakehouse Architecture

  • Data Lake Storage: decouples data storage from compute。Databricks accesses customer data using connectors
    between a compute service and the data lake. The data itself is stored in open file formats such as Apache Parquet。
  • Automatic Data Management: an open source ACID table storage layer over cloud object stores. Delta Lake enables warehouse-style features such as ACID transactions, time travel, audit logging, and fast metadata operations over tabular datasets. Delta Lake stores both its data and metadata as Parquet.
  • Elastic Execution Layer:Spark + Photon
    image
    Photon fits into this execution layer by handling single-threaded query execution on each partition of the data processed.

2.2 The Databricks Runtime

A query plan is a tree of SQL operators (e.g., Filter, Project, Shuffle) that maps to a list of stages. After query planning, the driver launches tasks to execute each stage of the query. Each task runs uses the in-memory execution engine to process data. Photon is an example of such an execution engine; it supersedes the previous engine based on Apache Spark SQL.

3 EXECUTION ENGINE DESIGN DECISIONS

3.1 Overview

  • Photon使用C++实现,编译成共享库,通过JNI被java executor调用。 Photon runs as part of a single-threaded task in DBR, within an executor’s JVM process.
  • Photon structures a SQL query as as tree of operators, where each operator uses a HasNext()/GetNext() API to pull batches of data from its child. 与Java算子交互也是通过JNI相互拉数据。
  • Photon操作在列式数据格式上面,使用向量化执行进行加速,没有使用codegen

3.2 JVM vs. Native Execution

为什么采用C++实现?

  • IO开销已经显著降低难以继续优化,CPU负载需要进一步优化:
    • local NVMe SSD caching
    • auto optimized shuffle(Spark AQE)
    • Delta Lake提供的数据聚类等技术也可以通过 file pruning等方式跳过不必要的文件也可以进一步减少IO等待时间
  • Lakehouse本身引入了对非标准数据较重的数据处理,比如大字符串,非结构化嵌套数据等。这也导致需要优化执行性能
  • 而继续优化JVM执行引擎的性能非常困难:
    • 需要大量JVM内核细节,确保JIT编译器生成最优代码(比如使用SIMD的循环)。在DataBricks里面只有搞过JVM内核的人才能更新生成Java代码相关的逻辑
    • 缺少控制memory pipelining and custom SIMD kernels,也导致了性能天花板
    • JVM在heap大于64GB时GC性能受到严重影响,而64GB在现代云环境里面是相当小的内存容量。这导致需要手动在JVM上面管理堆外内存。相比于native c++实现,这不容易实现也不好维护
    • 另外Java代码生成也受到方法体大小涸代码缓存限制。比如超过8000个字节码的Java方法不会被编译,超过325个字节码的方法不会被内联。这些限制可能导致回退到Volcano-style的解释器执行模型,严重影响性能。比如对于超过100列的宽表,就经常面临这些限制。

3.3 Interpreted Vectorization vs. Code-Gen

高性能查询引擎一般两种设计:

  • 向量化:clickhouse、MonetDB、X100。动态分发选择代码路径,按batch向量化执行摊还虚方法调用成本
  • 代码生成:impala(LLVM)、spark sql(Catalyst+Janino+Java JIT)、HyPer(Rust实现)。通过整阶段代码生成绕过所有虚方法调用

Photon采用的是向量化技术,原因:

  • Easier to develop and scale。代码生成更难开发和debug,运行时需要植入额外代码来帮助观察行为和debug
  • Observability is easier。代码生成通过方法合并和内联消除了解释和函数调用开销,性能虽好,但引入了观察性问题。因为算子代码被融合成了一次处理一行的代码,因此给每个算子单独添加metrics比较困难。而向量化保留了算子之间抽象的边界,通过按批向量化执行摊还成本,保证了每个算子都可以低成本维护自己的metrics。
  • Easier to adapt to changing data。Photon可以采集每批粒度上采集数据的特性,然后动态选择最佳codepath。这对于Lakehouse工作负载特别重要,因为传统的数仓约束和统计信息在Lakehouse查询case下可能并不存在,同时动态分派执行路径本身就已经是解释化执行的一部分了。代码生成要做同样的事情就需要提前生成所有分支的代码或者重编译部分查询,即使可行也会影响查询时间、启动时间、内存开销。
  • Specialization is still possible. 代码生成可以通过公共子表达式消除和列裁剪较低开销,但通过为向量化执行在常见case(比如col >= left
    and col <= right)创建特殊的融合算子,这类性能gap可以避免掉。由于避免了codegen的复杂性,开发工程师也有更多的时间来做这些优化。

3.4 Row vs. Column-Oriented Execution

Photon采用列存格式,而不是Spark SQL的行存格式。列存有几个特点:

  • 更适合SIMD向量化指令。
  • 通过让算子实现为紧密的循环,能够进行更加有效的数据流水线和数据预取优化。
  • 同时在数据序列化和spill时也更加高效。
  • 另外在Lakehouse里面主要文件格式是列存文件格式Parquet。列存格式可以跳过耗时的行列转换。
  • 同时在列式格式当中还可以支持字典编码来减少内存开销,这对于String和其它变成数据特别有用,同时字符串在LakeHouse里面也很常见。
  • 最终列式计算引擎计算结果写列存也更加容易。

不过Photon在某些场景里面还是会把列存转换为行存。比如在Hash表里面把数据缓存为行存结构。因为当执行hash表的key比较时,存储为列式格式需要大量随机访问列存,对缓存非常不友好。

3.5 Partial Rollout

用Photon执行部分查询,对于不支持的特性,优雅回退到Spark的JVM算子。

4 VECTORIZED EXECUTION IN PHOTON

4.1 Batched Columnar Data Layout

  • column vector:单列持有单批值。column vector也持有一个byte vector,来表示每个值是否是NULL,同时也可以持有batch-level的元数据,比如string编码。(这块跟Arrow vector基本上是一样的)
  • column batch:a collection of column vectors, and represents rows。这个跟Arrow的RecordBatch基本上一样的。但是column batch也持有一个position list来表示那些active的行。Arrow没有这个设计。
    尽管将可以把active vs. inactive存储为一个byte vector,然后对SIMD更加友好。但这样在稀疏的batch上面也需要遍历所有行。实验表明在多数情况下只会使性能更差,即使是最简单的查询。另外数据在算子之间也是按照column batch进行传递的。每个算子从上游接收一个column batch,然后产出一个或者多个column batch。
    image

4.2 Vectorized Execution Kernels

Photon的列式执行是构建在execution kernels之上的。execution kernels是高度优化的处理一个或者多个向量数据的循环的函数。Arrow Compute也采用了这种设计,实现了大量kernels函数。基本上Photon的所有操作比如expressions、hash table probing(TODO 进一步了解以及对比Arrow的hash table)、serialization以及runtime statistics
calculation都是实现为kernels。kernels多数情况下依赖编译器自动向量化,同时Photon也会在输入参数上提供一些RESTRICT标记提示编译器自动向量化。有时候Photon也会手写SIMD指令来实现向量化。通过C++模板将Kernels函数具化,以支持不同输入类型。每个Kernel函数接收Vectors以及column batch position list作为输入,然后产出一个vector作为输出。
下面是一个计算平方根的kernel示例:

template <bool kHasNulls, bool kAllRowsActive>
void SquareRootKernel(const int16_t* RESTRICT pos_list,
  int num_rows, const double* RESTRICT input,
  const int8_t* RESTRICT nulls, double* RESTRICT result) {
  for (int i = 0; i < num_rows; i++) {
    // branch compiles away since condition is
    // compile-time constant.
    int row_idx = kAllRowsActive ? i : pos_list[i];
    if (!kHasNulls || !nulls[row_idx]) {
      result[row_idx] = sqrt(input[row_idx]);
    }
  }
}

4.3 Filters and Conditionals

  • Filters: 通过修改column batch的position list实现。一个filtering表达式接收column vectors作为输入,返回一个position list作为输入,然后产出一个vector作为输出。
  • Conditionals(待讨论):对于CASE WHEN场景,对于每个CASE分支,在kernel调用内部修改position list保证只让一些行active。然后多个CASE分支,每个都返回单独的position list。在一个position list里面inactive的row可能在别的position list里面是active的,因此不能修改inactive的行。

4.4 Vectorized Hash Table

Photon’s hash table是一个vectorized实现,主要分为三步:

  • 首先使用hashing kernel给一批key计算hash
  • 接下来probe kernel使用hash值加载hash table entries的指针,注意hash table entries的存储为行而不是列的
  • 最终将hash table entries跟查询的key进行column-by-column的比较,然后为不匹配的行产出一个position list。不匹配的行通过调整bucket index继续probing hash table(Photon用的是quadratic probing)。

其实就是把Hash表的实现并行化了一下,把hash计算,bucket查找,
这块跟Arrow Compute的Swiss Table很像:https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/exec/doc/key_map.md .个人感觉Arrow的Swiss Table是一个更优的设计

4.5 Vector Memory Management

Photon使用了一个自己写的buffer pool来管理内存,buffer pool会cache分配,同时使用最近使用的内存来分配内存。这样可以将热点内存用于每个input batch的重复分配。因为query算子在执行期间固定,因此如果输入batch是固定长度类型,处理单个input batch的vector分配数量就也是固定的。

变长数据(e.g., buffers for strings)是使用一个append-only pool单独管理的,Photon会在处理新的batch前释放这个buffer pool。这个pool使用的内存是通过一个全局的memory tracker来记录的,这样引擎就可以在遇到很大的字符串时调小batch size。

对于超过单个batch生命周期的内存分配比如聚合和join,这些内存是被一个外部的内存管理器单独追踪的。这种细粒度的内存管理比Spark SQL更加能够健壮处理大输入记录。

4.6 Adaptive Execution

在LakeHouse场景里面,要处理的数据大部分都缺少元数据、统计信息,而且大部分数据本身也不标准。因此很难在执行计划阶段选择出最优的执行计划。因此Photon提供了batch-level的自适应执行能力,即在运行时对一个batch的数据构建元数据,然后使用该元数据来优化execution kernel的选择。

主要的自适应能力包括两部分:
— 每个batch是否有Null值。没有空值的话可以移除分支。

  • 每个batch是否有inactive的行。没有inactive行的话可以避免position list的indirection,这也会促进SIMD

其它情况下也会进行一些case by case的优化:

  • 所有字符串都是ASCII时可以使用定长的Vector;
  • 运行时压缩稀疏的batch
  • 自适应的Shuffle编码等,通过寻找Shuffle Key的pattern,从而使用更加高效的编码

5 INTEGRATION WITH DATABRICKS RUNTIME

Photon目前主要是作为Spark的native算子集成到spark的,因此Photon是Spark和Spark共享集群资源

5.1 Converting Spark Plans to Photon Plans

Spark SQL的执行计划转到Photon的执行计划,是通过给Spark的catalyst优化器增加一组规则来把部分执行计划节点换成Photon来实现的。
image
添加的规则首先从自底向上从file scan开始遍历,把每个Photon支持的节点替换成Photon。如果某个节点Photon不支持,就插入一个转换节点把Photon列式数据转换成Spark SQL的行存数据。同时也在file scan和第一个Photon之间插入一个适配器节点来把file scan的数据转换成Photon的列式存储。因为file scan一般读取的parquet的列式数据,因此这一步一般是Zero-Copy的。

5.2 Executing Photon Plans

在一个Photon task里面,executor首先把local执行计划的Photon部分序列化成Protobuf格式,然后通过JNI传递给Photon C++ library,Photon再把它反序列化和转换成Photon内部的执行计划。执行计划跟Spark很像,每个算子都有HasNext()/GetNext()接口,数据是从末端节点(parent)向开始节点(child)按照column batche的粒度拉取的方式进行流转。

对于数据交换部分,Photon会复合Spark Shuffle协议的文件,然后把Shuffle文件的元数据传递给Spark来执行Shuffle的网络IO过程。然后在接收端(新的stage)Photon会从Shuffle里面读取对应的分区。

Adapter node to read Scan data:Photon本地执行计划的叶子节点始终是adapter节点。scan node把数据首先读取到OffHeapColumnVector,然后把Photon存储数据和NULL的Vector指向OffHeapColumnVector对应的address即可,整个过程zero-Copy。

Transition node to pass Photon data to Spark
Photon plan的最后一个节点是一个过渡节点,负责把列式数据转换为Spark SQL需要的行存格式。因为最开始读数据就是列式,因此Photon只做了一次行列转换。而Spark SQL本身也会在最开始读数据时执行一次转换,因此Photon并没有引入额外开销。后续Photon可能会考虑在中间节点引入Photon算子,这将引入额外的行列转换开销,需要进行tradeoff。

5.3 Unified Memory Management

因为Photon是作为Spark Task在Spark Executor里面运行的,因此Photon和Spark Executor共享所有集群资源,两者必须有一致的资源视图。因此Photon hook了Spark的内存管理器。主要是通过将内存保留和内存分配分开来实现的,这块跟Arrow Java的内存管理就非常像,不确定是不是参考了Arrow。

内存reservations会向Spark请求内存,这可能会导致Spill,这时Spark会要求使用内存的算子(memory consumer)释放内存。Spark和Photon算子都可以请求和释放内存。Spilling设计为动态,主要是因为通常不知道每个算子将消费多少数据和内存,尤其是在UDF时。

Spill策略跟Spark一样,将内存消费者按分配的内存从低到高排序,然后将第一个满足要spill大小的算子进行spill,这样可以最小化spill的数量和数据。

在保留内存后,Photon就可以分配内存了,分配内存是纯local的操作,而不是从Spark的内存池划一块内存出来,Spark只是负责统计内存。这样Spill算子执行将分为两个阶段:保留阶段请求内存,可能会导致别的算子spill让出内存;分配阶段产出中间计算结果。这对于Lakehouse很关键,因为查询经常超过可用内存。

5.4 Managing On-heap vs. Off-heap memory

Spark集群的每个Executor都配置了一个静态的off-heap大小,Spark内存管理器负责这块内存的管理,每个内存消费者需要确保自己只使用分配的内存,过度使用可能导致OOM。但只提供内存并不足够,由于JVM通常只在堆内存使用较高时才触发老年代GC,而Photon使用的内存都是堆外内存,因此GC很少发生。当Photon依赖heap内存时就可能出现问题,比如在执行broadcast hash join时候需要把数据从Photon堆外内存拷贝到堆内存。但是这个临时的内存使用并不会被频繁GC,因此当其它Spark JVM算子分配大块内存时可能导致Java OutOfMemory。Photon通过添加一个监听器在查询结束时清理Photon状态避免了双份内存占用的问题。

5.5 Interaction with Other SQL Features

Photon算子实现了导出统计信息(shuffle文件大小,输出数据行数等)的接口,这些信息可以提供给AQE在stage边界进行重分区和执行计划调整。
Photon也和Spark的metric系统做了集成,可以通过Spark UI查看metrics。

5.6 Ensuring Semantics Consistency

因为相同的query表达式可以同时运行在Photon和Spark里面,因此两者的行为必须保持一致。Java和C++的integer-to-floating转换在某些情况下行为不一致,以及时区数据库的版本不一致都可能导致返回不同的结果。因此Photon增加了三类测试捞保证行为的一致性:

  • Unit tests: 直接对SQL表达式进行测试。C++层实现了一个表达式测试框架,同时也和Spark的表达式测试case进行了集成。
  • End-to-end tests: 提交使用Photon和Spark的查询,然后比较结果
  • Fuzz tests: 随机生成输入数据和Query,然后同时使用Photon和Spark执行并比较结果

6 EXPERIMENTAL EVALUATION

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.