
sparkrdma's Introduction

SparkRDMA ShuffleManager Plugin

SparkRDMA is a high performance ShuffleManager plugin for Apache Spark that uses RDMA (instead of TCP) when performing Shuffle data transfers in Spark jobs.

This open-source project is developed, maintained and supported by Mellanox Technologies.

Performance results

TeraSort

TeraSort results

Running a 320GB TeraSort workload with SparkRDMA is 2.63x faster than standard Spark (runtime in seconds)

Test environment:

7 Spark standalone workers on Azure "H16mr" VM instances, Intel Haswell E5-2667 V3,

224GB RAM, 2000GB SSD for temporary storage, Mellanox InfiniBand FDR (56Gb/s)

Also featured at the Spark+AI Summit 2018; see our session for more details: https://databricks.com/session/accelerated-spark-on-azure-seamless-and-scalable-hardware-offloads-in-the-cloud

PageRank

PageRank results

Running a 19GB PageRank workload with SparkRDMA is 2.01x faster than standard Spark (runtime in seconds)

Test environment:

5 Spark standalone workers, 2x Intel Xeon E5-2697 v3 @ 2.60GHz, 25 cores per Worker, 150GB RAM, non-flash storage (HDD)

Mellanox ConnectX-5 network adapter with 100GbE RoCE fabric, connected with a Mellanox Spectrum switch

Wiki pages

For more information on configuration, performance tuning and troubleshooting, please visit the SparkRDMA GitHub Wiki

Runtime requirements

  • Apache Spark 2.0.0/2.1.0/2.2.0/2.3.0/2.4.0
  • Java 8
  • An RDMA-supported network, e.g. RoCE or InfiniBand
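A quick way to confirm the last requirement is to look at the kernel's RDMA device directory: on Linux, both InfiniBand and RoCE adapters appear under /sys/class/infiniband. A minimal sketch (the directory argument is parameterized only for illustration; the default is the real sysfs path):

```shell
#!/bin/sh
# List RDMA-capable devices as seen by the kernel RDMA stack.
# The directory argument defaults to the real sysfs path; it is
# parameterized here only so the function is easy to exercise.
list_rdma_devices() {
  dir="${1:-/sys/class/infiniband}"
  if [ -d "$dir" ] && [ -n "$(ls -A "$dir" 2>/dev/null)" ]; then
    ls "$dir"
  else
    echo "no RDMA devices found"
  fi
}

list_rdma_devices
```

On a host with a Mellanox adapter this typically prints device names such as mlx5_0.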

Installation

Obtain SparkRDMA and DiSNI binaries

Please use the "Releases" page to download pre-built binaries.
If you would like to build the project yourself, please refer to the "Build" section below.

The pre-built binaries are packed as an archive that contains the following files:

  • spark-rdma-3.1-for-spark-2.0.0-jar-with-dependencies.jar
  • spark-rdma-3.1-for-spark-2.1.0-jar-with-dependencies.jar
  • spark-rdma-3.1-for-spark-2.2.0-jar-with-dependencies.jar
  • spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar
  • spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar
  • libdisni.so

libdisni.so must be in java.library.path on every Spark Master and Worker (usually /usr/lib).
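If installing libdisni.so into a standard location such as /usr/lib is not an option, the library path can instead be supplied through Spark's JVM options (a hedged sketch; /path/to/libdisni-dir stands for whatever directory holds libdisni.so on your hosts):

```properties
spark.driver.extraJavaOptions   -Djava.library.path=/path/to/libdisni-dir
spark.executor.extraJavaOptions -Djava.library.path=/path/to/libdisni-dir
```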

Configuration

Provide Spark with the location of the SparkRDMA plugin jars using the extraClassPath option. For standalone mode, this can be added to either spark-defaults.conf or any runtime configuration file. For client mode, this must be added to spark-defaults.conf. For Spark 2.0.0 (replace with 2.1.0, 2.2.0, 2.3.0 or 2.4.0 according to your Spark version):

spark.driver.extraClassPath   /path/to/SparkRDMA/target/spark-rdma-3.1-for-spark-2.0.0-jar-with-dependencies.jar
spark.executor.extraClassPath /path/to/SparkRDMA/target/spark-rdma-3.1-for-spark-2.0.0-jar-with-dependencies.jar
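Since the jar name embeds the Spark version, a version mismatch is an easy setup mistake. A small helper like the following (hypothetical, not part of the plugin) derives the expected jar name for a given Spark version, matching the pre-built release archive listed above:

```shell
#!/bin/sh
# Map a supported Spark version to its SparkRDMA 3.1 jar name.
# The naming pattern follows the pre-built release archive; versions
# outside the supported list are rejected.
rdma_jar_for_spark() {
  case "$1" in
    2.0.0|2.1.0|2.2.0|2.3.0|2.4.0)
      printf 'spark-rdma-3.1-for-spark-%s-jar-with-dependencies.jar\n' "$1"
      ;;
    *)
      echo "unsupported Spark version: $1" >&2
      return 1
      ;;
  esac
}

rdma_jar_for_spark 2.4.0
```

For 2.4.0 this prints spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar, which is the file the extraClassPath entries above should point to.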

Running

To enable the SparkRDMA Shuffle Manager plugin, add the following line to either spark-defaults.conf or any runtime configuration file:

spark.shuffle.manager   org.apache.spark.shuffle.rdma.RdmaShuffleManager
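Putting the two configuration steps together, a minimal spark-defaults.conf for SparkRDMA with Spark 2.4.0 might look like this (paths are placeholders; substitute your own install locations):

```properties
spark.driver.extraClassPath   /path/to/spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar
spark.executor.extraClassPath /path/to/spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar
spark.shuffle.manager         org.apache.spark.shuffle.rdma.RdmaShuffleManager
```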

Build

Building the SparkRDMA plugin requires Apache Maven and Java 8.

  1. Obtain a clone of SparkRDMA

  2. Build the plugin for your Spark version (either 2.0.0, 2.1.0, 2.2.0, 2.3.0 or 2.4.0), e.g. for Spark 2.0.0:

mvn -DskipTests clean package -Pspark-2.0.0

  3. Obtain a clone of DiSNI for building libdisni:

git clone https://github.com/zrlio/disni.git
cd disni
git checkout tags/v1.7 -b v1.7

  4. Compile and install only libdisni (the jars are already included in the SparkRDMA plugin):

cd libdisni
./autoprepare.sh
./configure --with-jdk=/path/to/java8/jdk
make
make install

Community discussions and support

For any questions, issues or suggestions, please use our Google group: https://groups.google.com/forum/#!forum/sparkrdma

Contributions

Any PR submissions are welcome.

sparkrdma's People

Contributors

finlaym, petro-rudenko, yuvaldeg


sparkrdma's Issues

Getting lower RDMA perf than TCP/IP perf

Hi all,

I am running the HiBench TeraSort suite against RDMA Spark. My current results show that TCP is performing better than RDMA. Any thoughts on configs to look into? Happy to post the config settings currently in place as well to help diagnose.

Errors when using 2 or more nodes

Hello!
I found out that SparkRDMA works correctly on our cluster only when 2 worker nodes are interacting with each other. The number of executors per node doesn't matter (I tried 1, 2 and 4 executors per node). When 3 or more nodes take part in task processing, errors occur (logs below). The job ends successfully, but it takes much more time than without RDMA. How can I solve this problem? When only 2 nodes take part in task processing, I can see the performance benefit of SparkRDMA.

Cluster

4 nodes (all nodes can be workers)

Storage: Samsung SSD 970 EVO Plus 250GB (NVMe)
OS: Kubuntu 20.04 LTS
Network switch: Huawei S5720 36-C with switching capacity of 598 Gbit/s
Network physical layer: single-mode optical fiber
Network adapter: Mellanox ConnectX-4 Lx EN, 10 GbE single-port SFP+
Memory: 32 GB
CPU: Intel Core i7-9700 @ 3.00GHz, 8 cores, 1 thread per core

Yarn configurations (Hadoop 3.1.3)

yarn.nodemanager.resource.memory-mb: 27648
yarn.scheduler.maximum-allocation-mb: 27648
yarn.scheduler.minimum-allocation-mb: 13824

Spark configurations (Spark 2.4.0)

SparkRDMA-3.1

DiSNI-1.7

yarn.executor.num: 3 (for this test I use 3 nodes, 1 executor per node)
yarn.executor.cores: 5
spark.executor.memory: 7g (I use only ~50% of available memory to prevent OOM errors, which are otherwise frequent)
spark.executor.memoryOverhead: 8g (as suggested here)
spark.driver.memory: 2g
spark.default.parallelism: 45
spark.sql.shuffle.partitions: 45
spark.shuffle.manager: org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.shuffle.compress: false (as suggested here)
spark.shuffle.spill.compress: false
spark.broadcast.compress: false
spark.locality.wait: 0

I run tests using HiBench. Dataset profile: huge (about 30 GB). Workload: TeraSort.

Some of Spark logs:

2020-11-24 12:00:02 INFO YarnClientSchedulerBackend:54 - Stopped
2020-11-24 12:00:02 INFO RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(0)
2020-11-24 12:00:02 INFO MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2020-11-24 12:00:02 INFO RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(2)
2020-11-24 12:00:02 INFO RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(1)
2020-11-24 12:00:02 INFO disni:257 - disconnect, id 0
2020-11-24 12:00:02 INFO disni:257 - disconnect, id 0
java.lang.NullPointerException
at org.apache.spark.shuffle.rdma.RdmaChannel.processRdmaCmEvent(RdmaChannel.java:345)
at org.apache.spark.shuffle.rdma.RdmaChannel.stop(RdmaChannel.java:894)
at org.apache.spark.shuffle.rdma.RdmaNode.lambda$new$0(RdmaNode.java:203)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "RdmaNode connection listening thread" java.lang.RuntimeException: Exception in RdmaNode listening thread java.lang.NullPointerException
at org.apache.spark.shuffle.rdma.RdmaNode.lambda$new$0(RdmaNode.java:210)
at java.lang.Thread.run(Thread.java:748)
2020-11-24 12:00:02 INFO disni:257 - disconnect, id 0
2020-11-24 12:00:02 INFO RdmaBufferManager:218 - Rdma buffers allocation statistics:
2020-11-24 12:00:02 INFO RdmaBufferManager:222 - Pre allocated 0, allocated 770 buffers of size 4 KB
2020-11-24 12:00:02 INFO disni:201 - deallocPd, pd 1
2020-11-24 12:00:02 INFO disni:274 - destroyCmId, id 0
2020-11-24 12:00:02 INFO disni:263 - destroyEventChannel, channel 0
2020-11-24 12:00:02 INFO MemoryStore:54 - MemoryStore cleared
2020-11-24 12:00:02 INFO BlockManager:54 - BlockManager stopped
2020-11-24 12:00:02 INFO BlockManagerMaster:54 - BlockManagerMaster stopped
2020-11-24 12:00:02 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2020-11-24 12:00:02 INFO SparkContext:54 - Successfully stopped SparkContext
2020-11-24 12:00:02 INFO ShutdownHookManager:54 - Shutdown hook called
2020-11-24 12:00:02 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-3a2b9379-56c3-4b40-ab40-e92f03a5c591
2020-11-24 12:00:02 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-9e3d837b-5d83-467f-9a12-591ffd0503a8

Yarn logs from one worker

2020-11-24 11:58:51 INFO Executor:54 - Finished task 40.0 in stage 2.0 (TID 355). 1502 bytes result sent to driver
2020-11-24 12:00:02 INFO CoarseGrainedExecutorBackend:54 - Driver commanded a shutdown
2020-11-24 12:00:02 INFO RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(1)
2020-11-24 12:00:02 INFO RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(2)
2020-11-24 12:00:02 INFO RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(0)
2020-11-24 12:00:02 INFO RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(3)
2020-11-24 12:00:02 INFO disni:257 - disconnect, id 0
2020-11-24 12:00:02 INFO disni:257 - disconnect, id 0
2020-11-24 12:00:02 INFO disni:285 - destroyQP, id 0
2020-11-24 12:00:02 INFO disni:214 - destroyCQ, cq 140630589705088
2020-11-24 12:00:02 INFO disni:274 - destroyCmId, id 0
2020-11-24 12:00:02 INFO disni:189 - destroyCompChannel, compChannel 0
2020-11-24 12:00:02 INFO disni:263 - destroyEventChannel, channel 0
2020-11-24 12:00:02 INFO disni:257 - disconnect, id 0
2020-11-24 12:00:02 INFO disni:285 - destroyQP, id 0
2020-11-24 12:00:02 INFO disni:214 - destroyCQ, cq 94745206395344
2020-11-24 12:00:02 INFO disni:274 - destroyCmId, id 0
2020-11-24 12:00:02 INFO disni:189 - destroyCompChannel, compChannel 0
2020-11-24 12:00:02 INFO disni:257 - disconnect, id 0
2020-11-24 12:00:02 INFO CoarseGrainedExecutorBackend:54 - Driver from hadoop-master:45735 disconnected during shutdown
2020-11-24 12:00:02 INFO CoarseGrainedExecutorBackend:54 - Driver from hadoop-master:45735 disconnected during shutdown
2020-11-24 12:00:02 INFO RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(4)
2020-11-24 12:00:02 ERROR RdmaNode:384 - Failed to stop RdmaChannel during 50 ms
2020-11-24 12:00:02 INFO disni:257 - disconnect, id 0
2020-11-24 12:00:02 INFO disni:285 - destroyQP, id 0
2020-11-24 12:00:02 WARN RdmaChannel:897 - Failed to get RDMA_CM_EVENT_DISCONNECTED: getCmEvent() failed
2020-11-24 12:00:02 INFO disni:285 - destroyQP, id 0
2020-11-24 12:00:02 INFO disni:214 - destroyCQ, cq 94745206905120
2020-11-24 12:00:02 INFO disni:274 - destroyCmId, id 0
2020-11-24 12:00:02 INFO disni:189 - destroyCompChannel, compChannel 0
2020-11-24 12:00:02 INFO disni:214 - destroyCQ, cq 94745206402048
2020-11-24 12:00:02 INFO disni:274 - destroyCmId, id 0
2020-11-24 12:00:02 INFO disni:189 - destroyCompChannel, compChannel 0
2020-11-24 12:00:02 INFO disni:263 - destroyEventChannel, channel 0
2020-11-24 12:00:02 WARN RdmaChannel:897 - Failed to get RDMA_CM_EVENT_DISCONNECTED: getCmEvent() failed
2020-11-24 12:00:02 INFO disni:285 - destroyQP, id 0
2020-11-24 12:00:02 INFO disni:214 - destroyCQ, cq 140630593575520
2020-11-24 12:00:02 INFO disni:274 - destroyCmId, id 0
2020-11-24 12:00:02 INFO disni:189 - destroyCompChannel, compChannel 0
2020-11-24 12:00:02 INFO disni:263 - destroyEventChannel, channel 0
2020-11-24 12:00:02 INFO RdmaNode:213 - Exiting RdmaNode Listening Server
2020-11-24 12:00:02 INFO RdmaBufferManager:218 - Rdma buffers allocation statistics:
2020-11-24 12:00:02 INFO RdmaBufferManager:222 - Pre allocated 0, allocated 380 buffers of size 4 KB
2020-11-24 12:00:02 INFO RdmaBufferManager:222 - Pre allocated 0, allocated 87 buffers of size 4096 KB
2020-11-24 12:00:02 INFO RdmaBufferManager:222 - Pre allocated 0, allocated 24 buffers of size 1024 KB
2020-11-24 12:00:02 INFO RdmaBufferManager:222 - Pre allocated 0, allocated 10 buffers of size 2048 KB
2020-11-24 12:00:02 INFO disni:201 - deallocPd, pd 1
2020-11-24 12:00:02 INFO disni:274 - destroyCmId, id 0
2020-11-24 12:00:02 INFO disni:263 - destroyEventChannel, channel 0
2020-11-24 12:00:02 INFO MemoryStore:54 - MemoryStore cleared
2020-11-24 12:00:02 INFO BlockManager:54 - BlockManager stopped
2020-11-24 12:00:02 INFO ShutdownHookManager:54 - Shutdown hook called

Also, I can see a very long shuffle read time in the Spark UI.


SparkRDMA, HiBench not able to run

Spark 2.2.0, Hadoop 2.7.
HiBench hits the issue below,
but the basic DiSNI RDMA benchmark is able to run.

Please take a look, thanks very much.

Stack: [0x00007fe4b42e3000,0x00007fe4b43e4000], sp=0x00007fe4b43e20c8, free space=1020k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
C [libc.so.6+0x147ce5] __memcpy_ssse3_back+0x45
C 0x00000000007e7ea8

Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
j com.ibm.disni.rdma.verbs.impl.NativeDispatcher._connect(JJ)I+0
j com.ibm.disni.rdma.verbs.impl.RdmaCmNat.connect(Lcom/ibm/disni/rdma/verbs/RdmaCmId;Lcom/ibm/disni/rdma/verbs/RdmaConnParam;)I+45
j com.ibm.disni.rdma.verbs.RdmaCmId.connect(Lcom/ibm/disni/rdma/verbs/RdmaConnParam;)I+6
j org.apache.spark.shuffle.rdma.RdmaChannel.connect(Ljava/net/InetSocketAddress;)V+207
j org.apache.spark.shuffle.rdma.RdmaNode.getRdmaChannel(Ljava/net/InetSocketAddress;Z)Lorg/apache/spark/shuffle/rdma/RdmaChannel;+148
j org.apache.spark.shuffle.rdma.RdmaShuffleManager.getRdmaChannel(Ljava/lang/String;IZ)Lorg/apache/spark/shuffle/rdma/RdmaChannel;+20
j org.apache.spark.shuffle.rdma.RdmaShuffleManager.getRdmaChannel(Lorg/apache/spark/shuffle/rdma/RdmaShuffleManagerId;Z)Lorg/apache/spark/shuffle/rdma/RdmaChannel;+10
j org.apache.spark.shuffle.rdma.RdmaShuffleManager$$anon$1$$anonfun$onSuccess$2.apply()Lorg/apache/spark/shuffle/rdma/RdmaChannel;+15
j org.apache.spark.shuffle.rdma.RdmaShuffleManager$$anon$1$$anonfun$onSuccess$2.apply()Ljava/lang/Object;+1
j scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1()Lscala/util/Try;+8
j scala.concurrent.impl.Future$PromiseCompletingRunnable.run()V+5
j scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec()Z+4
j scala.concurrent.forkjoin.ForkJoinTask.doExec()I+10
j scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(Lscala/concurrent/forkjoin/ForkJoinTask;)V+10
j scala.concurrent.forkjoin.ForkJoinPool.runWorker(Lscala/concurrent/forkjoin/ForkJoinPool$WorkQueue;)V+11
j scala.concurrent.forkjoin.ForkJoinWorkerThread.run()V+14
v ~StubRoutines::call_stub

Seeing ERROR RdmaNode: Failed in RdmaNode constructor in Standalone cluster mode

Hi,

I'm trying to submit a SparkPi example using SparkRDMA. I have the following in my spark-defaults.conf

spark.driver.extraClassPath   /path/to/spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar
spark.executor.extraClassPath /path/to/spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar
spark.shuffle.manager   org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.shuffle.rdma.driverPort 3037
spark.shuffle.rdma.executorPort 4037
spark.shuffle.rdma.portMaxRetries 100

but I see the following errors:

./bin/spark-submit --deploy-mode client --master spark://dgx03:7077 --class org.apache.spark.examples.SparkPi ./examples/target/scala-2.11/jars/spark-examples_2.11-2.4.0.jar
...
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/02/15 12:12:35 INFO SparkContext: Running Spark version 2.4.0
19/02/15 12:12:35 INFO SparkContext: Submitted application: Spark Pi
19/02/15 12:12:35 INFO SecurityManager: Changing view acls to: akven
19/02/15 12:12:35 INFO SecurityManager: Changing modify acls to: akven
19/02/15 12:12:35 INFO SecurityManager: Changing view acls groups to:
19/02/15 12:12:35 INFO SecurityManager: Changing modify acls groups to:
19/02/15 12:12:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with vie
w permissions: Set(akven); groups with view permissions: Set(); users  with modify permissions: Set(akven); groups with modify permissions: Set()
19/02/15 12:12:36 INFO Utils: Successfully started service 'sparkDriver' on port 36411.
19/02/15 12:12:36 INFO SparkEnv: Registering MapOutputTracker
19/02/15 12:12:36 INFO disni: creating  RdmaProvider of type 'nat'
19/02/15 12:12:36 INFO disni: jverbs jni version 32
19/02/15 12:12:36 INFO disni: sock_addr_in size mismatch, jverbs size 28, native size 16
19/02/15 12:12:36 INFO disni: IbvRecvWR size match, jverbs size 32, native size 32
19/02/15 12:12:36 INFO disni: IbvSendWR size mismatch, jverbs size 72, native size 128
19/02/15 12:12:36 INFO disni: IbvWC size match, jverbs size 48, native size 48
19/02/15 12:12:36 INFO disni: IbvSge size match, jverbs size 16, native size 16
19/02/15 12:12:36 INFO disni: Remote addr offset match, jverbs size 40, native size 40
19/02/15 12:12:36 INFO disni: Rkey offset match, jverbs size 48, native size 48
19/02/15 12:12:36 INFO disni: createEventChannel, objId 47303849051008
19/02/15 12:12:36 INFO disni: createId, id 47303849066960
19/02/15 12:12:36 INFO disni: bindAddr, address dgx03/A.B.C.D:3037
19/02/15 12:12:36 INFO RdmaNode: Failed to bind to port 3037 on iteration 0
19/02/15 12:12:36 INFO disni: bindAddr, address dgx03/A.B.C.D:3038
19/02/15 12:12:36 INFO RdmaNode: Failed to bind to port 3038 on iteration 1
...
19/02/15 12:12:36 INFO RdmaNode: Failed to bind to port 3052 on iteration 15
19/02/15 12:12:36 ERROR RdmaNode: Failed in RdmaNode constructor

^ This seems to be the main error

19/02/15 12:12:36 INFO disni: destroyCmId, id 0
19/02/15 12:12:36 INFO disni: destroyEventChannel, channel 0
19/02/15 12:12:36 ERROR SparkContext: Error initializing SparkContext.
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:264)
        at org.apache.spark.SparkEnv$.create(SparkEnv.scala:323)
        at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
        at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:257)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:424)
        at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:935)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:926)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
        at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:31)
        at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: Failed to bind. Make sure your NIC supports RDMA
        at org.apache.spark.shuffle.rdma.RdmaNode.<init>(RdmaNode.java:87)
        at org.apache.spark.shuffle.rdma.RdmaShuffleManager.<init>(RdmaShuffleManager.scala:137)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:264)
        at org.apache.spark.SparkEnv$.create(SparkEnv.scala:323)
        at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
        at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:257)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:424)
        at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:935)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:926)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
        at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:31)
        at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Any thoughts on what I need to change to fix this? Thanks in advance.

Fail to reproduce the speed-up of TeraSort with SparkRDMA

We have compared the performance of the SparkRDMA shuffle manager against pure TCP. The speed-up is 30% at most, not the 2.63x from the README.md.

Can the testers of this project provide more specific parameters to reproduce this performance? For example, how many partitions should we use, and should we restrict the executor memory or executor count?

libdisni resolves the hostname to another IP instead of the IP from RdmaNode

We have tried to run SparkRDMA on a YARN cluster. The job was submitted successfully and initialized the RDMA network. Here is the log.

2019-02-27 17:01:37 INFO  disni:42 - creating  RdmaProvider of type 'nat'
2019-02-27 17:01:37 INFO  disni:40 - jverbs jni version 32
2019-02-27 17:01:37 INFO  disni:46 - sock_addr_in size mismatch, jverbs size 28, native size 16
2019-02-27 17:01:37 INFO  disni:55 - IbvRecvWR size match, jverbs size 32, native size 32
2019-02-27 17:01:37 INFO  disni:58 - IbvSendWR size mismatch, jverbs size 72, native size 128
2019-02-27 17:01:37 INFO  disni:67 - IbvWC size match, jverbs size 48, native size 48
2019-02-27 17:01:37 INFO  disni:73 - IbvSge size match, jverbs size 16, native size 16
2019-02-27 17:01:37 INFO  disni:80 - Remote addr offset match, jverbs size 40, native size 40
2019-02-27 17:01:37 INFO  disni:86 - Rkey offset match, jverbs size 48, native size 48
2019-02-27 17:01:37 INFO  disni:61 - createEventChannel, objId 140472690913856
2019-02-27 17:01:37 INFO  disni:79 - createId, id 140472690927680
2019-02-27 17:01:37 INFO  disni:138 - bindAddr, address /192.168.1.4:0
2019-02-27 17:01:37 INFO  RdmaNode:223 - cpuList from configuration file: 
2019-02-27 17:01:37 INFO  RdmaNode:258 - Empty or failure parsing the cpuList. Defaulting to all available CPUs
2019-02-27 17:01:37 INFO  RdmaNode:274 - Using cpuList: [5, 15, 10, 22, 6, 20, 37, 7, 30, 36, 35, 2, 14, 32, 29, 25, 18, 21, 11, 33, 16, 31, 24, 38, 39, 19, 27, 34, 9, 13, 3, 17, 4, 12, 26, 28, 23, 8, 1, 0]
2019-02-27 17:01:37 INFO  disni:150 - listen, id 0
2019-02-27 17:01:37 INFO  disni:69 - allocPd, objId 140472691160992
2019-02-27 17:01:37 INFO  RdmaNode:116 - Starting RdmaNode Listening Server, listening on: /192.168.1.4:57810

If we run TeraSort, which uses shuffle, with the RDMA shuffle manager, the executor exits unexpectedly. Here is the error log.

2019-02-27 17:01:43 INFO  DAGScheduler:54 - Shuffle files lost for executor: 1 (epoch 1)
2019-02-27 17:01:43 INFO  YarnAllocator:54 - Completed container container_1547200321055_0731_01_000002 on host: m7-model-inf07 (state: COMPLETE, exit status: 134)
2019-02-27 17:01:43 WARN  YarnAllocator:66 - Container marked as failed: container_1547200321055_0731_01_000002 on host: m7-model-inf07. Exit status: 134. Diagnostics: Exception from container-launch.
Container id: container_1547200321055_0731_01_000002
Exit code: 134
Exception message: /bin/bash: line 1: 17944 Aborted                 LD_LIBRARY_PATH=./libdisni.so::/mnt/disk0/home/work/hadoop/lib/native:/mnt/disk0/home/work/hadoop/lib/native /home/work/jdk/bin/java -server -Xmx8192m -Djava.io.tmpdir=/mnt/disk1/nm-local/usercache/work/appcache/application_1547200321055_0731/container_1547200321055_0731_01_000002/tmp '-Dspark.ui.port=0' '-Dspark.driver.port=38467' -Dspark.yarn.app.container.log.dir=/mnt/disk0/home/work/hadoop/logs/userlogs/application_1547200321055_0731/container_1547200321055_0731_01_000002 -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@m7-model-inf03:38467 --executor-id 1 --hostname m7-model-inf07 --cores 4 --app-id application_1547200321055_0731 --user-class-path file:/mnt/disk1/nm-local/usercache/work/appcache/application_1547200321055_0731/container_1547200321055_0731_01_000002/__app__.jar > /mnt/disk0/home/work/hadoop/logs/userlogs/application_1547200321055_0731/container_1547200321055_0731_01_000002/stdout 2> /mnt/disk0/home/work/hadoop/logs/userlogs/application_1547200321055_0731/container_1547200321055_0731_01_000002/stderr

Stack trace: ExitCodeException exitCode=134: /bin/bash: line 1: 17944 Aborted                 LD_LIBRARY_PATH=./libdisni.so::/mnt/disk0/home/work/hadoop/lib/native:/mnt/disk0/home/work/hadoop/lib/native /home/work/jdk/bin/java -server -Xmx8192m -Djava.io.tmpdir=/mnt/disk1/nm-local/usercache/work/appcache/application_1547200321055_0731/container_1547200321055_0731_01_000002/tmp '-Dspark.ui.port=0' '-Dspark.driver.port=38467' -Dspark.yarn.app.container.log.dir=/mnt/disk0/home/work/hadoop/logs/userlogs/application_1547200321055_0731/container_1547200321055_0731_01_000002 -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@m7-model-inf03:38467 --executor-id 1 --hostname m7-model-inf07 --cores 4 --app-id application_1547200321055_0731 --user-class-path file:/mnt/disk1/nm-local/usercache/work/appcache/application_1547200321055_0731/container_1547200321055_0731_01_000002/__app__.jar > /mnt/disk0/home/work/hadoop/logs/userlogs/application_1547200321055_0731/container_1547200321055_0731_01_000002/stdout 2> /mnt/disk0/home/work/hadoop/logs/userlogs/application_1547200321055_0731/container_1547200321055_0731_01_000002/stderr

	at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
	at org.apache.hadoop.util.Shell.run(Shell.java:479)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
	at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
	at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 134

If we read the log of the YARN container, it shows that disni calls bindAddr with the actual RDMA IP (192.168.1.9) but calls resolveAddr with the wrong non-RoCE IP (172.27.128.154).

2019-02-27 15:47:37 INFO  disni:80 - Remote addr offset match, jverbs size 40, native size 40
2019-02-27 15:47:37 INFO  disni:86 - Rkey offset match, jverbs size 48, native size 48
2019-02-27 15:47:37 INFO  disni:61 - createEventChannel, objId 140534197564176
2019-02-27 15:47:37 INFO  disni:79 - createId, id 140534197563312
2019-02-27 15:47:37 INFO  disni:138 - bindAddr, address /192.168.1.9:0
2019-02-27 15:47:37 INFO  RdmaNode:223 - cpuList from configuration file:
2019-02-27 15:47:37 INFO  RdmaNode:258 - Empty or failure parsing the cpuList. Defaulting to all available CPUs
2019-02-27 15:47:37 INFO  RdmaNode:274 - Using cpuList: [18, 8, 22, 17, 31, 3, 5, 34, 24, 37, 20, 25, 13, 16, 28, 21, 36, 26, 9, 33, 10, 6, 14, 1, 27, 39, 4, 38, 15, 19, 7, 35, 32, 12, 30, 29, 2, 23, 11, 0]
2019-02-27 15:47:37 INFO  disni:150 - listen, id 0
2019-02-27 15:47:37 INFO  disni:69 - allocPd, objId 140534189036912
2019-02-27 15:47:37 INFO  RdmaNode:116 - Starting RdmaNode Listening Server, listening on: /192.168.1.9:38440
2019-02-27 15:47:37 INFO  disni:61 - createEventChannel, objId 19752576
2019-02-27 15:47:37 INFO  disni:79 - createId, id 19753632
2019-02-27 15:47:37 INFO  disni:167 - resolveAddr, addres m7-model-inf08/172.27.128.154:37652
2019-02-27 15:47:37 INFO  RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(0)
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007fd0649e197b, pid=33656, tid=0x00007fd06a9cf700
#
# JRE version: Java(TM) SE Runtime Environment (8.0_121-b13) (build 1.8.0_121-b13)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.121-b13 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# C  [librdmacm.so.1+0x597b]
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /mnt/disk3/nm-local/usercache/work/appcache/application_1547200321055_0724/container_1547200321055_0724_02_000006/hs_err_pid33656.log
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
# The crash happened outside the Java Virtual Machine in native code.
# See problematic frame for where to report the bug.
#
End of LogType:stdout

SparkRDMA issue:ERROR scheduler.TaskSetManager: Task 45 in stage 1.0 failed 4 times; aborting job

Hi
I attached the log for your reference. Please help me check it, thanks!
My servers are ARMv8 64-bit machines (Qualcomm ARM servers), so I downloaded MLNX_OFED_LINUX-4.4-2.0.7.0-rhel7.5alternate-aarch64.tgz, compiled and installed it, and used perftest to verify that the physical RDMA layer works.
I set up two nodes with Hadoop (hadoop-2.7.1) and Spark (spark-2.2.0-bin-hadoop2.7, standalone) and used the HiBench-7.0 TeraSort case to validate SparkRDMA. I first ran it in Spark-on-YARN mode (with Dynamic Resource Allocation), but it failed. Since you report running it successfully in standalone mode, I switched to standalone mode.
I built libdisni version 1.7 (git checkout tags/v1.7 -b v1.7) and configured SparkRDMA as below:
spark.driver.extraClassPath /home/xianlai/SparkRDMA/SparkRDMA-3.0/target/spark-rdma-3.0-for-spark-2.2.0-jar-with-dependencies.jar
spark.executor.extraClassPath /home/xianlai/SparkRDMA/SparkRDMA-3.0/target/spark-rdma-3.0-for-spark-2.2.0-jar-with-dependencies.jar
spark.driver.extraJavaOptions -Djava.library.path=/usr/local/lib/
spark.executor.extraJavaOptions -Djava.library.path=/usr/local/lib/
spark.shuffle.manager org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.shuffle.compress false
spark.shuffle.spill.compress false
spark.broadcast.compress false
spark.broadcast.checksum false
spark.locality.wait 0
The issue log is:
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 45.3 in stage 1.0 (TID 241) on 192.168.5.136, executor 3: java.lang.reflect.InvocationTargetException (null) [duplicate 175]
18/11/30 15:03:26 ERROR scheduler.TaskSetManager: Task 45 in stage 1.0 failed 4 times; aborting job
18/11/30 15:03:26 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1
18/11/30 15:03:26 INFO scheduler.TaskSchedulerImpl: Stage 1 was cancelled
18/11/30 15:03:26 INFO scheduler.DAGScheduler: ShuffleMapStage 1 (map at ScalaTeraSort.scala:49) failed in 5.189 s due to Job aborted due to stage failure: Task 45 in stage 1.0 failed 4 times, most recent failure: Lost task 45.3 in stage 1.0 (TID 241, 192.168.5.136, executor 3): java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:151)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:110)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.&lt;init&gt;(RdmaMappedFile.java:88)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleData.writeIndexFileAndCommit(RdmaWrapperShuffleWriter.scala:65)
at org.apache.spark.shuffle.rdma.RdmaShuffleBlockResolver.writeIndexFileAndCommit(RdmaShuffleBlockResolver.scala:64)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:224)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Invalid argument
at sun.nio.ch.FileChannelImpl.map0(Native Method)
... 18 more

Driver stacktrace:
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 19.3 in stage 1.0 (TID 242) on 192.168.5.136, executor 3: java.lang.reflect.InvocationTargetException (null) [duplicate 176]
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 17.0 in stage 1.0 (TID 63) on 192.168.5.136, executor 2: java.lang.reflect.InvocationTargetException (null) [duplicate 177]
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 8.2 in stage 1.0 (TID 236) on 192.168.5.136, executor 4: java.lang.reflect.InvocationTargetException (null) [duplicate 178]
18/11/30 15:03:26 INFO scheduler.DAGScheduler: Job 1 failed: runJob at SparkHadoopMapReduceWriter.scala:88, took 5.472082 s
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 89.0 in stage 1.0 (TID 135) on 192.168.5.136, executor 2: java.lang.reflect.InvocationTargetException (null) [duplicate 179]
18/11/30 15:03:26 ERROR io.SparkHadoopMapReduceWriter: Aborting job job_20181130150320_0006.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 45 in stage 1.0 failed 4 times, most recent failure: Lost task 45.3 in stage 1.0 (TID 241, 192.168.5.136, executor 3): java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:151)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:110)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.&lt;init&gt;(RdmaMappedFile.java:88)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleData.writeIndexFileAndCommit(RdmaWrapperShuffleWriter.scala:65)
at org.apache.spark.shuffle.rdma.RdmaShuffleBlockResolver.writeIndexFileAndCommit(RdmaShuffleBlockResolver.scala:64)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:224)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Invalid argument
at sun.nio.ch.FileChannelImpl.map0(Native Method)
... 18 more

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
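One likely first check here: the root `java.io.IOException: Invalid argument` comes from `sun.nio.ch.FileChannelImpl.map0`, which the trace shows being invoked reflectively from `RdmaMappedFile.mapAndRegister`, i.e. without the JDK's usual offset-alignment handling; `mmap` rejects offsets that are not multiples of the kernel page size, and many ARMv8 kernels use 64 KiB pages rather than the 4 KiB typical on x86_64. This is a hypothesis rather than a confirmed diagnosis, but it is cheap to test on the failing hosts:

```shell
# mmap(2) returns EINVAL ("Invalid argument") when the file offset is not a
# multiple of the kernel page size. Compare this value between hosts where
# SparkRDMA works (typically 4096 on x86_64) and the ARMv8 hosts where map0 fails.
getconf PAGESIZE
```

A result of 65536 on the ARM hosts would explain why mapping offsets that are valid on 4 KiB-page systems fail with EINVAL.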

spark rdma error

Hi,
I was trying to run the SparkRDMA TeraSort code. The plain Spark TeraSort finishes successfully, but the SparkRDMA TeraSort fails with the error below:
(screenshot attachment: error9)

I used Spark 2.1.0

Need memory overhead to run Spark RDMA shuffler

We use SparkRDMA by adding libdisni.so and spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar to the spark-submit command, but some tasks always fail with java.io.IOException: getCmEvent() failed. Here is the complete log:

java.io.IOException: getCmEvent() failed
	at org.apache.spark.shuffle.rdma.RdmaChannel.processRdmaCmEvent(RdmaChannel.java:348)
	at org.apache.spark.shuffle.rdma.RdmaChannel.connect(RdmaChannel.java:309)
	at org.apache.spark.shuffle.rdma.RdmaNode.getRdmaChannel(RdmaNode.java:308)
	at org.apache.spark.shuffle.rdma.RdmaShuffleManager.org$apache$spark$shuffle$rdma$RdmaShuffleManager$$getRdmaChannel(RdmaShuffleManager.scala:314)
	at org.apache.spark.shuffle.rdma.RdmaShuffleManager.getRdmaChannelToDriver(RdmaShuffleManager.scala:322)
	at org.apache.spark.shuffle.rdma.RdmaShuffleManager$$anon$7.apply(RdmaShuffleManager.scala:349)
	at org.apache.spark.shuffle.rdma.RdmaShuffleManager$$anon$7.apply(RdmaShuffleManager.scala:343)
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
	at org.apache.spark.shuffle.rdma.RdmaShuffleManager.getMapTaskOutputTable(RdmaShuffleManager.scala:342)
	at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator.startAsyncRemoteFetches(RdmaShuffleFetcherIterator.scala:185)
	at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator.initialize(RdmaShuffleFetcherIterator.scala:325)
	at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator.<init>(RdmaShuffleFetcherIterator.scala:85)
	at org.apache.spark.shuffle.rdma.RdmaShuffleReader.read(RdmaShuffleReader.scala:44)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:98)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

It seems that SparkRDMA requires extra memory, and the job may work after adding --conf spark.executor.memoryOverhead=10g. We are confused about this: how much memory overhead should we add for this kind of task?
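For reference, the headroom that RDMA-registered buffers consume outside the JVM heap is granted through the executor memory-overhead setting, which is what YARN enforces when it kills containers. A sketch of the relevant spark-submit flags follows; the 10g figure is simply the value quoted above and the paths are placeholders, so both must be adapted to the deployment (a rough starting point is to size the overhead to the per-executor shuffle volume and tune from there):

```shell
# Illustrative only: 10g matches the value quoted above; paths are placeholders.
spark-submit \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.rdma.RdmaShuffleManager \
  --conf spark.executor.extraClassPath=/path/to/spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar \
  --conf spark.executor.extraJavaOptions=-Djava.library.path=/path/to/libdisni-dir \
  --conf spark.executor.memoryOverhead=10g \
  ...
```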

Fail to write HDFS with custom codec when using SparkRDMA

We have used the SparkRDMA shuffle manager to replace the default shuffle manager, but it throws a MetadataFetchFailedException. Here is the error log:

19/05/22 10:23:31 INFO disni: createCompChannel, context 139795900960400
19/05/22 10:23:31 INFO disni: createCQ, objId 139794752991072, ncqe 1
19/05/22 10:23:31 INFO disni: createQP, objId 139796347838856, send_wr size 0, recv_wr_size 0
19/05/22 10:23:31 INFO disni: accept, id 0
19/05/22 10:23:31 INFO ProphetInstanceStatistics: create OutputStream for part 198, output file is :hdfs:///prophet/autoui/360leap-ptest/workspace/test/pws/1/FeatureExtract/455a1e51160e4798bf13b18f50ad7d09/36/c5bbce26-01b2-4731-af13-871bb635c751/0/1558491641346/signValueMap/part-198-_w0
19/05/22 10:23:31 ERROR FeSpark$$anon$1: Failed to process 198
19/05/22 10:23:31 ERROR FeSpark$$anon$1: processor failed
org.apache.spark.shuffle.MetadataFetchFailedException: RdmaShuffleNode: BlockManagerId(138, node3.leap.com, 36778, None) has no RDMA connection to BlockManagerId(36, node5.leap.com, 41465, None)
        at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:214)
        at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:203)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:203)
        at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:185)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
        at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
19/05/22 10:23:31 INFO RdmaNode: Established connection to /172.27.193.1:60672 in 983 ms
19/05/22 10:23:31 ERROR Executor: Exception in task 198.0 in stage 4.0 (TID 774)
java.lang.NullPointerException
        at org.apache.spark.rdd.RDD$$anonfun$map$1$$anonfun$apply$5.apply(RDD.scala:372)
        at org.apache.spark.rdd.RDD$$anonfun$map$1$$anonfun$apply$5.apply(RDD.scala:372)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
19/05/22 10:23:31 INFO CoarseGrainedExecutorBackend: Got assigned task 897
19/05/22 10:23:31 INFO Executor: Running task 321.0 in stage 4.0 (TID 897)
19/05/22 10:23:31 INFO Executor: Executor is trying to kill task 321.0 in stage 4.0 (TID 897), reason: Stage cancelled

It seems that the RDMA connection is not established, yet it works with other SparkSQL jobs. We are not sure whether this is a bug in SparkRDMA.

java.lang.NoClassDefFoundError: Could not initialize class com.ibm.disni.rdma.verbs.impl.NativeDispatcher

I'm seeing the error below when running Spark on 2 nodes (1 master and 2 workers). I'm not a frequent user of Java, but any thoughts on why I'd be seeing an initialization error here?

2019-02-27 22:37:36 WARN  SparkContext:66 - Using an existing SparkContext; some configuration may not take effect.
2019-02-27 22:38:10 WARN  TaskSetManager:66 - Lost task 11.0 in stage 0.0 (TID 11, 10.31.229.69, executor 0): java.lang.NoClassDefFoundError: Could not initialize class com.ibm.disni.rdma.verbs.impl.NativeDispatcher
        at com.ibm.disni.rdma.verbs.impl.RdmaProviderNat.<init>(RdmaProviderNat.java:43)
        at com.ibm.disni.rdma.verbs.RdmaProvider.provider(RdmaProvider.java:58)
        at com.ibm.disni.rdma.verbs.RdmaCm.open(RdmaCm.java:49)
        at com.ibm.disni.rdma.verbs.RdmaEventChannel.createEventChannel(RdmaEventChannel.java:66)
        at org.apache.spark.shuffle.rdma.RdmaNode.<init>(RdmaNode.java:64)
        at org.apache.spark.shuffle.rdma.RdmaShuffleManager.startRdmaNodeIfMissing(RdmaShuffleManager.scala:193)
        at org.apache.spark.shuffle.rdma.RdmaShuffleManager.getWriter(RdmaShuffleManager.scala:266)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:98)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

2019-02-27 22:38:10 WARN  TaskSetManager:66 - Lost task 5.0 in stage 0.0 (TID 5, 10.31.229.69, executor 0): java.lang.UnsatisfiedLinkError: no disni in java.library.path
        at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
        at java.lang.Runtime.loadLibrary0(Runtime.java:870)
        at java.lang.System.loadLibrary(System.java:1122)
        at com.ibm.disni.rdma.verbs.impl.NativeDispatcher.<clinit>(NativeDispatcher.java:36)
        at com.ibm.disni.rdma.verbs.impl.RdmaProviderNat.<init>(RdmaProviderNat.java:43)
        at com.ibm.disni.rdma.verbs.RdmaProvider.provider(RdmaProvider.java:58)
        at com.ibm.disni.rdma.verbs.RdmaCm.open(RdmaCm.java:49)
        at com.ibm.disni.rdma.verbs.RdmaEventChannel.createEventChannel(RdmaEventChannel.java:66)
        at org.apache.spark.shuffle.rdma.RdmaNode.<init>(RdmaNode.java:64)
        at org.apache.spark.shuffle.rdma.RdmaShuffleManager.startRdmaNodeIfMissing(RdmaShuffleManager.scala:193)
        at org.apache.spark.shuffle.rdma.RdmaShuffleManager.getWriter(RdmaShuffleManager.scala:266)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:98)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
        ...
        
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class com.ibm.disni.rdma.verbs.impl.NativeDispatcher
        at com.ibm.disni.rdma.verbs.impl.RdmaProviderNat.<init>(RdmaProviderNat.java:43)
        at com.ibm.disni.rdma.verbs.RdmaProvider.provider(RdmaProvider.java:58)
        at com.ibm.disni.rdma.verbs.RdmaCm.open(RdmaCm.java:49)
        at com.ibm.disni.rdma.verbs.RdmaEventChannel.createEventChannel(RdmaEventChannel.java:66)
        at org.apache.spark.shuffle.rdma.RdmaNode.<init>(RdmaNode.java:64)
        at org.apache.spark.shuffle.rdma.RdmaShuffleManager.startRdmaNodeIfMissing(RdmaShuffleManager.scala:193)
        at org.apache.spark.shuffle.rdma.RdmaShuffleManager.getWriter(RdmaShuffleManager.scala:266)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:98)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
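Both stack traces above reduce to the same root cause: `System.loadLibrary("disni")` cannot locate `libdisni.so` (`UnsatisfiedLinkError: no disni in java.library.path`), so the static initializer of `NativeDispatcher` fails once and every later use surfaces as `NoClassDefFoundError`. A minimal check, probing a few common install prefixes (assumptions, not paths taken from this report), is to confirm the library exists on every worker host:

```shell
# libdisni.so must live in a directory listed in -Djava.library.path (or
# LD_LIBRARY_PATH) on every executor host; probe a few common prefixes.
for d in /usr/local/lib /usr/lib /usr/lib64; do
  if [ -e "$d/libdisni.so" ]; then
    echo "found: $d/libdisni.so"
  fi
done
echo "search complete"
```

If nothing is found, install libdisni on each worker and pass its directory through spark.executor.extraJavaOptions (-Djava.library.path), as in the configurations quoted earlier in this thread.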

Error in accept call on a passive RdmaChannel

Hi,

I am currently evaluating this library and have not done any specific configuration of the InfiniBand network that the Spark nodes interconnect on. Can you point me in the right direction on what might be the cause of this issue? See config and stack trace below.

spark2-submit -v --num-executors 10 --executor-cores 5 --executor-memory 4G --conf spark.driver.extraClassPath=/opt/mellanox/spark-rdma-3.1.jar --conf spark.executor.extraClassPath=/opt/mellanox/spark-rdma-3.1.jar --conf spark.shuffle.manager=org.apache.spark.shuffle.rdma.RdmaShuffleManager --class com.github.ehiggs.spark.terasort.TeraSort /tmp/spark-terasort-1.1-SNAPSHOT-jar-with-dependencies.jar /tmp/data/terasort_in /tmp/data/terasort_out

Parsed arguments:
master yarn
deployMode client
executorMemory 4G
executorCores 5
totalExecutorCores null
propertiesFile /opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2/conf/spark-defaults.conf
driverMemory null
driverCores null
driverExtraClassPath /opt/mellanox/spark-rdma-3.1.jar
driverExtraLibraryPath /opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/lib/hadoop/lib/native
driverExtraJavaOptions null
supervise false
queue null
numExecutors 10
files null
pyFiles null
archives null
mainClass com.github.ehiggs.spark.terasort.TeraSort
primaryResource file:/tmp/spark-terasort-1.1-SNAPSHOT-jar-with-dependencies.jar
name com.github.ehiggs.spark.terasort.TeraSort
childArgs [/tmp/data/terasort_in /tmp/data/terasort_out]
jars null
packages null
packagesExclusions null
repositories null

(spark.shuffle.manager,org.apache.spark.shuffle.rdma.RdmaShuffleManager)
(spark.executor.extraLibraryPath,/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/lib/hadoop/lib/native)
(spark.authenticate,false)
(spark.yarn.jars,local:/opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2/jars/)
(spark.driver.extraLibraryPath,/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/lib/hadoop/lib/native)
(spark.yarn.historyServer.address,


(spark.yarn.am.extraLibraryPath,/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/lib/hadoop/lib/native)
(spark.eventLog.enabled,true)
(spark.dynamicAllocation.schedulerBacklogTimeout,1)
(spark.yarn.config.gatewayPath,/opt/cloudera/parcels)
(spark.ui.killEnabled,true)
(spark.dynamicAllocation.maxExecutors,148)
(spark.serializer,org.apache.spark.serializer.KryoSerializer)
(spark.shuffle.service.enabled,true)
(spark.hadoop.yarn.application.classpath,)
(spark.dynamicAllocation.minExecutors,0)
(spark.dynamicAllocation.executorIdleTimeout,60)
(spark.yarn.config.replacementPath,{{HADOOP_COMMON_HOME}}/../../..)
(spark.sql.hive.metastore.version,1.1.0)
(spark.submit.deployMode,client)
(spark.shuffle.service.port,7337)
(spark.executor.extraClassPath,/opt/mellanox/spark-rdma-3.1.jar)
(spark.hadoop.mapreduce.application.classpath,)
(spark.eventLog.dir,
(spark.master,yarn)
(spark.dynamicAllocation.enabled,true)
(spark.sql.catalogImplementation,hive)
(spark.sql.hive.metastore.jars,${env:HADOOP_COMMON_HOME}/../hive/lib/*:${env:HADOOP_COMMON_HOME}/client/*)
(spark.driver.extraClassPath,/opt/mellanox/spark-rdma-3.1.jar)

19/05/02 13:54:17 WARN spark.SparkContext: Using an existing SparkContext; some configuration may not take effect.
[Stage 0:> (0 + 17) / 45]19/05/02 13:54:18 ERROR rdma.RdmaNode: Error in accept call on a passive RdmaChannel: java.io.IOException: createCQ() failed
java.lang.NullPointerException
at org.apache.spark.shuffle.rdma.RdmaChannel.processRdmaCmEvent(RdmaChannel.java:345)
at org.apache.spark.shuffle.rdma.RdmaChannel.stop(RdmaChannel.java:894)
at org.apache.spark.shuffle.rdma.RdmaNode.lambda$new$0(RdmaNode.java:176)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "RdmaNode connection listening thread" java.lang.RuntimeException: Exception in RdmaNode listening thread java.lang.NullPointerException
at org.apache.spark.shuffle.rdma.RdmaNode.lambda$new$0(RdmaNode.java:210)
at java.lang.Thread.run(Thread.java:748)
19/05/02 13:54:20 WARN scheduler.TaskSetManager: Lost task 9.0 in stage 0.0 (TID 3, , executor 3): java.lang.ArithmeticException: / by zero
at org.apache.spark.shuffle.rdma.RdmaNode.getNextCpuVector(RdmaNode.java:278)
at org.apache.spark.shuffle.rdma.RdmaNode.getRdmaChannel(RdmaNode.java:301)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager.org$apache$spark$shuffle$rdma$RdmaShuffleManager$$getRdmaChannel(RdmaShuffleManager.scala:314)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager.getRdmaChannelToDriver(RdmaShuffleManager.scala:322)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager.publishMapTaskOutput(RdmaShuffleManager.scala:410)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.stop(RdmaWrapperShuffleWriter.scala:118)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:97)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

[SparkRDMA] set up SPARK_LOCAL_IP for RoCE network

Hello,

I am testing SparkRDMA with a Mellanox ConnectX-4 Lx card. I installed Spark 2.2.0 and downloaded the spark-terasort sample code. The TeraSort sample runs successfully on plain Spark 2.2.0; however, when I run it with the SparkRDMA plugin, it throws the error shown in the screenshot below.
(screenshot: errors1)
Do I need to upgrade libibverbs.so, or do I need to configure the RDMA network for Spark? Please help.
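If the goal is simply to bind SparkRDMA to the RoCE interface, a `spark-env.sh` sketch is below; the address is a placeholder for the IP actually configured on each node's RoCE interface.

```shell
# spark-env.sh on every node; 192.168.100.11 is a placeholder for the
# IP address assigned to this node's RoCE/InfiniBand interface
export SPARK_LOCAL_IP=192.168.100.11
```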

Add libdisni.so to the wiki instructions for testing SparkRDMA performance

We have found an issue with the wiki page https://github.com/Mellanox/SparkRDMA/wiki/Running-HiBench-with-SparkRDMA that should be easy to fix.

In the section "Experiment #1: TeraSort", the configuration needs to include not only spark-rdma-3.1-for-spark-SPARK_VERSION-jar-with-dependencies.jar but also libdisni.so.

Add to HiBench/conf/spark.conf:
spark.driver.extraClassPath /PATH/TO/spark-rdma-3.1-for-spark-SPARK_VERSION-jar-with-dependencies.jar
spark.executor.extraClassPath /PATH/TO/spark-rdma-3.1-for-spark-SPARK_VERSION-jar-with-dependencies.jar
spark.shuffle.manager org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.shuffle.compress false
spark.shuffle.spill.compress false
spark.broadcast.compress false
spark.broadcast.checksum false
spark.locality.wait 0

It should be changed to this.

Add to HiBench/conf/spark.conf:
spark.driver.extraLibraryPath /PATH/TO/libdisni.so
spark.executor.extraLibraryPath /PATH/TO/libdisni.so
spark.driver.extraClassPath /PATH/TO/spark-rdma-3.1-for-spark-SPARK_VERSION-jar-with-dependencies.jar
spark.executor.extraClassPath /PATH/TO/spark-rdma-3.1-for-spark-SPARK_VERSION-jar-with-dependencies.jar
spark.shuffle.manager org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.shuffle.compress false
spark.shuffle.spill.compress false
spark.broadcast.compress false
spark.broadcast.checksum false
spark.locality.wait 0

SparkRDMA: IBV_WC_WR_FLUSH_ERR

When I run Spark on YARN
Spark 2.1.0
Hadoop 2.7.3

My Spark job normally runs correctly, but when the data is large, the RdmaShuffleManager hits an error; please check the attachment!

Spark rdma conf:
spark.driver.extraClassPath /home/bigdata/local/spark-rdma-3.1-for-spark-2.1.0-jar-with-dependencies.jar
spark.executor.extraClassPath /home/bigdata/local/spark-rdma-3.1-for-spark-2.1.0-jar-with-dependencies.jar
spark.shuffle.manager org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.shuffle.compress false
spark.shuffle.spill.compress false
spark.broadcast.compress false
spark.broadcast.checksum false
spark.locality.wait 0
spark.executor.extraLibraryPath /home/bigdata/local/rdma
spark.driver.extraLibraryPath /home/bigdata/local/rdma

logs:

INFO DAGScheduler: Executor lost: 2 (epoch 29)
19/02/18 10:18:10 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
19/02/18 10:18:10 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(2, jx-bd-hadoop523, 35995, None)
19/02/18 10:18:10 INFO BlockManagerMaster: Removed 2 successfully in removeExecutor
19/02/18 10:18:10 INFO DAGScheduler: Shuffle files lost for executor: 2 (epoch 29)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 5 is now unavailable on executor 2 (0/1, false)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 9 is now unavailable on executor 2 (0/1000, false)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 2 is now unavailable on executor 2 (0/1000, false)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 4 is now unavailable on executor 2 (0/2, false)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 1 is now unavailable on executor 2 (0/500, false)
19/02/18 10:18:12 WARN TransportChannelHandler: Exception in connection from /10.200.20.213:45426
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
19/02/18 10:18:12 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 2.
19/02/18 10:18:12 ERROR YarnScheduler: Lost an executor 2 (already removed): Pending loss reason.
19/02/18 10:18:12 INFO BlockManagerMaster: Removal of executor 2 requested
19/02/18 10:18:12 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
19/02/18 10:18:12 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asked to remove non-existent executor 2

19/02/18 10:17:42 INFO MemoryStore: Block broadcast_28 stored as values in memory (estimated size 39.9 KB, free 7.0 GB)
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://[email protected]:41323)
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Got the output locations
19/02/18 10:17:42 INFO RdmaShuffleManager: Getting MapTaskOutput table for shuffleId: 0 of size 8192 from driver
19/02/18 10:17:42 INFO RdmaShuffleManager: RDMA read mapTaskOutput table for shuffleId: 0 took 4 ms
19/02/18 10:17:42 INFO CodeGenerator: Code generated in 19.000389 ms
19/02/18 10:17:42 INFO CodeGenerator: Code generated in 13.038783 ms
19/02/18 10:17:42 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
19/02/18 10:17:42 INFO CodeGenerator: Code generated in 10.63309 ms
19/02/18 10:17:42 INFO DiskBlockManager: Shutdown hook called
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 12, fetching them
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://[email protected]:41323)
19/02/18 10:17:42 INFO ShutdownHookManager: Shutdown hook called
19/02/18 10:17:42 ERROR Executor: Exception in task 14.0 in stage 9.0 (TID 1518)
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:868)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:266)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
19/02/18 10:17:42 ERROR Executor: Exception in task 12.0 in stage 9.0 (TID 1517)
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:868)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:266)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
19/02/18 10:17:42 ERROR Executor: Exception in task 8.0 in stage 9.0 (TID 1513)
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:868)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:266)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Got the output locations
19/02/18 10:17:42 INFO RdmaShuffleManager: Getting MapTaskOutput table for shuffleId: 12 of size 16384 from driver
19/02/18 10:17:42 INFO RdmaShuffleManager: RDMA read mapTaskOutput table for shuffleId: 12 took 0 ms
19/02/18 10:17:42 INFO CoarseGrainedExecutorBackend: Got assigned task 1520
19/02/18 10:17:42 INFO Executor: Running task 1.0 in stage 3.0 (TID 1520)
19/02/18 10:17:42 INFO CoarseGrainedExecutorBackend: Got assigned task 1521
19/02/18 10:17:42 INFO Executor: Running task 2.0 in stage 3.0 (TID 1521)
19/02/18 10:17:42 INFO CoarseGrainedExecutorBackend: Got assigned task 1522
19/02/18 10:17:42 INFO Executor: Running task 3.0 in stage 3.0 (TID 1522)
19/02/18 10:17:42 INFO CodeGenerator: Code generated in 23.898935 ms

19/02/18 10:44:51 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on jx-bd-hadoop528:41859 (size: 3.8 KB, free: 7.3 GB)
19/02/18 10:44:51 INFO disni: createCompChannel, context 140379227583536
19/02/18 10:44:51 INFO disni: createCQ, objId 140369536012256, ncqe 4352
19/02/18 10:44:51 INFO disni: createQP, objId 140369536014024, send_wr size 4096, recv_wr_size 256
19/02/18 10:44:51 INFO disni: accept, id 0
19/02/18 10:44:51 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.200.20.218:45240
19/02/18 10:44:52 WARN TaskSetManager: Lost task 0.0 in stage 5.2 (TID 8, jx-bd-hadoop528 executor 37): FetchFailed(null, shuffleId=0, mapId=-1, reduceId=0, message=
org.apache.spark.shuffle.MetadataFetchFailedException: RdmaShuffleNode: BlockManagerId(37, jx-bd-hadoop528, 41859, None) has no RDMA connection to BlockManagerId(21, jx-bd-hadoop528, 37576, None)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:214)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:203)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:203)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:185)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

)
19/02/18 10:44:52 INFO TaskSetManager: Task 0.0 in stage 5.2 (TID 8) failed, but another instance of the task has already succeeded, so not re-queuing the task to be re-executed.
19/02/18 10:44:52 INFO DAGScheduler: Marking ResultStage 5 (processCmd at CliDriver.java:376) as failed due to a fetch failure from ShuffleMapStage 4 (processCmd at CliDriver.java:376)
19/02/18 10:44:52 INFO YarnScheduler: Removed TaskSet 5.2, whose tasks have all completed, from pool
19/02/18 10:44:52 INFO DAGScheduler: ResultStage 5 (processCmd at CliDriver.java:376) failed in 1.026 s due to org.apache.spark.shuffle.MetadataFetchFailedException: RdmaShuffleNode: BlockManagerId(37, jx-bd-hadoop528, 41859, None) has no RDMA connection to BlockManagerId(21, jx-bd-hadoop528.zeus, 37576, None)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:214)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:203)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:203)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:185)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

INFO TaskSetManager: Task 134.0 in stage 13.0 (TID 2417) failed, but another instance of the task has already succeeded, so not re-queuing the task to be re-executed.
WARN TaskSetManager: Lost task 33.0 in stage 13.0 (TID 2333, jx-bd-hadoop526, executor 26): FetchFailed(BlockManagerId(48, jx-bd-hadoop530, 42158, None),
shuffleId=14, mapId=0, reduceId=33, message=
org.apache.spark.shuffle.FetchFailedException: RDMA Send/Write/Read WR completed with error: IBV_WC_WR_FLUSH_ERR

ClassNotFoundException: org.apache.spark.shuffle.rdma.RdmaShuffleManager

The spark-defaults.conf on both the workers and the master is:

spark.shuffle.manager org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.driver.extraClassPath /home/rui/data/spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar
spark.executor.extraClassPath /home/rui/data/spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar

and I have placed libdisni.so in /usr/lib.
When I run TeraGen (built from spark-terasort), it works with --master spark://master:7077 --deploy-mode client; the full command is:

spark-submit  --master spark://master:7077 --deploy-mode client --class com.github.ehiggs.spark.terasort.TeraGen  /home/rui/software/spark-2.4.0-bin-hadoop2.7/spark-terasort/target/spark-terasort-1.1-SNAPSHOT-jar-with-dependencies.jar  1g hdfs://master:9000/data/terasort_in1g

but it fails with a ClassNotFoundException when using --master spark://master:7077 --deploy-mode cluster; the full command is:

spark-submit  --master spark://master:7077 --deploy-mode cluster --class com.github.ehiggs.spark.terasort.TeraGen  /home/rui/software/spark-2.4.0-bin-hadoop2.7/spark-terasort/target/spark-terasort-1.1-SNAPSHOT-jar-with-dependencies.jar  1g hdfs://master:9000/data/terasort_in1g

The error output is:

Launch Command: "/home/rui/software/jdk1.8.0_212/bin/java" "-cp" "/home/rui/software/spark-2.4.0-bin-hadoop2.7/conf/:/home/rui/software/spark-2.4.0-bin-hadoop2.7/jars/*" "-Xmx1024M" "-Dspark.executor.extraClassPath=/home/rui/data/spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar" "-Dspark.driver.supervise=false" "-Dspark.submit.deployMode=cluster" "-Dspark.master=spark://master:7077" "-Dspark.driver.extraClassPath=/home/rui/data/spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar" "-Dspark.jars=file:/home/rui/software/spark-2.4.0-bin-hadoop2.7/spark-terasort/target/spark-terasort-1.1-SNAPSHOT-jar-with-dependencies.jar" "-Dspark.rpc.askTimeout=10s" "-Dspark.app.name=com.github.ehiggs.spark.terasort.TeraGen" "-Dspark.shuffle.manager=org.apache.spark.shuffle.rdma.RdmaShuffleManager" "org.apache.spark.deploy.worker.DriverWrapper" "spark://[email protected]:43489" "/home/rui/software/spark-2.4.0-bin-hadoop2.7/work/driver-20190930101138-0006/spark-terasort-1.1-SNAPSHOT-jar-with-dependencies.jar" "com.github.ehiggs.spark.terasort.TeraGen" "5g" "hdfs://master:9000/data/terasort_in5g2"
========================================

Exception in thread "main" java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
	at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.shuffle.rdma.RdmaShuffleManager
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
	at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:259)
	at org.apache.spark.SparkEnv$.create(SparkEnv.scala:323)
	at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
	at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:257)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:424)
	at com.github.ehiggs.spark.terasort.TeraGen$.main(TeraGen.scala:48)
	at com.github.ehiggs.spark.terasort.TeraGen.main(TeraGen.scala)
	... 6 more

How should I fix this?
Additionally, in client deploy mode: when I generate 50GB of data with TeraGen on plain Spark (no spark-defaults.conf), the transfer speed between master and slave is about 270MB/s. When I change spark-defaults.conf to set spark.shuffle.manager to org.apache.spark.shuffle.rdma.RdmaShuffleManager, the speed is still 270MB/s.
Is this because the job writes to HDFS and does not involve a Spark shuffle?
Can you recommend a workload that would show a significant completion-time improvement with SparkRDMA?
Thanks a lot!
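One possible workaround (an assumption based on how standalone cluster mode launches the driver, not a confirmed fix for this setup) is to make the plugin jar reachable by the driver JVM, either by installing it into `$SPARK_HOME/jars/` on every node or by shipping it with `--jars`:

```shell
# Sketch: paths reuse the ones from the report above
RDMA_JAR=/home/rui/data/spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar

# Option 1: install the plugin into Spark's own jars directory on all nodes
# cp "$RDMA_JAR" "$SPARK_HOME/jars/"

# Option 2: ship the jar with the application so it reaches the driver node
spark-submit \
  --master spark://master:7077 \
  --deploy-mode cluster \
  --jars "$RDMA_JAR" \
  --class com.github.ehiggs.spark.terasort.TeraGen \
  /home/rui/software/spark-2.4.0-bin-hadoop2.7/spark-terasort/target/spark-terasort-1.1-SNAPSHOT-jar-with-dependencies.jar \
  1g hdfs://master:9000/data/terasort_in1g
```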

Failed to set up RoCE IP for Spark in yarn-cluster mode

We have added the RDMA shuffle manager from SparkRDMA to our Spark applications and submit the jobs in yarn-cluster mode.

Just like #5, we got the error "Fail to bind the port". We checked the IP being used, and it is not the expected one for the RoCE NIC, but there is no way to set this IP on the Spark workers.

We have tried adding SPARK_LOCAL_IP in spark-env.sh, but it only works in client mode, while we are using yarn-cluster mode. I'm not sure whether setting the server's hostname or the YARN NodeManager address to the RoCE NIC IP would work.

Stream is corrupted during shuffle read with RDMA Shuffle Manager

We have set up an RDMA environment and run Spark jobs with the RDMA Shuffle Manager. Here is the spark-submit command used to submit the job.

${SPARK_HOME}/bin/spark-submit \
        --executor-memory 7200m \
        --master yarn \
        --num-executors 40 \
        --files /root/spark-benchmark/tools/SparkRDMA/libdisni.so,/root/spark-benchmark/tools/SparkRDMA/spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar \
        --executor-cores 1 \
        --deploy-mode cluster \
        --driver-memory 4g \
        --conf spark.eventLog.dir=hdfs:///spark_benchmark/event_log/2.3.0 \
        --conf spark.yarn.jars=hdfs:///spark_benchmark/dependency/spark230_jars/*.jar \
        --conf spark.shuffle.spill.compress=false \
        --conf spark.executor.extraClassPath=spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar \
        --conf spark.shuffle.manager=org.apache.spark.shuffle.rdma.RdmaShuffleManager \
        --conf spark.driver.extraLibraryPath=./ \
        --conf spark.hadoop.yarn.timeline-service.enabled=false \
        --conf spark.eventLog.enabled=true \
        --conf spark.driver.extraClassPath=spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar \
        --conf spark.executor.extraLibraryPath=./ \
        --conf spark.executor.memoryOverhead=128g \
        --conf spark.shuffle.compress=true \
        --conf spark.sql.shuffle.partitions=340 \
        --conf spark.task.maxFailures=1 \
        --conf spark.yarn.maxAppAttempts=1 \
        --class com.example.SparkApp

This command works most of the time, but occasionally a task fails (the failed task is re-scheduled and the job eventually completes). Here is the error log of a failed task: since we set spark.shuffle.compress=true, the default Lz4Codec compressor throws a "Stream is corrupted" exception.

org.apache.spark.SparkException: Task failed while writing rows
	at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
	at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:202)
	at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.<init>(UnsafeRowSerializer.scala:120)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110)
	at org.apache.spark.shuffle.rdma.RdmaShuffleReader$$anonfun$4.apply(RdmaShuffleReader.scala:68)
	at org.apache.spark.shuffle.rdma.RdmaShuffleReader$$anonfun$4.apply(RdmaShuffleReader.scala:64)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:124)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:123)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
	... 8 more

If we instead set spark.shuffle.compress=false, the error is thrown by readFully while trying to deserialize the stream into row objects.

Job aborted due to stage failure: Task 3 in stage 4.0 failed 1 times, most recent failure: Lost task 3.0 in stage 4.0 (TID 223, lnode6.leap.com, executor 1): org.apache.spark.SparkException: Task failed while writing rows
	at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException: reached end of stream after reading 723048 bytes; 825504310 bytes expected
	at org.spark_project.guava.io.ByteStreams.readFully(ByteStreams.java:735)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
	at scala.collection.Iterator$$anon$12.next(Iterator.scala:444)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:125)
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:123)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
	... 8 more

The SparkApp itself is simple: it reads data from HDFS, performs some transformations, and calls saveAsTextFile to write the results back to HDFS. The failed task always occurs in the final stage.
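For reference, the job follows a pattern similar to the hedged sketch below. The paths, application name, and the word-count transformation are illustrative assumptions, not the actual application code; the point is only that a shuffle-inducing transformation precedes the final saveAsTextFile stage, which is where the failure surfaces.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical minimal sketch of the failing job pattern (names and paths
// are placeholders, not the real application code).
object SparkApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkApp").getOrCreate()
    val sc = spark.sparkContext

    // Read input text from HDFS (placeholder path).
    val input = sc.textFile("hdfs:///spark_benchmark/input")

    // A shuffle-inducing transformation (here: word count), so the final
    // stage performs a shuffle read through the RdmaShuffleManager.
    val counts = input
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)

    // The reported failure always occurs in this final stage,
    // while the result tasks are writing rows.
    counts.saveAsTextFile("hdfs:///spark_benchmark/output")

    spark.stop()
  }
}
```

Running this requires a Spark cluster with the RDMA shuffle plugin on the classpath, as in the spark-submit command above.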

Note that this issue may not be easy to reproduce. In our environment, however, we use a custom codec to save the RDD, which may raise the probability of hitting it. The codec cannot access or break the stream before the RDMA Shuffle Manager performs the shuffle read, so we still have no clue about the root cause of this issue.
