mellanox / sparkrdma
This is an archive of the SparkRDMA project. The new repository with RDMA shuffle acceleration for Apache Spark is here: https://github.com/Nvidia/sparkucx
License: Apache License 2.0
We have tried to run SparkRDMA on a YARN cluster. The job was submitted successfully and the RDMA network was initialized. Here is the log:
2019-02-27 17:01:37 INFO disni:42 - creating RdmaProvider of type 'nat'
2019-02-27 17:01:37 INFO disni:40 - jverbs jni version 32
2019-02-27 17:01:37 INFO disni:46 - sock_addr_in size mismatch, jverbs size 28, native size 16
2019-02-27 17:01:37 INFO disni:55 - IbvRecvWR size match, jverbs size 32, native size 32
2019-02-27 17:01:37 INFO disni:58 - IbvSendWR size mismatch, jverbs size 72, native size 128
2019-02-27 17:01:37 INFO disni:67 - IbvWC size match, jverbs size 48, native size 48
2019-02-27 17:01:37 INFO disni:73 - IbvSge size match, jverbs size 16, native size 16
2019-02-27 17:01:37 INFO disni:80 - Remote addr offset match, jverbs size 40, native size 40
2019-02-27 17:01:37 INFO disni:86 - Rkey offset match, jverbs size 48, native size 48
2019-02-27 17:01:37 INFO disni:61 - createEventChannel, objId 140472690913856
2019-02-27 17:01:37 INFO disni:79 - createId, id 140472690927680
2019-02-27 17:01:37 INFO disni:138 - bindAddr, address /192.168.1.4:0
2019-02-27 17:01:37 INFO RdmaNode:223 - cpuList from configuration file:
2019-02-27 17:01:37 INFO RdmaNode:258 - Empty or failure parsing the cpuList. Defaulting to all available CPUs
2019-02-27 17:01:37 INFO RdmaNode:274 - Using cpuList: [5, 15, 10, 22, 6, 20, 37, 7, 30, 36, 35, 2, 14, 32, 29, 25, 18, 21, 11, 33, 16, 31, 24, 38, 39, 19, 27, 34, 9, 13, 3, 17, 4, 12, 26, 28, 23, 8, 1, 0]
2019-02-27 17:01:37 INFO disni:150 - listen, id 0
2019-02-27 17:01:37 INFO disni:69 - allocPd, objId 140472691160992
2019-02-27 17:01:37 INFO RdmaNode:116 - Starting RdmaNode Listening Server, listening on: /192.168.1.4:57810
If we run TeraSort, which uses shuffle, with the RDMA shuffle manager, the executor exits unexpectedly. Here is the error log:
2019-02-27 17:01:43 INFO DAGScheduler:54 - Shuffle files lost for executor: 1 (epoch 1)
2019-02-27 17:01:43 INFO YarnAllocator:54 - Completed container container_1547200321055_0731_01_000002 on host: m7-model-inf07 (state: COMPLETE, exit status: 134)
2019-02-27 17:01:43 WARN YarnAllocator:66 - Container marked as failed: container_1547200321055_0731_01_000002 on host: m7-model-inf07. Exit status: 134. Diagnostics: Exception from container-launch.
Container id: container_1547200321055_0731_01_000002
Exit code: 134
Exception message: /bin/bash: line 1: 17944 Aborted LD_LIBRARY_PATH=./libdisni.so::/mnt/disk0/home/work/hadoop/lib/native:/mnt/disk0/home/work/hadoop/lib/native /home/work/jdk/bin/java -server -Xmx8192m -Djava.io.tmpdir=/mnt/disk1/nm-local/usercache/work/appcache/application_1547200321055_0731/container_1547200321055_0731_01_000002/tmp '-Dspark.ui.port=0' '-Dspark.driver.port=38467' -Dspark.yarn.app.container.log.dir=/mnt/disk0/home/work/hadoop/logs/userlogs/application_1547200321055_0731/container_1547200321055_0731_01_000002 -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@m7-model-inf03:38467 --executor-id 1 --hostname m7-model-inf07 --cores 4 --app-id application_1547200321055_0731 --user-class-path file:/mnt/disk1/nm-local/usercache/work/appcache/application_1547200321055_0731/container_1547200321055_0731_01_000002/__app__.jar > /mnt/disk0/home/work/hadoop/logs/userlogs/application_1547200321055_0731/container_1547200321055_0731_01_000002/stdout 2> /mnt/disk0/home/work/hadoop/logs/userlogs/application_1547200321055_0731/container_1547200321055_0731_01_000002/stderr
Stack trace: ExitCodeException exitCode=134: /bin/bash: line 1: 17944 Aborted LD_LIBRARY_PATH=./libdisni.so::/mnt/disk0/home/work/hadoop/lib/native:/mnt/disk0/home/work/hadoop/lib/native /home/work/jdk/bin/java -server -Xmx8192m -Djava.io.tmpdir=/mnt/disk1/nm-local/usercache/work/appcache/application_1547200321055_0731/container_1547200321055_0731_01_000002/tmp '-Dspark.ui.port=0' '-Dspark.driver.port=38467' -Dspark.yarn.app.container.log.dir=/mnt/disk0/home/work/hadoop/logs/userlogs/application_1547200321055_0731/container_1547200321055_0731_01_000002 -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@m7-model-inf03:38467 --executor-id 1 --hostname m7-model-inf07 --cores 4 --app-id application_1547200321055_0731 --user-class-path file:/mnt/disk1/nm-local/usercache/work/appcache/application_1547200321055_0731/container_1547200321055_0731_01_000002/__app__.jar > /mnt/disk0/home/work/hadoop/logs/userlogs/application_1547200321055_0731/container_1547200321055_0731_01_000002/stdout 2> /mnt/disk0/home/work/hadoop/logs/userlogs/application_1547200321055_0731/container_1547200321055_0731_01_000002/stderr
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Container exited with a non-zero exit code 134
Reading the YARN container log, we see that DiSNI calls bindAddr with the actual RDMA IP (192.168.1.9) but calls resolveAddr with the wrong, non-RoCE IP (172.27.128.154).
2019-02-27 15:47:37 INFO disni:80 - Remote addr offset match, jverbs size 40, native size 40
2019-02-27 15:47:37 INFO disni:86 - Rkey offset match, jverbs size 48, native size 48
2019-02-27 15:47:37 INFO disni:61 - createEventChannel, objId 140534197564176
2019-02-27 15:47:37 INFO disni:79 - createId, id 140534197563312
2019-02-27 15:47:37 INFO disni:138 - bindAddr, address /192.168.1.9:0
2019-02-27 15:47:37 INFO RdmaNode:223 - cpuList from configuration file:
2019-02-27 15:47:37 INFO RdmaNode:258 - Empty or failure parsing the cpuList. Defaulting to all available CPUs
2019-02-27 15:47:37 INFO RdmaNode:274 - Using cpuList: [18, 8, 22, 17, 31, 3, 5, 34, 24, 37, 20, 25, 13, 16, 28, 21, 36, 26, 9, 33, 10, 6, 14, 1, 27, 39, 4, 38, 15, 19, 7, 35, 32, 12, 30, 29, 2, 23, 11, 0]
2019-02-27 15:47:37 INFO disni:150 - listen, id 0
2019-02-27 15:47:37 INFO disni:69 - allocPd, objId 140534189036912
2019-02-27 15:47:37 INFO RdmaNode:116 - Starting RdmaNode Listening Server, listening on: /192.168.1.9:38440
2019-02-27 15:47:37 INFO disni:61 - createEventChannel, objId 19752576
2019-02-27 15:47:37 INFO disni:79 - createId, id 19753632
2019-02-27 15:47:37 INFO disni:167 - resolveAddr, addres m7-model-inf08/172.27.128.154:37652
2019-02-27 15:47:37 INFO RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(0)
#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x00007fd0649e197b, pid=33656, tid=0x00007fd06a9cf700
#
# JRE version: Java(TM) SE Runtime Environment (8.0_121-b13) (build 1.8.0_121-b13)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.121-b13 mixed mode linux-amd64 compressed oops)
# Problematic frame:
# C [librdmacm.so.1+0x597b]
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /mnt/disk3/nm-local/usercache/work/appcache/application_1547200321055_0724/container_1547200321055_0724_02_000006/hs_err_pid33656.log
#
# If you would like to submit a bug report, please visit:
# http://bugreport.java.com/bugreport/crash.jsp
# The crash happened outside the Java Virtual Machine in native code.
# See problematic frame for where to report the bug.
#
End of LogType:stdout
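The crash in librdmacm follows directly from resolveAddr being handed the hostname's default address (172.27.128.154) rather than an address on the RoCE network. A quick way to confirm the misresolution is to check which addresses a peer hostname resolves to and whether any of them fall in the RDMA subnet. The sketch below is illustrative (the helper name is mine, and the 192.168.1.0/24 subnet is inferred from the bindAddr lines above; adjust it to your fabric):

```python
import ipaddress
import socket

def rdma_addrs(hostname, rdma_cidr="192.168.1.0/24"):
    """Resolve hostname; return (in_subnet, out_of_subnet) IPv4 address lists.

    If every resolved address lands in out_of_subnet, rdma_cm resolves the
    connection over the non-RoCE interface, matching the resolveAddr line in
    the log above.
    """
    net = ipaddress.ip_network(rdma_cidr)
    addrs = {info[4][0] for info in socket.getaddrinfo(hostname, None, socket.AF_INET)}
    inside = sorted(a for a in addrs if ipaddress.ip_address(a) in net)
    outside = sorted(a for a in addrs if ipaddress.ip_address(a) not in net)
    return inside, outside
```

If `rdma_addrs("m7-model-inf08")` returns nothing inside the subnet, fixing /etc/hosts or DNS so executor hostnames resolve to their RoCE addresses is the usual remedy.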
Hi,
I was trying to run the TeraSort app on SparkRDMA. The TeraSort code is forked from https://github.com/ehiggs/spark-terasort
I can run the TeraSort datagen code successfully. However, the program fails when it uses the RDMA utilities.
I checked the SparkRDMA install guide, but found no clue.
Hi,
I was profiling the spark-terasort jobs with SparkRDMA. The spark-terasort code is downloaded from here: https://github.com/fengshenwu/spark-terasort
The above code runs successfully with spark-2.2.0 on 4 compute nodes. However, when using the SparkRDMA plugin, the job seems able to complete, but it hangs at the last stage and never releases the RDMA buffers. Please look at the following pictures:
The spark-defaults.conf on both workers and the master is:
spark.shuffle.manager org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.driver.extraClassPath /home/rui/data/spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar
spark.executor.extraClassPath /home/rui/data/spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar
and I have placed libdisni.so in /usr/lib.
When I run TeraGen built from spark-terasort, it works with --master spark://master:7077 --deploy-mode client; the full command is:
spark-submit --master spark://master:7077 --deploy-mode client --class com.github.ehiggs.spark.terasort.TeraGen /home/rui/software/spark-2.4.0-bin-hadoop2.7/spark-terasort/target/spark-terasort-1.1-SNAPSHOT-jar-with-dependencies.jar 1g hdfs://master:9000/data/terasort_in1g
but it fails with a ClassNotFoundException when using --master spark://master:7077 --deploy-mode cluster; the full command is:
spark-submit --master spark://master:7077 --deploy-mode cluster --class com.github.ehiggs.spark.terasort.TeraGen /home/rui/software/spark-2.4.0-bin-hadoop2.7/spark-terasort/target/spark-terasort-1.1-SNAPSHOT-jar-with-dependencies.jar 1g hdfs://master:9000/data/terasort_in1g
The error info is:
Launch Command: "/home/rui/software/jdk1.8.0_212/bin/java" "-cp" "/home/rui/software/spark-2.4.0-bin-hadoop2.7/conf/:/home/rui/software/spark-2.4.0-bin-hadoop2.7/jars/*" "-Xmx1024M" "-Dspark.executor.extraClassPath=/home/rui/data/spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar" "-Dspark.driver.supervise=false" "-Dspark.submit.deployMode=cluster" "-Dspark.master=spark://master:7077" "-Dspark.driver.extraClassPath=/home/rui/data/spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar" "-Dspark.jars=file:/home/rui/software/spark-2.4.0-bin-hadoop2.7/spark-terasort/target/spark-terasort-1.1-SNAPSHOT-jar-with-dependencies.jar" "-Dspark.rpc.askTimeout=10s" "-Dspark.app.name=com.github.ehiggs.spark.terasort.TeraGen" "-Dspark.shuffle.manager=org.apache.spark.shuffle.rdma.RdmaShuffleManager" "org.apache.spark.deploy.worker.DriverWrapper" "spark://[email protected]:43489" "/home/rui/software/spark-2.4.0-bin-hadoop2.7/work/driver-20190930101138-0006/spark-terasort-1.1-SNAPSHOT-jar-with-dependencies.jar" "com.github.ehiggs.spark.terasort.TeraGen" "5g" "hdfs://master:9000/data/terasort_in5g2"
========================================
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.shuffle.rdma.RdmaShuffleManager
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:259)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:323)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:257)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:424)
at com.github.ehiggs.spark.terasort.TeraGen$.main(TeraGen.scala:48)
at com.github.ehiggs.spark.terasort.TeraGen.main(TeraGen.scala)
... 6 more
How should I fix it?
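One likely explanation (an inference from the launch command above, not a confirmed diagnosis): in standalone cluster mode the driver JVM is launched by DriverWrapper on one of the workers, so spark.driver.extraClassPath is resolved against that worker's filesystem. If the SparkRDMA jar exists only on the submitting machine, the driver cannot load RdmaShuffleManager. The helper below (its name is mine) mimics that lookup so each node can be checked:

```python
import os

def missing_classpath_entries(extra_class_path):
    """Return the entries of a spark.{driver,executor}.extraClassPath value
    that do not exist on this machine.

    Run this on every worker: in --deploy-mode cluster the driver JVM starts
    on a worker, so the jar must be present at the same absolute path there.
    """
    entries = [p for p in extra_class_path.split(os.pathsep) if p]
    return [p for p in entries if not os.path.exists(p)]
```

Copying the plugin jar to the same path on every node (or shipping it with --jars) is the usual fix when this returns a non-empty list on any worker.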
What's more, in client deploy mode, when I generate 50GB of data using TeraGen with raw Spark and no spark-defaults.conf, the transfer speed between master and slave is about 270MB/s. However, when I change my spark-defaults.conf and set spark.shuffle.manager to org.apache.spark.shuffle.rdma.RdmaShuffleManager, the speed is still 270MB/s.
Is this because I used HDFS storage, which has nothing to do with Spark shuffle?
Can you recommend a workload that would significantly improve completion time when using SparkRDMA?
Thanks a lot !
Hi
I attached the log for your reference. Please help me check it, thanks!
Because my servers are ARMv8 64-bit servers, I downloaded MLNX_OFED_LINUX-4.4-2.0.7.0-rhel7.5alternate-aarch64.tgz, compiled and installed it, and used perftest to verify that the physical RDMA link is OK.
I have set up two nodes for Hadoop (hadoop-2.7.1) and Spark (spark-2.2.0-bin-hadoop2.7, standalone) on ARMv8 64-bit servers (Qualcomm ARM servers), and used the HiBench-7.0 terasort case to validate the SparkRDMA function. First I ran it in Spark-on-YARN mode (Dynamic Resource Allocation), but it failed. Since you report it runs successfully in standalone mode, I switched to Spark standalone mode.
I built libdisni version 1.7 (git checkout tags/v1.7 -b v1.7) and configured SparkRDMA as below:
spark.driver.extraClassPath /home/xianlai/SparkRDMA/SparkRDMA-3.0/target/spark-rdma-3.0-for-spark-2.2.0-jar-with-dependencies.jar
spark.executor.extraClassPath /home/xianlai/SparkRDMA/SparkRDMA-3.0/target/spark-rdma-3.0-for-spark-2.2.0-jar-with-dependencies.jar
spark.driver.extraJavaOptions -Djava.library.path=/usr/local/lib/
spark.executor.extraJavaOptions -Djava.library.path=/usr/local/lib/
spark.shuffle.manager org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.shuffle.compress false
spark.shuffle.spill.compress false
spark.broadcast.compress false
spark.broadcast.checksum false
spark.locality.wait 0
The issue is:
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 45.3 in stage 1.0 (TID 241) on 192.168.5.136, executor 3: java.lang.reflect.InvocationTargetException (null) [duplicate 175]
18/11/30 15:03:26 ERROR scheduler.TaskSetManager: Task 45 in stage 1.0 failed 4 times; aborting job
18/11/30 15:03:26 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1
18/11/30 15:03:26 INFO scheduler.TaskSchedulerImpl: Stage 1 was cancelled
18/11/30 15:03:26 INFO scheduler.DAGScheduler: ShuffleMapStage 1 (map at ScalaTeraSort.scala:49) failed in 5.189 s due to Job aborted due to stage failure: Task 45 in stage 1.0 failed 4 times, most recent failure: Lost task 45.3 in stage 1.0 (TID 241, 192.168.5.136, executor 3): java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:151)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:110)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.&lt;init&gt;(RdmaMappedFile.java:88)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleData.writeIndexFileAndCommit(RdmaWrapperShuffleWriter.scala:65)
at org.apache.spark.shuffle.rdma.RdmaShuffleBlockResolver.writeIndexFileAndCommit(RdmaShuffleBlockResolver.scala:64)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:224)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Invalid argument
at sun.nio.ch.FileChannelImpl.map0(Native Method)
... 18 more
Driver stacktrace:
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 19.3 in stage 1.0 (TID 242) on 192.168.5.136, executor 3: java.lang.reflect.InvocationTargetException (null) [duplicate 176]
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 17.0 in stage 1.0 (TID 63) on 192.168.5.136, executor 2: java.lang.reflect.InvocationTargetException (null) [duplicate 177]
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 8.2 in stage 1.0 (TID 236) on 192.168.5.136, executor 4: java.lang.reflect.InvocationTargetException (null) [duplicate 178]
18/11/30 15:03:26 INFO scheduler.DAGScheduler: Job 1 failed: runJob at SparkHadoopMapReduceWriter.scala:88, took 5.472082 s
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 89.0 in stage 1.0 (TID 135) on 192.168.5.136, executor 2: java.lang.reflect.InvocationTargetException (null) [duplicate 179]
18/11/30 15:03:26 ERROR io.SparkHadoopMapReduceWriter: Aborting job job_20181130150320_0006.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 45 in stage 1.0 failed 4 times, most recent failure: Lost task 45.3 in stage 1.0 (TID 241, 192.168.5.136, executor 3): java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:151)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:110)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.&lt;init&gt;(RdmaMappedFile.java:88)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleData.writeIndexFileAndCommit(RdmaWrapperShuffleWriter.scala:65)
at org.apache.spark.shuffle.rdma.RdmaShuffleBlockResolver.writeIndexFileAndCommit(RdmaShuffleBlockResolver.scala:64)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:224)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Invalid argument
at sun.nio.ch.FileChannelImpl.map0(Native Method)
... 18 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
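A possible reading of the root cause, `java.io.IOException: Invalid argument` from `FileChannelImpl.map0`: the kernel rejects mmap offsets that are not multiples of the page size, and many ARMv8 kernels use 64 KiB pages, so offset alignment that is valid on x86 (4 KiB) fails there with EINVAL. This is an interpretation of the trace, not a confirmed diagnosis. A minimal sketch of page-size-safe mapping:

```python
import mmap

def aligned_map(f, offset, length):
    """mmap `length` bytes of file object `f` starting at `offset`.

    The kernel requires the mapping offset to be a multiple of the page size
    (mmap.ALLOCATIONGRANULARITY: 4096 on typical x86 Linux, often 65536 on
    ARMv8). Rounding down and compensating with `delta` avoids the EINVAL
    that surfaces in Java as "IOException: Invalid argument".
    """
    page = mmap.ALLOCATIONGRANULARITY
    aligned = (offset // page) * page   # round down to a page boundary
    delta = offset - aligned            # bytes of padding before the data
    m = mmap.mmap(f.fileno(), length + delta, offset=aligned,
                  access=mmap.ACCESS_READ)
    return m, delta  # the requested data starts at m[delta:]
```

Code that hard-codes a 4 KiB alignment instead of querying the system page size would work on x86 and fail exactly this way on a 64 KiB-page aarch64 kernel.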
I'm seeing the error below when running a Spark job on 2 nodes (1 master and 2 workers). I'm not a frequent user of Java, but any thoughts on why I'd be seeing an initialization error here?
2019-02-27 22:37:36 WARN SparkContext:66 - Using an existing SparkContext; some configuration may not take effect.
2019-02-27 22:38:10 WARN TaskSetManager:66 - Lost task 11.0 in stage 0.0 (TID 11, 10.31.229.69, executor 0): java.lang.NoClassDefFoundError: Could not initialize class com.ibm.disni.rdma.verbs.impl.NativeDispatcher
at com.ibm.disni.rdma.verbs.impl.RdmaProviderNat.<init>(RdmaProviderNat.java:43)
at com.ibm.disni.rdma.verbs.RdmaProvider.provider(RdmaProvider.java:58)
at com.ibm.disni.rdma.verbs.RdmaCm.open(RdmaCm.java:49)
at com.ibm.disni.rdma.verbs.RdmaEventChannel.createEventChannel(RdmaEventChannel.java:66)
at org.apache.spark.shuffle.rdma.RdmaNode.<init>(RdmaNode.java:64)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager.startRdmaNodeIfMissing(RdmaShuffleManager.scala:193)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager.getWriter(RdmaShuffleManager.scala:266)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:98)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-02-27 22:38:10 WARN TaskSetManager:66 - Lost task 5.0 in stage 0.0 (TID 5, 10.31.229.69, executor 0): java.lang.UnsatisfiedLinkError: no disni in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at com.ibm.disni.rdma.verbs.impl.NativeDispatcher.<clinit>(NativeDispatcher.java:36)
at com.ibm.disni.rdma.verbs.impl.RdmaProviderNat.<init>(RdmaProviderNat.java:43)
at com.ibm.disni.rdma.verbs.RdmaProvider.provider(RdmaProvider.java:58)
at com.ibm.disni.rdma.verbs.RdmaCm.open(RdmaCm.java:49)
at com.ibm.disni.rdma.verbs.RdmaEventChannel.createEventChannel(RdmaEventChannel.java:66)
at org.apache.spark.shuffle.rdma.RdmaNode.<init>(RdmaNode.java:64)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager.startRdmaNodeIfMissing(RdmaShuffleManager.scala:193)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager.getWriter(RdmaShuffleManager.scala:266)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:98)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
...
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class com.ibm.disni.rdma.verbs.impl.NativeDispatcher
at com.ibm.disni.rdma.verbs.impl.RdmaProviderNat.<init>(RdmaProviderNat.java:43)
at com.ibm.disni.rdma.verbs.RdmaProvider.provider(RdmaProvider.java:58)
at com.ibm.disni.rdma.verbs.RdmaCm.open(RdmaCm.java:49)
at com.ibm.disni.rdma.verbs.RdmaEventChannel.createEventChannel(RdmaEventChannel.java:66)
at org.apache.spark.shuffle.rdma.RdmaNode.<init>(RdmaNode.java:64)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager.startRdmaNodeIfMissing(RdmaShuffleManager.scala:193)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager.getWriter(RdmaShuffleManager.scala:266)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:98)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
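The two errors above are one problem: `UnsatisfiedLinkError: no disni in java.library.path` means the executor JVM could not find libdisni.so, and the `NoClassDefFoundError` for NativeDispatcher is the same static initializer failing again on a later task. A hedged sketch (the helper name is mine) that mimics the JVM's native-library lookup, so each directory intended for `-Djava.library.path` can be verified on the executor hosts:

```python
import os

def locate_native_lib(name, search_dirs):
    """Mimic System.loadLibrary: look for lib<name>.so in each directory,
    in order, returning the first hit or None.

    If this returns None for "disni" over the directories passed via
    -Djava.library.path (or LD_LIBRARY_PATH) on an executor host, that
    executor fails with "no disni in java.library.path", as in the trace.
    """
    for d in search_dirs:
        candidate = os.path.join(d, "lib{}.so".format(name))
        if os.path.isfile(candidate):
            return candidate
    return None
```

Note that the library is loaded inside each executor JVM, so `spark.executor.extraJavaOptions -Djava.library.path=...` must point at a directory containing libdisni.so on every worker, not just on the driver machine.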
Hi
I attached the log for your reference. Please help me to check it, thansks!
Because my servers are ARMv8 64bit server, so I have downloaded the MLNX_OFED_LINUX-4.4-2.0.7.0-rhel7.5alternate-aarch64.tgz and have compiled it and installed it, and used perftest to test it
and make sure the physical RDMA is ok.
I have setup two nodes for hadoop(hadoop-2.7.1) and spark(spark-2.2.0-bin-hadoop2.7 with standalone) on ARMv8 64bit server(Qualcomm ARM server), and used HiBench-7.0 terasort case to validate the SparkRDMA function. Firstly I run it with Spark on yarn mode(Dynamic Resource Allocation), but it failed. And I find you can run it successfully on spark with Standalone mode. So I change to spark Standalone mode.
I building libdisni version 1.7.(git checkout tags/v1.7 -b v1.7), and configure SparkRDMA as below:
spark.driver.extraClassPath /home/xianlai/SparkRDMA/SparkRDMA-3.0/target/spark-rdma-3.0-for-spark-2.2.0-jar-with-dependencies.jar
spark.executor.extraClassPath /home/xianlai/SparkRDMA/SparkRDMA-3.0/target/spark-rdma-3.0-for-spark-2.2.0-jar-with-dependencies.jar
spark.driver.extraJavaOptions -Djava.library.path=/usr/local/lib/
spark.executor.extraJavaOptions -Djava.library.path=/usr/local/lib/
spark.shuffle.manager org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.shuffle.compress false
spark.shuffle.spill.compress false
spark.broadcast.compress false
spark.broadcast.checksum false
spark.locality.wait 0
The issue log is:
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 45.3 in stage 1.0 (TID 241) on 192.168.5.136, executor 3: java.lang.reflect.InvocationTargetException (null) [duplicate 175]
18/11/30 15:03:26 ERROR scheduler.TaskSetManager: Task 45 in stage 1.0 failed 4 times; aborting job
18/11/30 15:03:26 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1
18/11/30 15:03:26 INFO scheduler.TaskSchedulerImpl: Stage 1 was cancelled
18/11/30 15:03:26 INFO scheduler.DAGScheduler: ShuffleMapStage 1 (map at ScalaTeraSort.scala:49) failed in 5.189 s due to Job aborted due to stage failure: Task 45 in stage 1.0 failed 4 times, most recent failure: Lost task 45.3 in stage 1.0 (TID 241, 192.168.5.136, executor 3): java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:151)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:110)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.(RdmaMappedFile.java:88)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleData.writeIndexFileAndCommit(RdmaWrapperShuffleWriter.scala:65)
at org.apache.spark.shuffle.rdma.RdmaShuffleBlockResolver.writeIndexFileAndCommit(RdmaShuffleBlockResolver.scala:64)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:224)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Invalid argument
at sun.nio.ch.FileChannelImpl.map0(Native Method)
... 18 more
Driver stacktrace:
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 19.3 in stage 1.0 (TID 242) on 192.168.5.136, executor 3: java.lang.reflect.InvocationTargetException (null) [duplicate 176]
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 17.0 in stage 1.0 (TID 63) on 192.168.5.136, executor 2: java.lang.reflect.InvocationTargetException (null) [duplicate 177]
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 8.2 in stage 1.0 (TID 236) on 192.168.5.136, executor 4: java.lang.reflect.InvocationTargetException (null) [duplicate 178]
18/11/30 15:03:26 INFO scheduler.DAGScheduler: Job 1 failed: runJob at SparkHadoopMapReduceWriter.scala:88, took 5.472082 s
18/11/30 15:03:26 INFO scheduler.TaskSetManager: Lost task 89.0 in stage 1.0 (TID 135) on 192.168.5.136, executor 2: java.lang.reflect.InvocationTargetException (null) [duplicate 179]
18/11/30 15:03:26 ERROR io.SparkHadoopMapReduceWriter: Aborting job job_20181130150320_0006.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 45 in stage 1.0 failed 4 times, most recent failure: Lost task 45.3 in stage 1.0 (TID 241, 192.168.5.136, executor 3): java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:151)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.mapAndRegister(RdmaMappedFile.java:110)
at org.apache.spark.shuffle.rdma.RdmaMappedFile.&lt;init&gt;(RdmaMappedFile.java:88)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleData.writeIndexFileAndCommit(RdmaWrapperShuffleWriter.scala:65)
at org.apache.spark.shuffle.rdma.RdmaShuffleBlockResolver.writeIndexFileAndCommit(RdmaShuffleBlockResolver.scala:64)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:224)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Invalid argument
at sun.nio.ch.FileChannelImpl.map0(Native Method)
... 18 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
When I run Spark on YARN:
Spark 2.1.0
Hadoop 2.7.3
My Spark job normally runs correctly, but when the data is large, the RdmaShuffleManager fails with the error above. Please check the attached logs below.
Spark rdma conf:
spark.driver.extraClassPath /home/bigdata/local/spark-rdma-3.1-for-spark-2.1.0-jar-with-dependencies.jar
spark.executor.extraClassPath /home/bigdata/local/spark-rdma-3.1-for-spark-2.1.0-jar-with-dependencies.jar
spark.shuffle.manager org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.shuffle.compress false
spark.shuffle.spill.compress false
spark.broadcast.compress false
spark.broadcast.checksum false
spark.locality.wait 0
spark.executor.extraLibraryPath /home/bigdata/local/rdma
spark.driver.extraLibraryPath /home/bigdata/local/rdma
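For reference, a minimal pre-submit sanity check over a spark-defaults style conf file. This is only a sketch: the file path and the conf contents below are illustrative stand-ins for the real `/home/bigdata` configuration, and the list of checked keys is our own choice.

```shell
# Hedged sketch: verify the RDMA-specific settings are present before
# submitting. mktemp and a here-doc stand in for the real conf file.
conf=$(mktemp)
cat > "$conf" <<'EOF'
spark.shuffle.manager org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.shuffle.compress false
spark.shuffle.spill.compress false
spark.broadcast.compress false
EOF

missing=0
for key in spark.shuffle.manager spark.shuffle.compress \
           spark.shuffle.spill.compress spark.broadcast.compress; do
  grep -q "^${key} " "$conf" || { echo "missing: ${key}"; missing=1; }
done
[ "$missing" -eq 0 ] && echo "conf ok"
rm -f "$conf"
```

A misspelled key here does not fail fast; Spark just ignores it and falls back to default behavior at runtime, which is why a check like this can save a debugging round-trip.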
logs:
INFO DAGScheduler: Executor lost: 2 (epoch 29)
19/02/18 10:18:10 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
19/02/18 10:18:10 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(2, jx-bd-hadoop523, 35995, None)
19/02/18 10:18:10 INFO BlockManagerMaster: Removed 2 successfully in removeExecutor
19/02/18 10:18:10 INFO DAGScheduler: Shuffle files lost for executor: 2 (epoch 29)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 5 is now unavailable on executor 2 (0/1, false)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 9 is now unavailable on executor 2 (0/1000, false)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 2 is now unavailable on executor 2 (0/1000, false)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 4 is now unavailable on executor 2 (0/2, false)
19/02/18 10:18:10 INFO ShuffleMapStage: ShuffleMapStage 1 is now unavailable on executor 2 (0/500, false)
19/02/18 10:18:12 WARN TransportChannelHandler: Exception in connection from /10.200.20.213:45426
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:275)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
19/02/18 10:18:12 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 2.
19/02/18 10:18:12 ERROR YarnScheduler: Lost an executor 2 (already removed): Pending loss reason.
19/02/18 10:18:12 INFO BlockManagerMaster: Removal of executor 2 requested
19/02/18 10:18:12 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
19/02/18 10:18:12 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asked to remove non-existent executor 2
19/02/18 10:17:42 INFO MemoryStore: Block broadcast_28 stored as values in memory (estimated size 39.9 KB, free 7.0 GB)
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://[email protected]:41323)
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Got the output locations
19/02/18 10:17:42 INFO RdmaShuffleManager: Getting MapTaskOutput table for shuffleId: 0 of size 8192 from driver
19/02/18 10:17:42 INFO RdmaShuffleManager: RDMA read mapTaskOutput table for shuffleId: 0 took 4 ms
19/02/18 10:17:42 INFO CodeGenerator: Code generated in 19.000389 ms
19/02/18 10:17:42 INFO CodeGenerator: Code generated in 13.038783 ms
19/02/18 10:17:42 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
19/02/18 10:17:42 INFO CodeGenerator: Code generated in 10.63309 ms
19/02/18 10:17:42 INFO DiskBlockManager: Shutdown hook called
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 12, fetching them
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://[email protected]:41323)
19/02/18 10:17:42 INFO ShutdownHookManager: Shutdown hook called
19/02/18 10:17:42 ERROR Executor: Exception in task 14.0 in stage 9.0 (TID 1518)
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:868)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:266)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
19/02/18 10:17:42 ERROR Executor: Exception in task 12.0 in stage 9.0 (TID 1517)
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:868)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:266)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
19/02/18 10:17:42 ERROR Executor: Exception in task 8.0 in stage 9.0 (TID 1513)
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:868)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:934)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:248)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:266)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:211)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.write(RdmaWrapperShuffleWriter.scala:102)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
19/02/18 10:17:42 INFO MapOutputTrackerWorker: Got the output locations
19/02/18 10:17:42 INFO RdmaShuffleManager: Getting MapTaskOutput table for shuffleId: 12 of size 16384 from driver
19/02/18 10:17:42 INFO RdmaShuffleManager: RDMA read mapTaskOutput table for shuffleId: 12 took 0 ms
19/02/18 10:17:42 INFO CoarseGrainedExecutorBackend: Got assigned task 1520
19/02/18 10:17:42 INFO Executor: Running task 1.0 in stage 3.0 (TID 1520)
19/02/18 10:17:42 INFO CoarseGrainedExecutorBackend: Got assigned task 1521
19/02/18 10:17:42 INFO Executor: Running task 2.0 in stage 3.0 (TID 1521)
19/02/18 10:17:42 INFO CoarseGrainedExecutorBackend: Got assigned task 1522
19/02/18 10:17:42 INFO Executor: Running task 3.0 in stage 3.0 (TID 1522)
19/02/18 10:17:42 INFO CodeGenerator: Code generated in 23.898935 ms
19/02/18 10:44:51 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on jx-bd-hadoop528:41859 (size: 3.8 KB, free: 7.3 GB)
19/02/18 10:44:51 INFO disni: createCompChannel, context 140379227583536
19/02/18 10:44:51 INFO disni: createCQ, objId 140369536012256, ncqe 4352
19/02/18 10:44:51 INFO disni: createQP, objId 140369536014024, send_wr size 4096, recv_wr_size 256
19/02/18 10:44:51 INFO disni: accept, id 0
19/02/18 10:44:51 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 10.200.20.218:45240
19/02/18 10:44:52 WARN TaskSetManager: Lost task 0.0 in stage 5.2 (TID 8, jx-bd-hadoop528 executor 37): FetchFailed(null, shuffleId=0, mapId=-1, reduceId=0, message=
org.apache.spark.shuffle.MetadataFetchFailedException: RdmaShuffleNode: BlockManagerId(37, jx-bd-hadoop528, 41859, None) has no RDMA connection to BlockManagerId(21, jx-bd-hadoop528, 37576, None)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:214)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:203)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:203)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:185)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
)
19/02/18 10:44:52 INFO TaskSetManager: Task 0.0 in stage 5.2 (TID 8) failed, but another instance of the task has already succeeded, so not re-queuing the task to be re-executed.
19/02/18 10:44:52 INFO DAGScheduler: Marking ResultStage 5 (processCmd at CliDriver.java:376) as failed due to a fetch failure from ShuffleMapStage 4 (processCmd at CliDriver.java:376)
19/02/18 10:44:52 INFO YarnScheduler: Removed TaskSet 5.2, whose tasks have all completed, from pool
19/02/18 10:44:52 INFO DAGScheduler: ResultStage 5 (processCmd at CliDriver.java:376) failed in 1.026 s due to org.apache.spark.shuffle.MetadataFetchFailedException: RdmaShuffleNode: BlockManagerId(37, jx-bd-hadoop528, 41859, None) has no RDMA connection to BlockManagerId(21, jx-bd-hadoop528.zeus, 37576, None)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:214)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:203)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:203)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:185)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
INFO TaskSetManager: Task 134.0 in stage 13.0 (TID 2417) failed, but another instance of the task has already succeeded, so not re-queuing the task to be re-executed.
WARN TaskSetManager: Lost task 33.0 in stage 13.0 (TID 2333, jx-bd-hadoop526, executor 26): FetchFailed(BlockManagerId(48, jx-bd-hadoop530, 42158, None),
shuffleId=14, mapId=0, reduceId=33, message=
org.apache.spark.shuffle.FetchFailedException: RDMA Send/Write/Read WR completed with error: IBV_WC_WR_FLUSH_ERR
We use SparkRDMA by adding libdisni.so and spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar to the spark-submit command. However, some tasks always fail with java.io.IOException: getCmEvent() failed. Here is the complete log.
java.io.IOException: getCmEvent() failed
at org.apache.spark.shuffle.rdma.RdmaChannel.processRdmaCmEvent(RdmaChannel.java:348)
at org.apache.spark.shuffle.rdma.RdmaChannel.connect(RdmaChannel.java:309)
at org.apache.spark.shuffle.rdma.RdmaNode.getRdmaChannel(RdmaNode.java:308)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager.org$apache$spark$shuffle$rdma$RdmaShuffleManager$$getRdmaChannel(RdmaShuffleManager.scala:314)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager.getRdmaChannelToDriver(RdmaShuffleManager.scala:322)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager$$anon$7.apply(RdmaShuffleManager.scala:349)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager$$anon$7.apply(RdmaShuffleManager.scala:343)
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager.getMapTaskOutputTable(RdmaShuffleManager.scala:342)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator.startAsyncRemoteFetches(RdmaShuffleFetcherIterator.scala:185)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator.initialize(RdmaShuffleFetcherIterator.scala:325)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator.<init>(RdmaShuffleFetcherIterator.scala:85)
at org.apache.spark.shuffle.rdma.RdmaShuffleReader.read(RdmaShuffleReader.scala:44)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:98)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
SparkRDMA seems to require more memory, and the job may work after adding --conf spark.executor.memoryOverhead=10g. We are confused about this: how much memory should we add for a given task? Can you give a more detailed configuration method?
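A rough rule of thumb (our own assumption, not an official SparkRDMA guideline): the RDMA shuffle buffers are registered off-heap, so they count against spark.executor.memoryOverhead rather than spark.executor.memory, and the overhead needs to cover roughly the shuffle data one executor holds, plus headroom. The 8 GiB shuffle volume and 20% headroom below are illustrative numbers, not measurements from this job:

```shell
# Illustrative sizing only; tune shuffle_gib to your workload.
shuffle_gib=8                              # est. off-heap shuffle data per executor
overhead_gib=$(( shuffle_gib * 12 / 10 ))  # add ~20% headroom (integer math)
echo "suggested: --conf spark.executor.memoryOverhead=${overhead_gib}g"
```

If executors are killed by YARN for exceeding container limits, raising the overhead (not the heap) is usually the relevant knob under this shuffle manager.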
Hi all,
I am running the HiBench TeraSort workload against RDMA Spark. My current results show that TCP is performing better than RDMA. Any thoughts on configs to look into? I am happy to post the config settings currently in place as well to help diagnose.
We have found an issue with the wiki page at https://github.com/Mellanox/SparkRDMA/wiki/Running-HiBench-with-SparkRDMA that should be easy to fix. In the section "Experiment #1: TeraSort", we need to add not only spark-rdma-3.1-for-spark-SPARK_VERSION-jar-with-dependencies.jar but also libdisni.so.
Add to HiBench/conf/spark.conf:
spark.driver.extraClassPath /PATH/TO/spark-rdma-3.1-for-spark-SPARK_VERSION-jar-with-dependencies.jar
spark.executor.extraClassPath /PATH/TO/spark-rdma-3.1-for-spark-SPARK_VERSION-jar-with-dependencies.jar
spark.shuffle.manager org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.shuffle.compress false
spark.shuffle.spill.compress false
spark.broadcast.compress false
spark.broadcast.checksum false
spark.locality.wait 0
It should be changed to the following:
Add to HiBench/conf/spark.conf:
spark.driver.extraLibraryPath /PATH/TO/libdisni.so
spark.executor.extraLibraryPath /PATH/TO/libdisni.so
spark.driver.extraClassPath /PATH/TO/spark-rdma-3.1-for-spark-SPARK_VERSION-jar-with-dependencies.jar
spark.executor.extraClassPath /PATH/TO/spark-rdma-3.1-for-spark-SPARK_VERSION-jar-with-dependencies.jar
spark.shuffle.manager org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.shuffle.compress false
spark.shuffle.spill.compress false
spark.broadcast.compress false
spark.broadcast.checksum false
spark.locality.wait 0
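One extra caveat, based on our reading of the Spark configuration docs rather than the wiki: the extraLibraryPath properties are prepended to the JVM's native library search path, so they should normally name the directory that contains libdisni.so, not the .so file itself. A throwaway directory and an empty file stand in for a real installation in this sketch:

```shell
# Hedged sketch: confirm libdisni.so exists in the directory you plan to
# pass as spark.{driver,executor}.extraLibraryPath. mktemp -d and touch
# simulate the real install location.
LIB_DIR=$(mktemp -d)
touch "$LIB_DIR/libdisni.so"
found=0
if [ -f "$LIB_DIR/libdisni.so" ]; then
  found=1
  echo "ok: set spark.executor.extraLibraryPath to $LIB_DIR"
fi
rm -rf "$LIB_DIR"
```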
Spark 2.2.0, Hadoop 2.7.
HiBench hits the issue below, but the basic DiSNI RDMA benchmark is able to run.
Please take a look, thank you very much.
Stack: [0x00007fe4b42e3000,0x00007fe4b43e4000], sp=0x00007fe4b43e20c8, free space=1020k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
C [libc.so.6+0x147ce5] __memcpy_ssse3_back+0x45
C 0x00000000007e7ea8
Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
j com.ibm.disni.rdma.verbs.impl.NativeDispatcher._connect(JJ)I+0
j com.ibm.disni.rdma.verbs.impl.RdmaCmNat.connect(Lcom/ibm/disni/rdma/verbs/RdmaCmId;Lcom/ibm/disni/rdma/verbs/RdmaConnParam;)I+45
j com.ibm.disni.rdma.verbs.RdmaCmId.connect(Lcom/ibm/disni/rdma/verbs/RdmaConnParam;)I+6
j org.apache.spark.shuffle.rdma.RdmaChannel.connect(Ljava/net/InetSocketAddress;)V+207
j org.apache.spark.shuffle.rdma.RdmaNode.getRdmaChannel(Ljava/net/InetSocketAddress;Z)Lorg/apache/spark/shuffle/rdma/RdmaChannel;+148
j org.apache.spark.shuffle.rdma.RdmaShuffleManager.getRdmaChannel(Ljava/lang/String;IZ)Lorg/apache/spark/shuffle/rdma/RdmaChannel;+20
j org.apache.spark.shuffle.rdma.RdmaShuffleManager.getRdmaChannel(Lorg/apache/spark/shuffle/rdma/RdmaShuffleManagerId;Z)Lorg/apache/spark/shuffle/rdma/RdmaChannel;+10
j org.apache.spark.shuffle.rdma.RdmaShuffleManager$$anon$1$$anonfun$onSuccess$2.apply()Lorg/apache/spark/shuffle/rdma/RdmaChannel;+15
j org.apache.spark.shuffle.rdma.RdmaShuffleManager$$anon$1$$anonfun$onSuccess$2.apply()Ljava/lang/Object;+1
j scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1()Lscala/util/Try;+8
j scala.concurrent.impl.Future$PromiseCompletingRunnable.run()V+5
j scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec()Z+4
j scala.concurrent.forkjoin.ForkJoinTask.doExec()I+10
j scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(Lscala/concurrent/forkjoin/ForkJoinTask;)V+10
j scala.concurrent.forkjoin.ForkJoinPool.runWorker(Lscala/concurrent/forkjoin/ForkJoinPool$WorkQueue;)V+11
j scala.concurrent.forkjoin.ForkJoinWorkerThread.run()V+14
v ~StubRoutines::call_stub
Hello!
I found that SparkRDMA works correctly on our cluster only when 2 worker nodes are interacting with each other. The number of executors on each node doesn't matter (I tried 1, 2, and 4 executors per node). When 3 or more nodes take part in processing a job, errors occur (logs below). The job ends successfully, but it takes much more time than without RDMA. How can I solve this problem? When only 2 nodes take part in processing, I do see the performance benefit of SparkRDMA.
4 nodes (all nodes can be workers)
Component | Description |
---|---|
Storage | Samsung SSD 970 EVO Plus 250GB (NVMe) |
OS | Kubuntu 20.04 LTS |
Network switch | Huawei S5720 36-C with switching capacity of 598 Gbit/s |
Network physical layer | single mode optical fiber |
Network adapter | Mellanox ConnectX-4 Lx EN, 10 GbE single port SFP+ |
Memory | 32 GB |
CPU | Intel Core i7-9700 CPU @ 3.00GHz, 8 cores with 1 thread per core |
Configuration | Value |
---|---|
yarn.nodemanager.resource.memory-mb | 27648 |
yarn.scheduler.maximum-allocation-mb | 27648 |
yarn.scheduler.minimum-allocation-mb | 13824 |
Configuration | Value | Description |
---|---|---|
yarn.executor.num | 3 | For this test I use 3 nodes (1 executor per node) |
yarn.executor.cores | 5 | |
spark.executor.memory | 7g | I use only ~50% of the available memory to prevent OOM errors (which otherwise happen very often) |
spark.executor.memoryOverhead | 8g | as suggested here |
spark.driver.memory | 2g | |
spark.default.parallelism | 45 | |
spark.sql.shuffle.partitions | 45 | |
spark.shuffle.manager | org.apache.spark.shuffle.rdma.RdmaShuffleManager | |
spark.shuffle.compress | false | as suggested here |
spark.shuffle.spill.compress | false | |
spark.broadcast.compress | false | |
spark.locality.wait | 0 |
2020-11-24 12:00:02 INFO YarnClientSchedulerBackend:54 - Stopped
2020-11-24 12:00:02 INFO RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(0)
2020-11-24 12:00:02 INFO MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2020-11-24 12:00:02 INFO RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(2)
2020-11-24 12:00:02 INFO RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(1)
2020-11-24 12:00:02 INFO disni:257 - disconnect, id 0
2020-11-24 12:00:02 INFO disni:257 - disconnect, id 0
java.lang.NullPointerException
at org.apache.spark.shuffle.rdma.RdmaChannel.processRdmaCmEvent(RdmaChannel.java:345)
at org.apache.spark.shuffle.rdma.RdmaChannel.stop(RdmaChannel.java:894)
at org.apache.spark.shuffle.rdma.RdmaNode.lambda$new$0(RdmaNode.java:203)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "RdmaNode connection listening thread" java.lang.RuntimeException: Exception in RdmaNode listening thread java.lang.NullPointerException
at org.apache.spark.shuffle.rdma.RdmaNode.lambda$new$0(RdmaNode.java:210)
at java.lang.Thread.run(Thread.java:748)
2020-11-24 12:00:02 INFO disni:257 - disconnect, id 0
2020-11-24 12:00:02 INFO RdmaBufferManager:218 - Rdma buffers allocation statistics:
2020-11-24 12:00:02 INFO RdmaBufferManager:222 - Pre allocated 0, allocated 770 buffers of size 4 KB
2020-11-24 12:00:02 INFO disni:201 - deallocPd, pd 1
2020-11-24 12:00:02 INFO disni:274 - destroyCmId, id 0
2020-11-24 12:00:02 INFO disni:263 - destroyEventChannel, channel 0
2020-11-24 12:00:02 INFO MemoryStore:54 - MemoryStore cleared
2020-11-24 12:00:02 INFO BlockManager:54 - BlockManager stopped
2020-11-24 12:00:02 INFO BlockManagerMaster:54 - BlockManagerMaster stopped
2020-11-24 12:00:02 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2020-11-24 12:00:02 INFO SparkContext:54 - Successfully stopped SparkContext
2020-11-24 12:00:02 INFO ShutdownHookManager:54 - Shutdown hook called
2020-11-24 12:00:02 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-3a2b9379-56c3-4b40-ab40-e92f03a5c591
2020-11-24 12:00:02 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-9e3d837b-5d83-467f-9a12-591ffd0503a8
2020-11-24 11:58:51 INFO Executor:54 - Finished task 40.0 in stage 2.0 (TID 355). 1502 bytes result sent to driver
2020-11-24 12:00:02 INFO CoarseGrainedExecutorBackend:54 - Driver commanded a shutdown
2020-11-24 12:00:02 INFO RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(1)
2020-11-24 12:00:02 INFO RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(2)
2020-11-24 12:00:02 INFO RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(0)
2020-11-24 12:00:02 INFO RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(3)
2020-11-24 12:00:02 INFO disni:257 - disconnect, id 0
2020-11-24 12:00:02 INFO disni:257 - disconnect, id 0
2020-11-24 12:00:02 INFO disni:285 - destroyQP, id 0
2020-11-24 12:00:02 INFO disni:214 - destroyCQ, cq 140630589705088
2020-11-24 12:00:02 INFO disni:274 - destroyCmId, id 0
2020-11-24 12:00:02 INFO disni:189 - destroyCompChannel, compChannel 0
2020-11-24 12:00:02 INFO disni:263 - destroyEventChannel, channel 0
2020-11-24 12:00:02 INFO disni:257 - disconnect, id 0
2020-11-24 12:00:02 INFO disni:285 - destroyQP, id 0
2020-11-24 12:00:02 INFO disni:214 - destroyCQ, cq 94745206395344
2020-11-24 12:00:02 INFO disni:274 - destroyCmId, id 0
2020-11-24 12:00:02 INFO disni:189 - destroyCompChannel, compChannel 0
2020-11-24 12:00:02 INFO disni:257 - disconnect, id 0
2020-11-24 12:00:02 INFO CoarseGrainedExecutorBackend:54 - Driver from hadoop-master:45735 disconnected during shutdown
2020-11-24 12:00:02 INFO CoarseGrainedExecutorBackend:54 - Driver from hadoop-master:45735 disconnected during shutdown
2020-11-24 12:00:02 INFO RdmaChannel:874 - Stopping RdmaChannel RdmaChannel(4)
2020-11-24 12:00:02 ERROR RdmaNode:384 - Failed to stop RdmaChannel during 50 ms
2020-11-24 12:00:02 INFO disni:257 - disconnect, id 0
2020-11-24 12:00:02 INFO disni:285 - destroyQP, id 0
2020-11-24 12:00:02 WARN RdmaChannel:897 - Failed to get RDMA_CM_EVENT_DISCONNECTED: getCmEvent() failed
2020-11-24 12:00:02 INFO disni:285 - destroyQP, id 0
2020-11-24 12:00:02 INFO disni:214 - destroyCQ, cq 94745206905120
2020-11-24 12:00:02 INFO disni:274 - destroyCmId, id 0
2020-11-24 12:00:02 INFO disni:189 - destroyCompChannel, compChannel 0
2020-11-24 12:00:02 INFO disni:214 - destroyCQ, cq 94745206402048
2020-11-24 12:00:02 INFO disni:274 - destroyCmId, id 0
2020-11-24 12:00:02 INFO disni:189 - destroyCompChannel, compChannel 0
2020-11-24 12:00:02 INFO disni:263 - destroyEventChannel, channel 0
2020-11-24 12:00:02 WARN RdmaChannel:897 - Failed to get RDMA_CM_EVENT_DISCONNECTED: getCmEvent() failed
2020-11-24 12:00:02 INFO disni:285 - destroyQP, id 0
2020-11-24 12:00:02 INFO disni:214 - destroyCQ, cq 140630593575520
2020-11-24 12:00:02 INFO disni:274 - destroyCmId, id 0
2020-11-24 12:00:02 INFO disni:189 - destroyCompChannel, compChannel 0
2020-11-24 12:00:02 INFO disni:263 - destroyEventChannel, channel 0
2020-11-24 12:00:02 INFO RdmaNode:213 - Exiting RdmaNode Listening Server
2020-11-24 12:00:02 INFO RdmaBufferManager:218 - Rdma buffers allocation statistics:
2020-11-24 12:00:02 INFO RdmaBufferManager:222 - Pre allocated 0, allocated 380 buffers of size 4 KB
2020-11-24 12:00:02 INFO RdmaBufferManager:222 - Pre allocated 0, allocated 87 buffers of size 4096 KB
2020-11-24 12:00:02 INFO RdmaBufferManager:222 - Pre allocated 0, allocated 24 buffers of size 1024 KB
2020-11-24 12:00:02 INFO RdmaBufferManager:222 - Pre allocated 0, allocated 10 buffers of size 2048 KB
2020-11-24 12:00:02 INFO disni:201 - deallocPd, pd 1
2020-11-24 12:00:02 INFO disni:274 - destroyCmId, id 0
2020-11-24 12:00:02 INFO disni:263 - destroyEventChannel, channel 0
2020-11-24 12:00:02 INFO MemoryStore:54 - MemoryStore cleared
2020-11-24 12:00:02 INFO BlockManager:54 - BlockManager stopped
2020-11-24 12:00:02 INFO ShutdownHookManager:54 - Shutdown hook called
Hello,
I am testing SparkRDMA with a Mellanox ConnectX-4 Lx card. I installed Spark 2.2.0 and downloaded the spark-terasort sample code. The TeraSort sample runs successfully with plain Spark 2.2.0; however, when I run it with the SparkRDMA plugin, it throws the error shown in the attached screenshot.
Do I need to upgrade libibverbs.so, or do I need to configure the RDMA network for Spark? Please help.
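Before upgrading libibverbs, it may help to confirm that the verbs stack itself is healthy. A minimal sketch of the usual checks, assuming the rdma-core / libibverbs-utils packages are installed (module and device names below are examples, not taken from the report):

```shell
# Is an RDMA device visible to the verbs layer, with an active port?
ibv_devinfo | grep -E 'hca_id|state'     # look for state: PORT_ACTIVE

# Are the RDMA kernel modules loaded (mlx5 example for ConnectX-4 Lx)?
lsmod | grep -E 'mlx5_ib|rdma_cm'

# Raw RDMA connectivity between two hosts, outside of Spark:
#   on the server:  ibv_rc_pingpong
#   on the client:  ibv_rc_pingpong <server-ip>
```

If `ibv_rc_pingpong` fails between the nodes, the problem is in the RDMA network setup rather than in Spark or the plugin.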
Hi,
I am currently evaluating this library and have not done any specific configuration of the InfiniBand network that the Spark nodes interconnect on. Can you point me in the right direction on what might be the cause of the issue? See config and stack trace below.
spark2-submit -v --num-executors 10 --executor-cores 5 --executor-memory 4G --conf spark.driver.extraClassPath=/opt/mellanox/spark-rdma-3.1.jar --conf spark.executor.extraClassPath=/opt/mellanox/spark-rdma-3.1.jar --conf spark.shuffle.manager=org.apache.spark.shuffle.rdma.RdmaShuffleManager --class com.github.ehiggs.spark.terasort.TeraSort /tmp/spark-terasort-1.1-SNAPSHOT-jar-with-dependencies.jar /tmp/data/terasort_in /tmp/data/terasort_out
Parsed arguments:
master yarn
deployMode client
executorMemory 4G
executorCores 5
totalExecutorCores null
propertiesFile /opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2/conf/spark-defaults.conf
driverMemory null
driverCores null
driverExtraClassPath /opt/mellanox/spark-rdma-3.1.jar
driverExtraLibraryPath /opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/lib/hadoop/lib/native
driverExtraJavaOptions null
supervise false
queue null
numExecutors 10
files null
pyFiles null
archives null
mainClass com.github.ehiggs.spark.terasort.TeraSort
primaryResource file:/tmp/spark-terasort-1.1-SNAPSHOT-jar-with-dependencies.jar
name com.github.ehiggs.spark.terasort.TeraSort
childArgs [/tmp/data/terasort_in /tmp/data/terasort_out]
jars null
packages null
packagesExclusions null
repositories null
(spark.shuffle.manager,org.apache.spark.shuffle.rdma.RdmaShuffleManager)
(spark.executor.extraLibraryPath,/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/lib/hadoop/lib/native)
(spark.authenticate,false)
(spark.yarn.jars,local:/opt/cloudera/parcels/SPARK2-2.2.0.cloudera1-1.cdh5.12.0.p0.142354/lib/spark2/jars/)
(spark.driver.extraLibraryPath,/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/lib/hadoop/lib/native)
(spark.yarn.historyServer.address,
19/05/02 13:54:17 WARN spark.SparkContext: Using an existing SparkContext; some configuration may not take effect.
[Stage 0:> (0 + 17) / 45]19/05/02 13:54:18 ERROR rdma.RdmaNode: Error in accept call on a passive RdmaChannel: java.io.IOException: createCQ() failed
java.lang.NullPointerException
at org.apache.spark.shuffle.rdma.RdmaChannel.processRdmaCmEvent(RdmaChannel.java:345)
at org.apache.spark.shuffle.rdma.RdmaChannel.stop(RdmaChannel.java:894)
at org.apache.spark.shuffle.rdma.RdmaNode.lambda$new$0(RdmaNode.java:176)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "RdmaNode connection listening thread" java.lang.RuntimeException: Exception in RdmaNode listening thread java.lang.NullPointerException
at org.apache.spark.shuffle.rdma.RdmaNode.lambda$new$0(RdmaNode.java:210)
at java.lang.Thread.run(Thread.java:748)
19/05/02 13:54:20 WARN scheduler.TaskSetManager: Lost task 9.0 in stage 0.0 (TID 3, , executor 3): java.lang.ArithmeticException: / by zero
at org.apache.spark.shuffle.rdma.RdmaNode.getNextCpuVector(RdmaNode.java:278)
at org.apache.spark.shuffle.rdma.RdmaNode.getRdmaChannel(RdmaNode.java:301)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager.org$apache$spark$shuffle$rdma$RdmaShuffleManager$$getRdmaChannel(RdmaShuffleManager.scala:314)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager.getRdmaChannelToDriver(RdmaShuffleManager.scala:322)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager.publishMapTaskOutput(RdmaShuffleManager.scala:410)
at org.apache.spark.shuffle.rdma.writer.wrapper.RdmaWrapperShuffleWriter.stop(RdmaWrapperShuffleWriter.scala:118)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:97)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
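The division by zero in `getNextCpuVector` suggests the plugin ended up with an empty CPU-vector list on that executor. A hedged workaround, assuming this SparkRDMA version reads its CPU list from a `spark.shuffle.rdma.cpuList` property (the exact property name is an assumption and may differ between releases), is to pin the list explicitly so it cannot parse down to zero entries:

```shell
# Hypothetical workaround: pass an explicit, non-empty CPU list on submit.
# The property name spark.shuffle.rdma.cpuList is an assumption for this
# plugin version; check the plugin's README for the exact key.
spark2-submit \
  --conf spark.shuffle.rdma.cpuList=0-4 \
  ... # remaining arguments as in the command above
```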
We have set up an RDMA environment and run Spark jobs with the RDMA Shuffle Manager. Here is the spark-submit command used to submit the job.
${SPARK_HOME}/bin/spark-submit \
--executor-memory 7200m \
--master yarn \
--num-executors 40 \
--files /root/spark-benchmark/tools/SparkRDMA/libdisni.so,/root/spark-benchmark/tools/SparkRDMA/spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar \
--executor-cores 1 \
--deploy-mode cluster \
--driver-memory 4g \
--conf spark.eventLog.dir=hdfs:///spark_benchmark/event_log/2.3.0 \
--conf spark.yarn.jars=hdfs:///spark_benchmark/dependency/spark230_jars/*.jar \
--conf spark.shuffle.spill.compress=false \
--conf spark.executor.extraClassPath=spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar \
--conf spark.shuffle.manager=org.apache.spark.shuffle.rdma.RdmaShuffleManager \
--conf spark.driver.extraLibraryPath=./ \
--conf spark.hadoop.yarn.timeline-service.enabled=false \
--conf spark.eventLog.enabled=true \
--conf spark.driver.extraClassPath=spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar \
--conf spark.executor.extraLibraryPath=./ \
--conf spark.executor.memoryOverhead=128g \
--conf spark.shuffle.compress=true \
--conf spark.sql.shuffle.partitions=340 \
--conf spark.task.maxFailures=1 \
--conf spark.yarn.maxAppAttempts=1 \
--class com.example.SparkApp
This script works at times, but sometimes a task fails (the task is then rescheduled and eventually completes). Here is the error log of the failed task. Since we have set spark.shuffle.compress=true, the default Lz4 codec throws a "Stream is corrupted" exception.
org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:202)
at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.<init>(UnsafeRowSerializer.scala:120)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.asKeyValueIterator(UnsafeRowSerializer.scala:110)
at org.apache.spark.shuffle.rdma.RdmaShuffleReader$$anonfun$4.apply(RdmaShuffleReader.scala:68)
at org.apache.spark.shuffle.rdma.RdmaShuffleReader$$anonfun$4.apply(RdmaShuffleReader.scala:64)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:124)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:123)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
... 8 more
If we set spark.shuffle.compress=false, the error is instead thrown by readFully when trying to deserialize the stream into a row object.
Job aborted due to stage failure: Task 3 in stage 4.0 failed 1 times, most recent failure: Lost task 3.0 in stage 4.0 (TID 223, lnode6.leap.com, executor 1): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException: reached end of stream after reading 723048 bytes; 825504310 bytes expected
at org.spark_project.guava.io.ByteStreams.readFully(ByteStreams.java:735)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:127)
at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
at scala.collection.Iterator$$anon$12.next(Iterator.scala:444)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:30)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:125)
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:123)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
... 8 more
The SparkApp is simple: we read data from HDFS and apply some transformations before calling saveAsTextFile to write the result back to HDFS. The failed task always occurs in the final stage.
By the way, this issue may not be easy to reproduce. In our environment we use a custom codec to save the RDD, which may raise the probability of hitting it. However, the codec cannot access or break the stream before the RDMA Shuffle Manager performs the shuffle read, so we still have no clue about the root cause of this issue.
We have compared the performance of the RDMA shuffle manager against the default TCP shuffle. The speed-up is at most 30%, not the 2.63x reported in the README.md.
Can the testers of this project provide more specific parameters to reproduce that result? For example, how many partitions should we use, and should we restrict the executor memory or the number of executors?
We have used the SparkRDMA shuffle manager to replace the default shuffle manager, but it throws a MetadataFetchFailedException. Here is the error log.
19/05/22 10:23:31 INFO disni: createCompChannel, context 139795900960400
19/05/22 10:23:31 INFO disni: createCQ, objId 139794752991072, ncqe 1
19/05/22 10:23:31 INFO disni: createQP, objId 139796347838856, send_wr size 0, recv_wr_size 0
19/05/22 10:23:31 INFO disni: accept, id 0
19/05/22 10:23:31 INFO ProphetInstanceStatistics: create OutputStream for part 198, output file is :hdfs:///prophet/autoui/360leap-ptest/workspace/test/pws/1/FeatureExtract/455a1e51160e4798bf13b18f50ad7d09/36/c5bbce26-01b2-4731-af13-871bb635c751/0/1558491641346/signValueMap/part-198-_w0
19/05/22 10:23:31 ERROR FeSpark$$anon$1: Failed to process 198
19/05/22 10:23:31 ERROR FeSpark$$anon$1: processor failed
org.apache.spark.shuffle.MetadataFetchFailedException: RdmaShuffleNode: BlockManagerId(138, node3.leap.com, 36778, None) has no RDMA connection to BlockManagerId(36, node5.leap.com, 41465, None)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:214)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1$$anonfun$apply$4.apply(RdmaShuffleFetcherIterator.scala:203)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:203)
at org.apache.spark.shuffle.rdma.RdmaShuffleFetcherIterator$$anonfun$startAsyncRemoteFetches$1.apply(RdmaShuffleFetcherIterator.scala:185)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
19/05/22 10:23:31 INFO RdmaNode: Established connection to /172.27.193.1:60672 in 983 ms
19/05/22 10:23:31 ERROR Executor: Exception in task 198.0 in stage 4.0 (TID 774)
java.lang.NullPointerException
at org.apache.spark.rdd.RDD$$anonfun$map$1$$anonfun$apply$5.apply(RDD.scala:372)
at org.apache.spark.rdd.RDD$$anonfun$map$1$$anonfun$apply$5.apply(RDD.scala:372)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
19/05/22 10:23:31 INFO CoarseGrainedExecutorBackend: Got assigned task 897
19/05/22 10:23:31 INFO Executor: Running task 321.0 in stage 4.0 (TID 897)
19/05/22 10:23:31 INFO Executor: Executor is trying to kill task 321.0 in stage 4.0 (TID 897), reason: Stage cancelled
It seems that the RDMA connection is not established, although it works with other Spark SQL jobs. We are not sure whether this is a bug in SparkRDMA.
We have added the RDMA shuffle manager from SparkRDMA to our Spark applications and submit the jobs in yarn-cluster mode.
Just like #5, we got a "Failed to bind the port" error. We checked the IP being used, and it is not the expected IP of the RoCE NIC, but there seems to be no way to set this IP on the Spark workers.
We have tried adding SPARK_LOCAL_IP
in spark-env.sh
but it only works in client mode, while we are using yarn-cluster mode. I'm not sure whether setting the server's hostname or the YARN NodeManager address to the RoCE NIC IP would work.
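For reference, a sketch of the two binding approaches discussed above. The addresses and hostnames are hypothetical, and the /etc/hosts approach is the commonly suggested workaround for cluster mode rather than a confirmed fix:

```shell
# Client mode: spark-env.sh on the submitting host can bind Spark to the
# RoCE NIC directly (192.168.100.12 is a made-up per-node RoCE address).
export SPARK_LOCAL_IP=192.168.100.12

# yarn-cluster mode: SPARK_LOCAL_IP from spark-env.sh is not applied inside
# the YARN containers, so the usual workaround is to make each host's name
# resolve to its RoCE IP, e.g. in /etc/hosts on every node:
#   192.168.100.12  hadoop-worker-12
```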
Hi,
I'm trying to submit a SparkPi example using SparkRDMA. I have the following in my spark-defaults.conf:
spark.driver.extraClassPath /path/to/spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar
spark.executor.extraClassPath /path/to/spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar
spark.shuffle.manager org.apache.spark.shuffle.rdma.RdmaShuffleManager
spark.shuffle.rdma.driverPort 3037
spark.shuffle.rdma.executorPort 4037
spark.shuffle.rdma.portMaxRetries 100
but I see the following errors:
./bin/spark-submit --deploy-mode client --master spark://dgx03:7077 --class org.apache.spark.examples.SparkPi ./examples/target/scala-2.11/jars/spark-examples_2.11-2.4.0.jar
...
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/02/15 12:12:35 INFO SparkContext: Running Spark version 2.4.0
19/02/15 12:12:35 INFO SparkContext: Submitted application: Spark Pi
19/02/15 12:12:35 INFO SecurityManager: Changing view acls to: akven
19/02/15 12:12:35 INFO SecurityManager: Changing modify acls to: akven
19/02/15 12:12:35 INFO SecurityManager: Changing view acls groups to:
19/02/15 12:12:35 INFO SecurityManager: Changing modify acls groups to:
19/02/15 12:12:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with vie
w permissions: Set(akven); groups with view permissions: Set(); users with modify permissions: Set(akven); groups with modify permissions: Set()
19/02/15 12:12:36 INFO Utils: Successfully started service 'sparkDriver' on port 36411.
19/02/15 12:12:36 INFO SparkEnv: Registering MapOutputTracker
19/02/15 12:12:36 INFO disni: creating RdmaProvider of type 'nat'
19/02/15 12:12:36 INFO disni: jverbs jni version 32
19/02/15 12:12:36 INFO disni: sock_addr_in size mismatch, jverbs size 28, native size 16
19/02/15 12:12:36 INFO disni: IbvRecvWR size match, jverbs size 32, native size 32
19/02/15 12:12:36 INFO disni: IbvSendWR size mismatch, jverbs size 72, native size 128
19/02/15 12:12:36 INFO disni: IbvWC size match, jverbs size 48, native size 48
19/02/15 12:12:36 INFO disni: IbvSge size match, jverbs size 16, native size 16
19/02/15 12:12:36 INFO disni: Remote addr offset match, jverbs size 40, native size 40
19/02/15 12:12:36 INFO disni: Rkey offset match, jverbs size 48, native size 48
19/02/15 12:12:36 INFO disni: createEventChannel, objId 47303849051008
19/02/15 12:12:36 INFO disni: createId, id 47303849066960
19/02/15 12:12:36 INFO disni: bindAddr, address dgx03/A.B.C.D:3037
19/02/15 12:12:36 INFO RdmaNode: Failed to bind to port 3037 on iteration 0
19/02/15 12:12:36 INFO disni: bindAddr, address dgx03/A.B.C.D:3038
19/02/15 12:12:36 INFO RdmaNode: Failed to bind to port 3038 on iteration 1
...
19/02/15 12:12:36 INFO RdmaNode: Failed to bind to port 3052 on iteration 15
19/02/15 12:12:36 ERROR RdmaNode: Failed in RdmaNode constructor
^ This seems to be the main error
19/02/15 12:12:36 INFO disni: destroyCmId, id 0
19/02/15 12:12:36 INFO disni: destroyEventChannel, channel 0
19/02/15 12:12:36 ERROR SparkContext: Error initializing SparkContext.
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:264)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:323)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:257)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:424)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:935)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:926)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:31)
at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: Failed to bind. Make sure your NIC supports RDMA
at org.apache.spark.shuffle.rdma.RdmaNode.<init>(RdmaNode.java:87)
at org.apache.spark.shuffle.rdma.RdmaShuffleManager.<init>(RdmaShuffleManager.scala:137)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:264)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:323)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:175)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:257)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:424)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:935)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:926)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:926)
at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:31)
at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Any thoughts on what I need to change to fix this? Thanks in advance.
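The bindAddr lines show the driver resolving dgx03 to an address on which every rdma_cm bind fails, which usually means that address does not belong to an RDMA-capable interface. A few hedged checks (the hostname is taken from the log; `rdma` and `ip` come from iproute2, `getent` from glibc):

```shell
# Which IP does the hostname resolve to? Spark binds its RDMA listener there.
getent hosts dgx03

# Which network devices are backed by an RDMA device (RoCE/InfiniBand)?
rdma link show

# Does the resolved IP sit on one of those devices?
ip -br addr show
```

If the resolved IP is on a non-RDMA interface (e.g. the management network), pointing the hostname, or SPARK_LOCAL_IP, at the RDMA-capable interface's address is the usual fix.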