
incubator-uniffle's Introduction

Apache Uniffle (Incubating)

Uniffle is a high-performance, general-purpose remote shuffle service for distributed computing engines. It lets engines push shuffle data into a centralized storage service, changing the shuffle style from "local file pull-like style" to "remote block push-like style". This brings several advantages, such as support for disaggregated storage deployments, very large shuffle jobs, and high elasticity. Currently it supports Apache Spark, Apache Hadoop MapReduce and Apache Tez.


Architecture

A Uniffle cluster consists of three components: a coordinator cluster, a shuffle server cluster, and an optional remote storage (e.g., HDFS).

The coordinator collects the status of the shuffle servers and assigns jobs to them according to an assignment strategy.

The shuffle server receives the shuffle data, merges it, and writes it to storage.

Depending on the deployment, Uniffle supports the storage combinations Memory & Local, Memory & Remote Storage (e.g., HDFS), and Memory & Local & Remote Storage (recommended for production environments).

Shuffle Process with Uniffle

  • The Spark driver asks the coordinator for the shuffle servers to use for the shuffle process.
  • Each Spark task writes shuffle data to the shuffle server in the following steps:
  1. Send KV data to the buffer.
  2. Flush the buffer to the queue when the buffer is full or the buffer manager is full.
  3. A thread pool takes data from the queue.
  4. Request memory from the shuffle server first, then send the shuffle data.
  5. The shuffle server caches data in memory first and flushes it to a queue when the buffer manager is full.
  6. A thread pool takes data from the queue.
  7. Write the data to storage as an index file and a data file.
  8. After the data is written, the task reports all blockIds to the shuffle server; this step is used for data validation later.
  9. Store the taskAttemptId in MapStatus to support Spark speculation.
  • Depending on the storage type, the Spark task reads shuffle data from the shuffle server, from remote storage, or from both.
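The client-side part of the write path (steps 1 to 3) can be sketched roughly as follows. This is a minimal illustration, not Uniffle's actual code: the class name WriteBufferSketch, the two limits, and the use of strings as stand-ins for serialized bytes are all assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch of steps 1-3; names and limits are illustrative only.
final class WriteBufferSketch {
    private final int bufferLimit; // flush one partition buffer at this size
    private final int totalLimit;  // flush every buffer once this much is buffered overall
    private final Map<Integer, StringBuilder> buffers = new HashMap<>();
    private final Queue<String> sendQueue = new ArrayDeque<>();
    private int totalBuffered = 0;

    WriteBufferSketch(int bufferLimit, int totalLimit) {
        this.bufferLimit = bufferLimit;
        this.totalLimit = totalLimit;
    }

    // Step 1: append a record to the buffer of its partition.
    void add(int partitionId, String record) {
        StringBuilder buf = buffers.computeIfAbsent(partitionId, k -> new StringBuilder());
        buf.append(record);
        totalBuffered += record.length();
        // Step 2a: flush this buffer when it is full.
        if (buf.length() >= bufferLimit) {
            flush(partitionId);
        }
        // Step 2b: flush all buffers when the buffer manager as a whole is full.
        if (totalBuffered >= totalLimit) {
            for (Integer id : new ArrayList<>(buffers.keySet())) {
                flush(id);
            }
        }
    }

    private void flush(int partitionId) {
        StringBuilder buf = buffers.remove(partitionId);
        if (buf == null || buf.length() == 0) {
            return;
        }
        totalBuffered -= buf.length();
        sendQueue.add(partitionId + ":" + buf);
    }

    // Step 3: a sender thread would take blocks from this queue.
    String poll() {
        return sendQueue.poll();
    }

    int queued() {
        return sendQueue.size();
    }
}
```

With a per-partition limit of 4, adding "ab" and then "cd" to partition 0 queues a single block for that partition; the sender side would then request memory from the shuffle server before sending it.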

Shuffle file format

The shuffle data is stored as an index file and a data file. The data file holds all blocks for a specific partition, and the index file holds the metadata for every block.
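The relationship between the two files can be illustrated with a small in-memory sketch. The index entry layout used here (offset, length, CRC, blockId) is an assumption chosen for the example and is not claimed to be Uniffle's on-disk format; ShuffleFileSketch and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.zip.CRC32;

// Hypothetical sketch: one data stream with raw blocks, one index stream with fixed-size entries.
final class ShuffleFileSketch {
    // Assumed entry layout: offset (long), length (int), crc (long), blockId (long).
    static final int INDEX_ENTRY_BYTES = 8 + 4 + 8 + 8;

    final ByteArrayOutputStream data = new ByteArrayOutputStream();
    final ByteArrayOutputStream index = new ByteArrayOutputStream();

    // Appends the block to the data stream and records its metadata in the index stream.
    void append(long blockId, byte[] block) {
        long offset = data.size();
        data.write(block, 0, block.length);
        CRC32 crc = new CRC32();
        crc.update(block);
        try {
            DataOutputStream out = new DataOutputStream(index);
            out.writeLong(offset);
            out.writeInt(block.length);
            out.writeLong(crc.getValue());
            out.writeLong(blockId);
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the n-th block back by looking up its offset and length in the index.
    byte[] read(int n) {
        ByteBuffer entry = ByteBuffer.wrap(index.toByteArray(), n * INDEX_ENTRY_BYTES, INDEX_ENTRY_BYTES);
        int offset = (int) entry.getLong();
        int length = entry.getInt();
        return Arrays.copyOfRange(data.toByteArray(), offset, offset + length);
    }

    // Returns the blockId stored in the n-th index entry.
    long blockId(int n) {
        return ByteBuffer.wrap(index.toByteArray()).getLong(n * INDEX_ENTRY_BYTES + 20);
    }
}
```

Because every index entry has a fixed size, a reader can locate any block of a partition without scanning the data file.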

Supported Spark Version

Currently supports Spark 2.3.x, Spark 2.4.x, Spark 3.0.x, Spark 3.1.x, Spark 3.2.x, Spark 3.3.x, Spark 3.4.x, Spark 3.5.x

Note: To support dynamic allocation, the patch (included in the patch/spark folder) must be applied to Spark.

Supported MapReduce Version

Currently supports the MapReduce framework of Hadoop 2.8.5, Hadoop 3.2.1

Building Uniffle

Note: Uniffle currently requires JDK 1.8 to build; support for later JDKs is on our roadmap.

Uniffle is built using Apache Maven. To build it, run:

./mvnw -DskipTests clean package

To fix code style issues, run:

./mvnw spotless:apply -Pspark3 -Pspark2 -Ptez -Pmr -Phadoop2.8 -Pdashboard

Build against profile Spark 2 (2.4.6)

./mvnw -DskipTests clean package -Pspark2

Build against profile Spark 3 (3.1.2)

./mvnw -DskipTests clean package -Pspark3

Build against Spark 3.2.x, except 3.2.0

./mvnw -DskipTests clean package -Pspark3.2

Build against Spark 3.2.0

./mvnw -DskipTests clean package -Pspark3.2.0

Build against Hadoop MapReduce 2.8.5

./mvnw -DskipTests clean package -Pmr,hadoop2.8

Build against Hadoop MapReduce 3.2.1

./mvnw -DskipTests clean package -Pmr,hadoop3.2

Build against Tez 0.9.1

./mvnw -DskipTests clean package -Ptez

Build against Tez 0.9.1 and Hadoop 3.2.1

./mvnw -DskipTests clean package -Ptez,hadoop3.2

Build with dashboard

./mvnw -DskipTests clean package -Pdashboard

To package Uniffle, run:

./build_distribution.sh

To package against Spark 3.2.x (except 3.2.0), run:

./build_distribution.sh --spark3-profile 'spark3.2'

To package against Spark 3.2.0, run:

./build_distribution.sh --spark3-profile 'spark3.2.0'

The package is built against Hadoop 2.8.5 by default. To build the package against Hadoop 3.2.1, run:

./build_distribution.sh --hadoop-profile 'hadoop3.2'

To include the Hadoop jars in the package when building against Hadoop 3.2.1, run:

./build_distribution.sh --hadoop-profile 'hadoop3.2' -Phadoop-dependencies-included

rss-xxx.tgz will be generated for deployment

Deploy

If you have packaged the tgz with the Hadoop jars, HADOOP_HOME does not need to be specified in rss-env.sh.

Deploy Coordinator

  1. unzip package to RSS_HOME
  2. update RSS_HOME/bin/rss-env.sh, e.g.,
      JAVA_HOME=<java_home>
      HADOOP_HOME=<hadoop home>
      XMX_SIZE="16g"
    
  3. update RSS_HOME/conf/coordinator.conf, e.g.,
      rss.rpc.server.port 19999
      rss.jetty.http.port 19998
      rss.coordinator.server.heartbeat.timeout 30000
      rss.coordinator.app.expired 60000
      rss.coordinator.shuffle.nodes.max 5
      # enable dynamicClientConf, and coordinator will be responsible for most of client conf
      rss.coordinator.dynamicClientConf.enabled true
      # config the path of client conf
      rss.coordinator.dynamicClientConf.path <RSS_HOME>/conf/dynamic_client.conf
      # config the path of excluded shuffle server
      rss.coordinator.exclude.nodes.file.path <RSS_HOME>/conf/exclude_nodes
    
  4. update <RSS_HOME>/conf/dynamic_client.conf, rss client will get default conf from coordinator e.g.,
     # MEMORY_LOCALFILE_HDFS is recommended for production environment
     rss.storage.type MEMORY_LOCALFILE_HDFS
     # multiple remote storages are supported, and client will get assignment from coordinator
     rss.coordinator.remote.storage.path hdfs://cluster1/path,hdfs://cluster2/path
     rss.writer.require.memory.retryMax 1200
     rss.client.retry.max 50
     rss.writer.send.check.timeout 600000
     rss.client.read.buffer.size 14m
    
  5. start Coordinator
     bash RSS_HOME/bin/start-coordinator.sh
    

Deploy Shuffle Server

We recommend JDK 11 or later for better performance when deploying the shuffle server. Benchmark results across JDK versions are shown below (Spark wrote shuffle data with 20 executors; each executor wrote 1G in total, 14M per write; the shuffle server used gRPC to transfer data):

| Java version | ShuffleServer GC | Max pause time | ThroughOutput |
| --- | --- | --- | --- |
| 8 | G1 | 30s | 0.3 |
| 11 | G1 | 2.5s | 0.8 |
| 18 | G1 | 2.5s | 0.8 |
| 18 | ZGC | 0.2ms | 0.99997 |

Deploy Steps:

  1. unzip package to RSS_HOME
  2. update RSS_HOME/bin/rss-env.sh, e.g.,
      JAVA_HOME=<java_home>
      HADOOP_HOME=<hadoop home>
      XMX_SIZE="80g"
    
  3. update RSS_HOME/conf/server.conf, e.g.,
      rss.rpc.server.port 19999
      rss.jetty.http.port 19998
      rss.rpc.executor.size 2000
      # it should be configured the same as in coordinator
      rss.storage.type MEMORY_LOCALFILE_HDFS
      rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999
      # local storage path for shuffle server
      rss.storage.basePath /data1/rssdata,/data2/rssdata....
      # it's better to config thread num according to local disk num
      rss.server.flush.thread.alive 5
      rss.server.flush.localfile.threadPool.size 10
      rss.server.flush.hadoop.threadPool.size 60
      rss.server.buffer.capacity 40g
      rss.server.read.buffer.capacity 20g
      rss.server.heartbeat.interval 10000
      rss.rpc.message.max.size 1073741824
      rss.server.preAllocation.expired 120000
      rss.server.commit.timeout 600000
      rss.server.app.expired.withoutHeartbeat 120000
      # note: the default value of rss.server.flush.cold.storage.threshold.size is 64m
      # there will be no data written to DFS if set it as 100g even rss.storage.type=MEMORY_LOCALFILE_HDFS
      # please set a proper value if DFS is used, e.g., 64m, 128m.
      rss.server.flush.cold.storage.threshold.size 100g
    
  4. start Shuffle Server
     bash RSS_HOME/bin/start-shuffle-server.sh
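
Since rss.storage.type in server.conf should match the value served by the coordinator, a quick consistency check before starting the servers can catch mismatches. The following is a minimal sketch under the assumption that the conf files use the "key value" line format shown in the examples above, with '#' starting a comment line; RssConfSketch and its methods are hypothetical helpers, not part of Uniffle.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: parses "key value" conf text and compares rss.storage.type.
final class RssConfSketch {
    // Parses one "key value" pair per line, skipping blank lines and '#' comment lines.
    static Map<String, String> parse(String text) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String raw : text.split("\n")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            String[] kv = line.split("\\s+", 2);
            conf.put(kv[0], kv.length > 1 ? kv[1].trim() : "");
        }
        return conf;
    }

    // True only when both confs define rss.storage.type with the same value.
    static boolean sameStorageType(Map<String, String> a, Map<String, String> b) {
        String type = a.get("rss.storage.type");
        return type != null && type.equals(b.get("rss.storage.type"));
    }
}
```

Feeding it the contents of dynamic_client.conf and server.conf and checking sameStorageType would flag, for example, a server left on a different storage type than the one the coordinator hands to clients.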
    

Deploy Spark Client

  1. Add client jar to Spark classpath, e.g., SPARK_HOME/jars/

    The jar for Spark2 is located in <RSS_HOME>/jars/client/spark2/rss-client-spark2-shaded-${version}.jar

    The jar for Spark3 is located in <RSS_HOME>/jars/client/spark3/rss-client-spark3-shaded-${version}.jar

  2. Update Spark conf to enable Uniffle, e.g.,

    # Uniffle transmits serialized shuffle data over network, therefore a serializer that supports relocation of
    # serialized object should be used. 
    # this could also be in the spark-defaults.conf
    spark.serializer org.apache.spark.serializer.KryoSerializer
    spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager
    spark.rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999
    # Note: For Spark2, spark.sql.adaptive.enabled should be false because Spark2 doesn't support AQE.
    

Support Spark dynamic allocation

To support Spark dynamic allocation with Uniffle, the Spark code must be patched. There are 7 patches for Spark (2.3.4/2.4.6/3.0.1/3.1.2/3.2.1/3.3.1/3.4.1) in the patch/spark folder for reference.

After applying the patch and rebuilding Spark, add the following configuration to the Spark conf to enable dynamic allocation:

spark.shuffle.service.enabled false
spark.dynamicAllocation.enabled true

For Spark 3.5 or above, just add one more configuration:

spark.shuffle.sort.io.plugin.class org.apache.spark.shuffle.RssShuffleDataIo

Deploy MapReduce Client

  1. Add client jar to the classpath of each NodeManager, e.g., /share/hadoop/mapreduce/

    The jar for MapReduce is located in <RSS_HOME>/jars/client/mr/rss-client-mr-XXXXX-shaded.jar

  2. Update MapReduce conf to enable Uniffle, e.g.,

    -Dmapreduce.rss.coordinator.quorum=<coordinatorIp1>:19999,<coordinatorIp2>:19999
    -Dyarn.app.mapreduce.am.command-opts=org.apache.hadoop.mapreduce.v2.app.RssMRAppMaster
    -Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.RssMapOutputCollector
    -Dmapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapreduce.task.reduce.RssShuffle
    

Note that the RssMRAppMaster will automatically disable slow start (i.e., mapreduce.job.reduce.slowstart.completedmaps=1) and job recovery (i.e., yarn.app.mapreduce.am.job.recovery.enable=false)

Deploy Tez Client

  1. Append the client jar to the package set by 'tez.lib.uris'.

    In production mode, you can append the client jar (rss-client-tez-XXXXX-shaded.jar) to the package set by 'tez.lib.uris'.

    In development mode, you can append the client jar (rss-client-tez-XXXXX-shaded.jar) to HADOOP_CLASSPATH.

  2. Update tez-site.xml to enable Uniffle.

| Property Name | Default | Description |
| --- | --- | --- |
| tez.am.launch.cmd-opts | -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps -XX:+UseNUMA -XX:+UseParallelGC org.apache.tez.dag.app.RssDAGAppMaster | enable remote shuffle service |
| tez.rss.coordinator.quorum | coordinatorIp1:19999,coordinatorIp2:19999 | coordinator address |

Note that the RssDAGAppMaster will automatically disable slow start (i.e., tez.shuffle-vertex-manager.min-src-fraction=1, tez.shuffle-vertex-manager.max-src-fraction=1).

Deploy In Kubernetes

We provide an operator for deploying Uniffle in Kubernetes environments.

For details, see the following document:

operator docs

Configuration

The important configuration is listed as follows.

| Role | Link |
| --- | --- |
| coordinator | Uniffle Coordinator Guide |
| shuffle server | Uniffle Shuffle Server Guide |
| client | Uniffle Shuffle Client Guide |

Security: Hadoop kerberos authentication

The primary goals of the Uniffle Kerberos security are:

  1. to enable secure data access for coordinator/shuffle-servers, like dynamic conf/exclude-node files stored in secured dfs cluster
  2. to write shuffle data to kerberos secured dfs cluster for shuffle-servers.

The following security configurations are introduced.

| Property Name | Default | Description |
| --- | --- | --- |
| rss.security.hadoop.kerberos.enable | false | Whether to enable access to a secured Hadoop cluster |
| rss.security.hadoop.kerberos.krb5-conf.file | - | The file path of krb5.conf. Only valid when rss.security.hadoop.kerberos.enable is enabled |
| rss.security.hadoop.kerberos.keytab.file | - | The Kerberos keytab file path. Only valid when rss.security.hadoop.kerberos.enable is enabled |
| rss.security.hadoop.kerberos.principal | - | The Kerberos keytab principal. Only valid when rss.security.hadoop.kerberos.enable is enabled |
| rss.security.hadoop.kerberos.relogin.interval.sec | 60 | The Kerberos authentication relogin interval. Unit: sec |
| rss.security.hadoop.kerberos.proxy.user.enable | true | Whether to use a proxy user for the job user to access the secured Hadoop cluster |

  • The proxy user mechanism keeps data isolated in Uniffle: the shuffle data written by shuffle servers is owned by the Spark app's user. To achieve this, the login user specified by the config above must be a superuser for HDFS. For more details, please see Proxy user - Superusers Acting On Behalf Of Other Users

Benchmark

We provide some benchmark tests for Uniffle. For details, you can see Uniffle 0.2.0 Benchmark, Uniffle 0.9.0 Benchmark.

LICENSE

Uniffle is under the Apache License Version 2.0. See the LICENSE file for details.

Contributing

For more information about contributing issues or pull requests, see Uniffle Contributing Guide.

incubator-uniffle's People

Contributors

advancedxy, bin41215, cchung100m, dependabot[bot], dingshun3016, duanmeng, enricomi, frankliee, izchen, jerqi, jerryshao, jiafuzha, kaijchen, leixm, lifeso, qijiale76, rickyma, slfan1989, smallzhongfeng, summaryzb, wangao1236, wenlongbrother, wforget, xianjingfeng, xumanbu, yl09099, zhengchenyu, zhuyaogai, zuston, zwangsheng

incubator-uniffle's Issues

Support lower Hadoop versions in client-mr

Currently, Uniffle uses Hadoop 2.8.5 as its default Hadoop version.

When running ./build_distribution.sh --spark2-profile 'spark2' --spark3-mvn '-Dspark.version=2.4.3' --spark3-profile 'spark3' --spark3-mvn '-Dspark.version=3.1.1' -Dhadoop.version=2.6.0, the build throws exceptions because some methods and variables are not available in Hadoop 2.6.0.

The incompatible parameters and methods are as follows:

  1. CallContext, introduced in 2.8.0.
  2. MRJobConfig.DEFAULT_SHUFFLE_MERGE_PERCENT, introduced in 2.8.0. ticket link
  3. MRApps.getSystemPropertiesToLog, introduced in 2.8.0. ticket link

I think we could use reflection to stay compatible with lower Hadoop versions.
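
As a rough illustration of the reflection idea, the sketch below probes for a class or a public static field at runtime and falls back when it is missing. CompatSketch and its method names are hypothetical; this is one possible shape for such a shim, not the change that was made in Uniffle.

```java
import java.lang.reflect.Field;

// Hypothetical compatibility shim: look things up reflectively, fall back if absent.
final class CompatSketch {
    // True when the named class can be loaded on the current classpath.
    static boolean classExists(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Reads a public static field, returning the fallback when the class or field is missing.
    static Object staticFieldOrDefault(String className, String fieldName, Object fallback) {
        try {
            Field field = Class.forName(className).getField(fieldName);
            return field.get(null); // null receiver is valid only for static fields
        } catch (ReflectiveOperationException | LinkageError e) {
            return fallback;
        }
    }
}
```

A caller could, for instance, pass the class name of MRJobConfig and the field name DEFAULT_SHUFFLE_MERGE_PERCENT together with a caller-chosen fallback, so the same client jar loads on Hadoop versions that lack the constant.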

[Improvement] Support more tasks of the application

The current blockId is designed as follows:

 // BlockId is a long composed of an AtomicInteger, the partitionId and the taskAttemptId
 // AtomicInteger is the first 19 bits, max value is 2^19 - 1
 // partitionId is the next 24 bits, max value is 2^24 - 1
 // taskAttemptId is the remaining 20 bits, max value is 2^20 - 1

Why do we need blockId?
It is used for data checks, filtering, reading in-memory data, etc.

Why is blockId designed as above?
BlockIds are stored in the shuffle server, and RoaringBitmap is used to cache them to reduce memory cost.
Given the implementation of RoaringBitmap, the BlockId layout aims to use BitmapContainer instead of ArrayContainer to save memory.

What's the problem with blockId?
It can't support a taskId greater than 2^20 - 1.

Proposal
I think 19 bits is more than the atomic int needs, and we can reuse some of them for the taskId.
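
The bit layout described in this issue can be sketched as follows. The sketch assumes that "first" means the most significant bits of the 63 used bits; BlockIdSketch and its method names are hypothetical and only mirror the comment quoted above.

```java
// Hypothetical sketch of the 19/24/20-bit layout quoted in the issue.
final class BlockIdSketch {
    static final int SEQ_BITS = 19;       // AtomicInteger sequence number
    static final int PARTITION_BITS = 24; // partitionId
    static final int TASK_BITS = 20;      // taskAttemptId

    // Packs the three fields into one long, rejecting values that do not fit.
    static long pack(long seq, long partitionId, long taskAttemptId) {
        check(seq, SEQ_BITS, "sequence");
        check(partitionId, PARTITION_BITS, "partitionId");
        check(taskAttemptId, TASK_BITS, "taskAttemptId");
        return (seq << (PARTITION_BITS + TASK_BITS)) | (partitionId << TASK_BITS) | taskAttemptId;
    }

    static long seq(long blockId) {
        return blockId >>> (PARTITION_BITS + TASK_BITS);
    }

    static long partitionId(long blockId) {
        return (blockId >>> TASK_BITS) & ((1L << PARTITION_BITS) - 1);
    }

    static long taskAttemptId(long blockId) {
        return blockId & ((1L << TASK_BITS) - 1);
    }

    private static void check(long value, int bits, String name) {
        if (value < 0 || value >= (1L << bits)) {
            throw new IllegalArgumentException(name + " does not fit in " + bits + " bits: " + value);
        }
    }
}
```

Under this layout the largest representable taskAttemptId is 2^20 - 1 = 1048575; moving bits from the sequence field to the task field, as proposed, raises that limit at the cost of fewer blocks per task and partition.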

[Bug] Blocks read inconsistent: expected xxx blocks, actual xxx blocks

  1. If we set spark.rss.data.replica.write=2 and spark.rss.data.replica=3, data integrity cannot be guaranteed on any single shuffle server, right?
  2. But the method org.apache.uniffle.storage.handler.impl.LocalFileQuorumClientReadHandler#readShuffleData reads from only one shuffle server.
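
The concern can be made concrete with the usual quorum-overlap arithmetic: with n replicas and a write acknowledged by w of them, a read that contacts r replicas is certain to reach at least one replica holding the write only when r + w > n. The sketch below encodes just that general rule; it is not a description of how LocalFileQuorumClientReadHandler behaves, and QuorumSketch is a hypothetical name.

```java
// Generic quorum arithmetic; independent of Uniffle's actual read handler.
final class QuorumSketch {
    // True when any r replicas must include at least one of the w replicas that hold a write.
    static boolean readSeesAllWrites(int replicas, int writeAcks, int readReplicas) {
        return readReplicas + writeAcks > replicas;
    }

    // Smallest number of replicas a reader has to contact to be sure of overlap.
    static int minReadReplicas(int replicas, int writeAcks) {
        return replicas - writeAcks + 1;
    }
}
```

For the settings in the question (3 replicas, 2 write acknowledgements), reading from a single server does not satisfy the rule, while reading from 2 servers does.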

Flaky test HealthCheckCoordinatorGrpcTest on Apple Silicon

When running mvn clean install on Apple Silicon, HealthCheckCoordinatorGrpcTest fails as follows:

[INFO] Running org.apache.uniffle.test.HealthCheckCoordinatorGrpcTest
[ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 71.009 s <<< FAILURE! - in org.apache.uniffle.test.HealthCheckCoordinatorGrpcTest
[ERROR] healthCheckTest  Time elapsed: 70.285 s  <<< FAILURE!
org.opentest4j.AssertionFailedError
	at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:35)
	at org.junit.jupiter.api.Assertions.fail(Assertions.java:115)
	at org.apache.uniffle.test.HealthCheckCoordinatorGrpcTest.healthCheckTest(HealthCheckCoordinatorGrpcTest.java:143)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
	at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:150)
	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124)
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)

Environment:

os: Big Sur 11.4
cpu: Apple M1
java: zulu 1.8.0_322
maven: 3.8.6

[Performance Optimization] Improve the speed of writing index file in shuffle server

Motivation

When I tested Uniffle performance, I found a huge performance drop caused by slow index file writes. Flame graph attached:

[Image: reliao_img_1658917352873]

Solution

Wrap the file stream in a buffer in LocalFileWriter: dataOutputStream = new DataOutputStream(new BufferedOutputStream(fileOutputStream)); Please refer to: https://stackoverflow.com/questions/38439410/java-dataoutputstream-writelong-byte-double-int-speed
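
A minimal, self-contained sketch of the difference is shown below. An unbuffered DataOutputStream passes each writeLong straight to the file stream, while the buffered variant collects bytes in memory and writes them out in larger chunks. BufferedIndexWriteSketch and its methods are hypothetical and stand in for the index-writing loop; they are not the LocalFileWriter code.

```java
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical sketch: write fixed-size index values with or without buffering.
final class BufferedIndexWriteSketch {
    private static OutputStream open(File file, boolean buffered) throws IOException {
        OutputStream raw = new FileOutputStream(file);
        return buffered ? new BufferedOutputStream(raw) : raw;
    }

    // Writes `count` long values and returns the resulting file size in bytes.
    static long writeLongs(File file, int count, boolean buffered) {
        try (DataOutputStream out = new DataOutputStream(open(file, buffered))) {
            for (int i = 0; i < count; i++) {
                out.writeLong(i);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return file.length();
    }

    // Convenience wrapper that uses a temporary file and cleans it up afterwards.
    static long demo(int count, boolean buffered) {
        File tmp = null;
        try {
            tmp = File.createTempFile("index-sketch", ".bin");
            return writeLongs(tmp, count, buffered);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (tmp != null) {
                tmp.delete();
            }
        }
    }
}
```

Both variants produce identical files; only the number of underlying write calls differs, which is where the speedup reported in this issue would come from.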

Result

After applying this optimization:
[Image: reliao_img_1658917526994]

[Feature Request] Deploy Uniffle on Kubernetes

It would be good to have a Kubernetes operator to help us deploy Uniffle on Kubernetes, so that we can automate our system operations. This issue tracks the pull requests for the Kubernetes operator.

[Improvement] Support Empty assignment to Shuffle Server

When I tested Hudi, I got an error.
The Spark driver log shows: ERROR: Empty assignment to Shuffle Server

52278 [dag-scheduler-event-loop] INFO  org.apache.spark.shuffle.RssShuffleManager  - Generate application id used in rss: spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844
52281 [dag-scheduler-event-loop] ERROR com.tencent.rss.client.impl.ShuffleWriteClientImpl  - Empty assignment to Shuffle Server
52282 [dag-scheduler-event-loop] ERROR com.tencent.rss.client.impl.ShuffleWriteClientImpl  - Error happened when getShuffleAssignments with appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], numMaps[0], partitionNumPerRange[1] to coordinator
52283 [dag-scheduler-event-loop] WARN  org.apache.spark.scheduler.DAGScheduler  - Creating new stage failed due to exception - job: 5
com.tencent.rss.common.exception.RssException: Error happened when getShuffleAssignments with appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], numMaps[0], partitionNumPerRange[1] to coordinator
        at com.tencent.rss.client.impl.ShuffleWriteClientImpl.throwExceptionIfNecessary(ShuffleWriteClientImpl.java:440)
        at com.tencent.rss.client.impl.ShuffleWriteClientImpl.getShuffleAssignments(ShuffleWriteClientImpl.java:291)
        at org.apache.spark.shuffle.RssShuffleManager.registerShuffle(RssShuffleManager.java:247)
        at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:97)
        at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
        at org.apache.spark.rdd.RDD.$anonfun$dependencies$2(RDD.scala:264)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.rdd.RDD.dependencies(RDD.scala:260)
        at org.apache.spark.scheduler.DAGScheduler.getShuffleDependenciesAndResourceProfiles(DAGScheduler.scala:634)
        at org.apache.spark.scheduler.DAGScheduler.getMissingAncestorShuffleDependencies(DAGScheduler.scala:597)
        at org.apache.spark.scheduler.DAGScheduler.getOrCreateShuffleMapStage(DAGScheduler.scala:394)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$getOrCreateParentStages$1(DAGScheduler.scala:580)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:48)
        at scala.collection.SetLike.map(SetLike.scala:104)
        at scala.collection.SetLike.map$(SetLike.scala:104)
        at scala.collection.mutable.AbstractSet.map(Set.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:579)
        at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:564)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1115)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2396)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2388)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2377)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
52287 [main] INFO  org.apache.spark.scheduler.DAGScheduler  - Job 5 failed: countByKey at BaseSparkCommitActionExecutor.java:191, took 0.076660 s

The coordinator log shows that the requested partitionNum is 0:

[INFO] 2022-08-11 11:29:49,335 Grpc-301 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], partitionNum[0], partitionNumPerRange[1], replica[1]
Full log:
[INFO] 2022-08-11 11:29:26,946 Grpc-267 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[0], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:26,946 Grpc-267 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:26,946 Grpc-267 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[0] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:27,033 Grpc-270 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[1], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:27,033 Grpc-270 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:27,034 Grpc-270 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[1] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:37,957 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 2 applications
[INFO] 2022-08-11 11:29:43,047 Grpc-283 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[2], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:43,048 Grpc-283 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:43,048 Grpc-283 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[2] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,165 Grpc-293 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[3], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,166 Grpc-293 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:49,166 Grpc-293 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[3] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,247 Grpc-298 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[4], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,247 Grpc-298 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:49,247 Grpc-298 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[4] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,267 Grpc-297 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[5], partitionNum[200], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,267 Grpc-297 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:29:49,268 Grpc-297 CoordinatorGrpcService logAssignmentResult - Shuffle Servers of assignment for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[5] are [10.1.3.174-19990, 10.1.3.175-19990, 10.1.3.173-19990]
[INFO] 2022-08-11 11:29:49,335 Grpc-301 CoordinatorGrpcService getShuffleAssignments - Request of getShuffleAssignments for appId[spark-8304854d3e234816bba3c3a1e8bd0ade1660188566844], shuffleId[6], partitionNum[0], partitionNumPerRange[1], replica[1]
[WARN] 2022-08-11 11:29:49,335 Grpc-301 PartitionBalanceAssignmentStrategy assign - Can't get expected servers [13] and found only [3]
[INFO] 2022-08-11 11:30:07,957 ApplicationManager-0 ApplicationManager statusCheck - Start to check status for 2 applications
[INFO] 2022-08-11 11:30:07,957 ApplicationManager-0 ApplicationManager statusCheck - Remove expired application:spark-d7f3e51ca713472e88568db90c91bdea1660187027133

Environment:

uniffle: firestorm 0.4.1
spark: 3.1.2
hudi: 0.10.0
k8s: v1.21.3

Improve our test code style

There are currently many style errors in our test code because our checkstyle plugin does not check test code. To enable the plugin for test code, we should fix those style errors first. PR Tencent/Firestorm#155 shows how to enable the plugin to check test code style. We should enable the plugin in one branch, fix the style errors module by module, cherry-pick the fixes into another branch, and raise a fix PR.

This issue will track the pull requests.
Related prs:
#99 fix common module
#122 fix coordinator module
#131 fix storage module

[Improvement] Disallow sendShuffleData if requireBufferId expired

We found that a shuffle server under high load easily encounters java.lang.OutOfMemoryError: Java heap space, even when we allocate more JVM heap memory and lower rss.server.buffer.capacity.

The steps leading to the exception above:

  1. When the shuffle server is under high load, a requireBufferId easily expires, and the shuffle server releases the corresponding usedMemory.
  2. A client calls sendShuffleData using an expired requireBufferId.
  3. The shuffle server receives the shuffle data and stores it in the RPC queue (this memory usage is not added to usedMemory).
  4. Other clients' requireBuffer calls succeed because usedMemory still appears to have enough room.
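
The check proposed in the title could look roughly like the following. This is a minimal sketch, not Uniffle's actual server code: the class PreAllocatedBufferRegistry and its methods are hypothetical names, and timestamps are passed in explicitly to keep the example deterministic. The idea is that the server rejects data whose requireBufferId is unknown or expired, so unaccounted data never piles up behind a released reservation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: tracks pre-allocated buffers by requireBufferId.
class PreAllocatedBufferRegistry {
  private static final class Entry {
    final long size;
    final long expireAtMs;

    Entry(long size, long expireAtMs) {
      this.size = size;
      this.expireAtMs = expireAtMs;
    }
  }

  private final Map<Long, Entry> entries = new ConcurrentHashMap<>();
  private final AtomicLong nextId = new AtomicLong();
  private final AtomicLong usedMemory = new AtomicLong();
  private final long capacity;
  private final long ttlMs;

  PreAllocatedBufferRegistry(long capacity, long ttlMs) {
    this.capacity = capacity;
    this.ttlMs = ttlMs;
  }

  // Reserves memory and returns a requireBufferId, or -1 if memory is insufficient.
  long requireBuffer(long size, long nowMs) {
    if (usedMemory.addAndGet(size) > capacity) {
      usedMemory.addAndGet(-size);
      return -1;
    }
    long id = nextId.incrementAndGet();
    entries.put(id, new Entry(size, nowMs + ttlMs));
    return id;
  }

  // Periodic cleanup: drops expired reservations and releases their memory.
  void expire(long nowMs) {
    entries.forEach((id, e) -> {
      if (e.expireAtMs <= nowMs && entries.remove(id, e)) {
        usedMemory.addAndGet(-e.size);
      }
    });
  }

  // Called before accepting sendShuffleData: true only if the reservation is still live.
  boolean tryAcceptData(long requireBufferId, long nowMs) {
    Entry e = entries.remove(requireBufferId);
    if (e == null) {
      return false; // unknown id, or already expired and released
    }
    if (e.expireAtMs <= nowMs) {
      usedMemory.addAndGet(-e.size); // expired but not yet cleaned up
      return false;
    }
    return true; // memory stays accounted until the data is flushed
  }

  long usedMemory() {
    return usedMemory.get();
  }
}
```

With such a check, a client holding an expired id would have to call requireBuffer again before resending, so the data it sends is always covered by usedMemory.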

Flaky test RssShuffleUtilsTest

run mvn clean install

[INFO] Running org.apache.uniffle.common.RssShuffleUtilsTest
[ERROR] Tests run: 7, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.356 s <<< FAILURE! - in org.apache.uniffle.common.RssShuffleUtilsTest
[ERROR] testDestroyDirectByteBuffer  Time elapsed: 0.222 s  <<< FAILURE!
org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
	at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
	at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
	at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35)
	at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:179)
	at org.apache.uniffle.common.RssShuffleUtilsTest.testDestroyDirectByteBuffer(RssShuffleUtilsTest.java:72)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
	at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
	at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at java.util.ArrayList.forEach(ArrayList.java:1259)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
	at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:150)
	at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:124)
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)

Environment:

os: Big Sur 11.4
cpu: Apple M1
java: zulu 1.8.0_322
maven: 3.8.6

[Code Style] Indentation rules

There are 2 indentation styles in the current code base, which are mostly the same except for line wrapping.
It would be better to unify them early. What do you think?

4 spaces for line wrapping

public static void main(String[] args)
    throws Exception {
  for (int i = 0; i < args.length; i++) {
    System.out.println("Greeting the " + i +
        "-th argument");
    System.out.println("hello, " + args[i]);
  }
}

2 spaces for line wrapping

public static void main(String[] args)
  throws Exception {
  for (int i = 0; i < args.length; i++) {
    System.out.println("Greeting the " + i +
      "-th argument");
    System.out.println("hello, " + args[i]);
  }
}

[Improvement][AQE] Sort MapId before the data are flushed

When we use AQE, we need to use the mapId to filter out data that we don't need. We split the data into segments, and if a segment doesn't contain any data we want to read, we skip it. If we sort the data by mapId before it is flushed, we can filter out more data and improve performance.
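
A small sketch of why sorting helps, under simplifying assumptions: segments hold a fixed number of blocks, each segment records its minimum and maximum mapId, and a reader wants the mapId range [startMapId, endMapId). All class and method names here are hypothetical and do not correspond to Uniffle's real classes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class MapIdSortSketch {
  static final class Block {
    final long blockId;
    final int mapId;

    Block(long blockId, int mapId) {
      this.blockId = blockId;
      this.mapId = mapId;
    }
  }

  static final class Segment {
    final int minMapId;
    final int maxMapId;
    final List<Block> blocks;

    Segment(List<Block> blocks) {
      int min = Integer.MAX_VALUE;
      int max = Integer.MIN_VALUE;
      for (Block b : blocks) {
        min = Math.min(min, b.mapId);
        max = Math.max(max, b.mapId);
      }
      this.minMapId = min;
      this.maxMapId = max;
      this.blocks = blocks;
    }
  }

  // Splits blocks into fixed-size segments, optionally sorting by mapId first.
  static List<Segment> split(List<Block> blocks, int blocksPerSegment, boolean sortByMapId) {
    List<Block> ordered = new ArrayList<>(blocks);
    if (sortByMapId) {
      ordered.sort(Comparator.comparingInt((Block b) -> b.mapId));
    }
    List<Segment> segments = new ArrayList<>();
    for (int i = 0; i < ordered.size(); i += blocksPerSegment) {
      int end = Math.min(i + blocksPerSegment, ordered.size());
      segments.add(new Segment(new ArrayList<>(ordered.subList(i, end))));
    }
    return segments;
  }

  // Keeps only segments whose mapId range overlaps [startMapId, endMapId).
  static List<Segment> segmentsToRead(List<Segment> segments, int startMapId, int endMapId) {
    List<Segment> result = new ArrayList<>();
    for (Segment s : segments) {
      if (s.minMapId < endMapId && s.maxMapId >= startMapId) {
        result.add(s);
      }
    }
    return result;
  }
}
```

For blocks arriving with mapIds 5, 1, 3, 2, 4, 0 and two blocks per segment, a reader of mapIds [0, 2) has to touch two segments when the data is unsorted but only one when it is sorted.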

[Feature Request] Introduce the unique trace id to debug easily

Motivation

In the current codebase, it is hard to analyze which step of a remote request costs the most time, because there is no corresponding trace id on the client and server sides.

Plan

Maybe we could introduce a unique trace id, generated from the time and the client machine id, for each remote request, and record it in the client's log. When requesting the remote server, we should propagate the id to the server so that it is recorded in the server's log as well.
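
One possible shape for such an id, as a rough sketch only: the class TraceIdSketch, the id layout (machine id, timestamp, per-process sequence number) and the log prefix are assumptions made for illustration, not an agreed design.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: builds a trace id on the client and formats log lines with it.
class TraceIdSketch {
  // The sequence number keeps ids unique when several requests start in the same millisecond.
  private static final AtomicLong SEQ = new AtomicLong();

  static String newTraceId(String clientMachineId, long nowMs) {
    return clientMachineId + "-" + nowMs + "-" + SEQ.incrementAndGet();
  }

  // Both client and server would prefix their log messages with the same trace id.
  static String logLine(String side, String traceId, String message) {
    return "[traceId=" + traceId + "] " + side + ": " + message;
  }
}
```

The client would generate the id once per request, attach it to the request (for example as a request field or RPC metadata), and the server would reuse it in its own log lines, so one search for the id shows both sides of the request.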

[Umbrella] Netty replace Grpc on data transfer

Code of Conduct

Search before asking

  • I have searched in the issues and found no similar issues.

Describe the proposal

When we use gRPC, we find that gRPC is our bottleneck: it brings the cost of data copying and data serialization, and we inevitably encounter GC problems. We should replace gRPC with Netty for data transfer and use off-heap memory to reduce GC time.

Task list

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

[Feature] Support short-circuit read in local-file

Motivation

When Uniffle shuffle servers are co-located with YARN NodeManagers, we could use short-circuit reads to improve performance and reduce overhead.

How to do

There are two options to solve this:

  1. Directly read shuffle-data files by client side.
  2. Use the domain socket.

Option 1 - Directly read

This is the fastest way, but read permission on the local files must be opened to the client side, which may cause security problems.
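
As an illustration of Option 1, a client that already knows a segment's offset and length could read it straight from the data file. This is only a sketch: LocalDirectReadSketch and readSegment are hypothetical names, and it assumes the client process has read permission on the file.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class LocalDirectReadSketch {
  // Reads `length` bytes starting at `offset` from a local shuffle data file.
  static ByteBuffer readSegment(Path dataFile, long offset, int length) throws IOException {
    try (FileChannel channel = FileChannel.open(dataFile, StandardOpenOption.READ)) {
      ByteBuffer buffer = ByteBuffer.allocate(length);
      long position = offset;
      // A single read may return fewer bytes than requested, so loop until full.
      while (buffer.hasRemaining()) {
        int n = channel.read(buffer, position);
        if (n < 0) {
          throw new EOFException("File ended before " + length + " bytes were read");
        }
        position += n;
      }
      buffer.flip();
      return buffer;
    }
  }
}
```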

Option 2 - Domain socket

This way avoids the security problem, but it will be slower than Option 1, and its implementation will be complex.

Conclusion

In my opinion, both options could be supported in Uniffle as different policies for users to choose from.

Introduce the reservedData to support more custom pluggable accessCheckers

Background

In the current implementation, DelegationRssShuffleManager decides whether to use Uniffle or the sort shuffle service by calling the remote coordinator. The pluggable accessCheckers can be extended in the coordinator to support more custom requirements.

Motivation

When we want Uniffle to be applied only to specified Spark jobs, the pluggable accessCheckers mechanism is useful for controlling which shuffle service to use.

We can implement a CustomDelegationRssShuffleManager (like DelegationRssShuffleManager) to inject some custom access info and remotely call the coordinator, which uses the custom access policy to decide which Spark jobs use the Uniffle shuffle service. However, the current codebase doesn't support injecting more access info in the client; it only has tags and the access id. So this proposal introduces a data structure to store more custom, extensible requirements.

Goals

  1. Support injecting more custom data in AccessClusterRequest, which can be set in client
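
To make the goal concrete, here is a minimal sketch of how reserved data might travel with an access request and be consumed by a custom checker. Everything in it is hypothetical: AccessInfoSketch stands in for the request payload, AccessCheckerSketch for the pluggable checker interface, and the "queue" key is just an example of custom data.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the access info a client sends to the coordinator.
final class AccessInfoSketch {
  final String accessId;
  final Set<String> tags;
  // Free-form key/value pairs set by the client and interpreted by custom checkers.
  final Map<String, String> reservedData;

  AccessInfoSketch(String accessId, Set<String> tags, Map<String, String> reservedData) {
    this.accessId = accessId;
    this.tags = Collections.unmodifiableSet(new HashSet<>(tags));
    this.reservedData = Collections.unmodifiableMap(new HashMap<>(reservedData));
  }
}

// Hypothetical checker contract: return true if the job may use Uniffle.
interface AccessCheckerSketch {
  boolean check(AccessInfoSketch info);
}

// Example custom policy: only jobs submitted to an allowed queue may use Uniffle.
final class QueueAccessChecker implements AccessCheckerSketch {
  private final Set<String> allowedQueues;

  QueueAccessChecker(Set<String> allowedQueues) {
    this.allowedQueues = new HashSet<>(allowedQueues);
  }

  @Override
  public boolean check(AccessInfoSketch info) {
    String queue = info.reservedData.get("queue");
    return queue != null && allowedQueues.contains(queue);
  }
}
```

Keeping the reserved data as an opaque string map means new policies can be added on the coordinator side without changing the request structure again.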

Tasks

Remove experimental feature with ShuffleUploader

ShuffleUploader is an experimental feature that aims to merge and upload data from local disk to remote storage when the local disk doesn't have enough space.
Currently, MEMORY_LOCAL_HDFS is introduced as the common solution for shuffle data storage.
ShuffleUploader is no longer needed, and removing it also saves us from maintaining it in the future.

[Performance Optimization] Multiple channels when getting shuffle data in client side

Motivation

Currently an executor uses only a single TCP connection to a given shuffle server, so when multiple tasks run concurrently they share this channel, which may reduce overall throughput.

Do we have any plan to introduce an extra config that allows users to create more channels on the client side?

Maybe we should run some performance tests to prove that this improvement is effective. Updates will be posted in this ticket.
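
A configurable number of channels per shuffle server could be handed out round-robin, roughly as sketched below. RoundRobinPool is a hypothetical, transport-agnostic helper: the type parameter C stands for whatever channel or client object the transport provides, and the pool size would come from the proposed config.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical pool that spreads concurrent tasks over several channels to one server.
class RoundRobinPool<C> {
  private final List<C> channels = new ArrayList<>();
  private final AtomicInteger next = new AtomicInteger();

  RoundRobinPool(int size, Supplier<C> channelFactory) {
    if (size <= 0) {
      throw new IllegalArgumentException("size must be positive");
    }
    for (int i = 0; i < size; i++) {
      channels.add(channelFactory.get());
    }
  }

  // floorMod keeps the index valid even after the counter overflows.
  C pick() {
    int index = Math.floorMod(next.getAndIncrement(), channels.size());
    return channels.get(index);
  }
}
```

Each task would call pick() before issuing a request, so with a pool size of N up to N requests can be in flight on separate connections.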

[Improvement][Aqe] Avoid calling `getShuffleResult` multiple times

When we use AQE, we may call shuffleWriteClient.getShuffleResult multiple times. If both partition 1 and partition 2 are on server A, we call getShuffleResult(partition 1) to get data from server A and then getShuffleResult(partition 2) to get data from server A again, which is unnecessary. We can call getShuffleResult(partition 1, partition 2) once instead.
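
The batching step can be sketched as grouping the requested partitions by the server that holds them, then issuing one request per server. The helper below is hypothetical and assumes, for simplicity, that each partition maps to exactly one server identified by a string.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class BatchShuffleResultSketch {
  // Groups partition ids by server so each server is queried once for all its partitions.
  static Map<String, Set<Integer>> groupByServer(Map<Integer, String> partitionToServer) {
    Map<String, Set<Integer>> serverToPartitions = new TreeMap<>();
    for (Map.Entry<Integer, String> entry : partitionToServer.entrySet()) {
      serverToPartitions
          .computeIfAbsent(entry.getValue(), server -> new TreeSet<>())
          .add(entry.getKey());
    }
    return serverToPartitions;
  }
}
```

With partitions 1 and 2 on server A and partition 3 on server B, this yields two requests instead of three.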

[Improvement] ShuffleBlock should be released when finished reading

We found that Spark executors are easily killed by YARN, and I found that this is because the executor uses too much off-heap memory when reading shuffle data.
Most of the off-heap memory is used to store uncompressed shuffle data, and this memory is released only when GC is triggered.
