Hi dataproc team.
tl;dr
I hope this title isn't too bombastic, but it seems Dataproc cannot support PySpark workloads on Python 3.3 and greater. This stems from PySpark checking for a PYTHONHASHSEED env var that, while set, is not detected during execution of Spark jobs on a Dataproc cluster. The same occurs with other environment variables in other runtimes as well (e.g., remote Spark job submission with a property setting for PYSPARK_PYTHON; see below).
0. Details
PySpark's rdd.py module requires the PYTHONHASHSEED env var to be set for Python 3.3 and higher, ensuring a common hash seed across workloads distributed over multiple Python instances. While there is discussion of whether or not PySpark should set PYTHONHASHSEED on its own, ultimately PySpark workloads and applications based on Python 3.3 and higher must have PYTHONHASHSEED set. This can be done by setting exports in global profiles or, more concisely, in the spark-env.sh config. Some examples of this follow.
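As background: since Python 3.3, str hashing is salted with a per-process random seed, so two interpreters only agree on hash values when PYTHONHASHSEED pins that seed, which is what PySpark's hash partitioning relies on. A quick standalone illustration (not Dataproc-specific):

```python
import os
import subprocess

def string_hash(seed):
    """Hash 'spark' in a fresh interpreter with a pinned hash seed."""
    env = dict(os.environ, PYTHONHASHSEED=str(seed))
    out = subprocess.check_output(
        ["python3", "-c", "print(hash('spark'))"], env=env)
    return int(out)

# With the seed pinned, every interpreter computes the same hash, so
# distributed workers can agree on which partition a key belongs to.
assert string_hash(123) == string_hash(123)

# Different (or unpinned, per-process random) seeds generally disagree,
# which is exactly what PySpark's PYTHONHASHSEED check guards against.
print(string_hash(123), string_hash(456))
```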
1. Setup
To demonstrate, I've implemented a basic shell script that installs Python 3 and sets the env vars, like this:
❯❯ wget https://gist.githubusercontent.com/nehalecky/9258c01fb2077f51545a/raw/789f08141dc681cf1ad5da05455c2cd01d1649e8/install-py3-dataproc.sh
❯❯ cat install-py3-dataproc.sh
#!/bin/bash
apt-get -y install python3
echo "export PYSPARK_PYTHON=python3" | tee -a /etc/profile.d/spark_config.sh /etc/*bashrc
echo "export PYTHONHASHSEED=123" | tee -a /etc/profile.d/spark_config.sh /etc/*bashrc /usr/lib/spark/conf/spark-env.sh
source ~/.bashrc
and reference it as an initialization action when launching a minimal Dataproc cluster:
❯❯ gcloud beta dataproc clusters create py3-test \
--initialization-actions \
gs://bombora-dev-analytics/dataproc-init-actions/install-py3-dataproc.sh # public object
Logging in, one can confirm that the PYTHONHASHSEED env var is set in both the global profiles and the spark-env.sh config, on both the master and worker nodes.
master
❯❯ gcloud compute ssh root@py3-test-m
root@py3-test-m:~# echo $PYTHONHASHSEED
123
root@py3-test-m:~# echo $PYSPARK_PYTHON
python3
worker
❯❯ gcloud compute ssh root@py3-test-w-0
root@py3-test-w-0:~# echo $PYTHONHASHSEED
123
root@py3-test-w-0:~# echo $PYSPARK_PYTHON
python3
2. Testing
To test, I have a simple Python script that prints info on PYTHONHASHSEED, raises an exception if it is not detected, and runs a PySpark job to print out the Python executable detected by the executors.
root@spark-cluster-m:~# wget https://raw.githubusercontent.com/nehalecky/dataproc-initialization-actions/feature/conda_init_action/develop/conda/get-sys-exec.py
root@spark-cluster-m:~# cat get-sys-exec.py
import pyspark
import sys
import os

# .get() avoids a KeyError here, so the explicit check below can run
pyhashseed = os.environ.get('PYTHONHASHSEED')
print(pyhashseed)
print(type(pyhashseed))
print(sys.version)
# Same check PySpark's rdd.py performs on the workers
if sys.version >= '3.3' and 'PYTHONHASHSEED' not in os.environ:
    raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED")
else:
    sc = pyspark.SparkContext()
    distData = sc.parallelize(range(100))
    python_distros = distData.map(lambda x: sys.executable).distinct().collect()
    print(python_distros)
Local
I run this locally on the cluster via spark-submit. While the script reads and prints the correct setting for PYTHONHASHSEED, it later hits an exception raised by the exact same check it initially passed! It seems the env var is detected fine on the master node, since the exception is raised on the worker node. Crazy.
root@spark-cluster-m:~# spark-submit get-sys-exec.py
123
<class 'str'>
3.5.1 |Continuum Analytics, Inc.| (default, Dec 7 2015, 11:16:01)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]
16/01/18 01:18:03 INFO akka.event.slf4j.Slf4jLogger: Slf4jLogger started
16/01/18 01:18:03 INFO Remoting: Starting remoting
16/01/18 01:18:03 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:47010]
16/01/18 01:18:04 INFO org.spark-project.jetty.server.Server: jetty-8.y.z-SNAPSHOT
16/01/18 01:18:04 INFO org.spark-project.jetty.server.AbstractConnector: Started [email protected]:60173
16/01/18 01:18:04 INFO org.spark-project.jetty.server.Server: jetty-8.y.z-SNAPSHOT
16/01/18 01:18:04 INFO org.spark-project.jetty.server.AbstractConnector: Started [email protected]:4040
16/01/18 01:18:04 WARN org.apache.spark.metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
16/01/18 01:18:04 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at spark-cluster-m/10.240.0.2:8032
16/01/18 01:18:06 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1453072376140_0005
[Stage 0:> (0 + 2) / 2]16/01/18 01:18:16 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, spark-cluster-w-0.c.bombora-dev.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1453072376140_0005/container_1453072376140_0005_01_000002/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1453072376140_0005/container_1453072376140_0005_01_000002/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1453072376140_0005/container_1453072376140_0005_01_000002/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream
for obj in iterator:
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1704, in add_shuffle_key
File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1453072376140_0005/container_1453072376140_0005_01_000002/pyspark.zip/pyspark/rdd.py", line 74, in portable_hash
raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED")
Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
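The pattern above, where the driver sees the variable but the workers don't, is consistent with YARN launching executor containers with its own constructed environment that doesn't include the node's profile exports. That's my working assumption, not something I've verified against the NodeManager source, but the isolation itself is easy to demonstrate:

```python
import subprocess

# A child process launched with an explicit env dict (roughly what a
# container manager does) sees none of the parent shell's exports:
out = subprocess.check_output(
    ["python3", "-c", "import os; print('PYTHONHASHSEED' in os.environ)"],
    env={"PATH": "/usr/bin:/bin"})
print(out.decode().strip())  # False, even if set in the calling shell
```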
Remote
Remote execution completes because, regardless of any PYSPARK_PYTHON env var setting, it uses the default Python installation (2.7.9). Again, this is likely because the globally set env vars cannot be referenced.
❯❯ gcloud beta dataproc jobs submit pyspark --cluster py3-test get-sys-exec.py
Copying file://get-sys-exec.py [Content-Type=text/x-python]...
Uploading ...33ab-c06f-4234-b495-92c3bf9ac6e0/get-sys-exec.py: 480 B/480 B
Job [6512a774-fc83-4d2c-b735-f40fa7bac534] submitted.
Waiting for job output...
123
<type 'str'>
2.7.9 (default, Mar 1 2015, 12:57:24)
[GCC 4.9.2]
16/01/18 19:04:30 INFO akka.event.slf4j.Slf4jLogger: Slf4jLogger started
16/01/18 19:04:30 INFO Remoting: Starting remoting
16/01/18 19:04:30 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:43622]
16/01/18 19:04:30 INFO org.spark-project.jetty.server.Server: jetty-8.y.z-SNAPSHOT
16/01/18 19:04:30 INFO org.spark-project.jetty.server.AbstractConnector: Started [email protected]:52135
16/01/18 19:04:31 INFO org.spark-project.jetty.server.Server: jetty-8.y.z-SNAPSHOT
16/01/18 19:04:31 INFO org.spark-project.jetty.server.AbstractConnector: Started [email protected]:4040
16/01/18 19:04:31 WARN org.apache.spark.metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
16/01/18 19:04:31 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at py3-test-m/10.240.0.3:8032
16/01/18 19:04:33 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1453141822766_0002
['/usr/bin/python']
16/01/18 19:04:43 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/01/18 19:04:43 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
Job [6512a774-fc83-4d2c-b735-f40fa7bac534] finished successfully.
driverControlFilesUri: gs://dataproc-e66c6e3d-80da-4211-b66c-0109bbe4567a-us/google-cloud-dataproc-metainfo/44b233ab-c06f-4234-b495-92c3bf9ac6e0/jobs/6512a774-fc83-4d2c-b735-f40fa7bac534/
driverOutputResourceUri: gs://dataproc-e66c6e3d-80da-4211-b66c-0109bbe4567a-us/google-cloud-dataproc-metainfo/44b233ab-c06f-4234-b495-92c3bf9ac6e0/jobs/6512a774-fc83-4d2c-b735-f40fa7bac534/driveroutput
placement:
clusterName: py3-test
clusterUuid: 44b233ab-c06f-4234-b495-92c3bf9ac6e0
pysparkJob:
loggingConfiguration: {}
mainPythonFileUri: gs://dataproc-e66c6e3d-80da-4211-b66c-0109bbe4567a-us/google-cloud-dataproc-staging/44b233ab-c06f-4234-b495-92c3bf9ac6e0/get-sys-exec.py
reference:
jobId: 6512a774-fc83-4d2c-b735-f40fa7bac534
projectId: bombora-dev
status:
state: DONE
stateStartTime: '2016-01-18T19:04:50.779Z'
statusHistory:
- state: PENDING
stateStartTime: '2016-01-18T19:04:24.164Z'
- state: SETUP_DONE
stateStartTime: '2016-01-18T19:04:24.320Z'
- details: Agent reported job success
state: RUNNING
stateStartTime: '2016-01-18T19:04:29.613Z'
An attempt at passing in a --properties argument has no impact (note the warning that PYSPARK_PYTHON is ignored).
❯❯ gcloud beta dataproc jobs submit pyspark --cluster py3-test --properties PYSPARK_PYTHON=/usr/bin/python3 get-sys-exec.py
Copying file://get-sys-exec.py [Content-Type=text/x-python]...
Uploading ...33ab-c06f-4234-b495-92c3bf9ac6e0/get-sys-exec.py: 480 B/480 B
Job [2ba2a5b3-3e4a-41ce-b463-6970185b03bf] submitted.
Waiting for job output...
Warning: Ignoring non-spark config property: PYSPARK_PYTHON=/usr/bin/python3
123
<type 'str'>
2.7.9 (default, Mar 1 2015, 12:57:24)
[GCC 4.9.2]
16/01/18 19:33:25 INFO akka.event.slf4j.Slf4jLogger: Slf4jLogger started
16/01/18 19:33:25 INFO Remoting: Starting remoting
16/01/18 19:33:25 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:56989]
16/01/18 19:33:26 INFO org.spark-project.jetty.server.Server: jetty-8.y.z-SNAPSHOT
16/01/18 19:33:26 INFO org.spark-project.jetty.server.AbstractConnector: Started [email protected]:44267
16/01/18 19:33:26 INFO org.spark-project.jetty.server.Server: jetty-8.y.z-SNAPSHOT
16/01/18 19:33:26 INFO org.spark-project.jetty.server.AbstractConnector: Started [email protected]:4040
16/01/18 19:33:26 WARN org.apache.spark.metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
16/01/18 19:33:26 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at py3-test-m/10.240.0.3:8032
16/01/18 19:33:28 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1453141822766_0005
['/usr/bin/python']
Job [2ba2a5b3-3e4a-41ce-b463-6970185b03bf] finished successfully.
driverControlFilesUri: gs://dataproc-e66c6e3d-80da-4211-b66c-0109bbe4567a-us/google-cloud-dataproc-metainfo/44b233ab-c06f-4234-b495-92c3bf9ac6e0/jobs/2ba2a5b3-3e4a-41ce-b463-6970185b03bf/
driverOutputResourceUri: gs://dataproc-e66c6e3d-80da-4211-b66c-0109bbe4567a-us/google-cloud-dataproc-metainfo/44b233ab-c06f-4234-b495-92c3bf9ac6e0/jobs/2ba2a5b3-3e4a-41ce-b463-6970185b03bf/driveroutput
placement:
clusterName: py3-test
clusterUuid: 44b233ab-c06f-4234-b495-92c3bf9ac6e0
pysparkJob:
loggingConfiguration: {}
mainPythonFileUri: gs://dataproc-e66c6e3d-80da-4211-b66c-0109bbe4567a-us/google-cloud-dataproc-staging/44b233ab-c06f-4234-b495-92c3bf9ac6e0/get-sys-exec.py
properties:
PYSPARK_PYTHON: /usr/bin/python3
reference:
jobId: 2ba2a5b3-3e4a-41ce-b463-6970185b03bf
projectId: bombora-dev
status:
state: DONE
stateStartTime: '2016-01-18T19:33:45.155Z'
statusHistory:
- state: PENDING
stateStartTime: '2016-01-18T19:33:20.791Z'
- state: SETUP_DONE
stateStartTime: '2016-01-18T19:33:20.947Z'
- details: Agent reported job success
state: RUNNING
stateStartTime: '2016-01-18T19:33:27.724Z'
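For what it's worth, Spark's documented mechanism for executor-side environment variables is the spark.executorEnv.[EnvironmentVariableName] property (and spark.yarn.appMasterEnv.[EnvironmentVariableName] for the YARN application master), rather than the bare env-var name I passed above. I haven't verified this on Dataproc, but a submission along these lines might fare better:

```shell
# Untested sketch: pass real Spark conf keys instead of bare env names
❯❯ gcloud beta dataproc jobs submit pyspark --cluster py3-test \
    --properties spark.executorEnv.PYTHONHASHSEED=123,spark.yarn.appMasterEnv.PYSPARK_PYTHON=/usr/bin/python3 \
    get-sys-exec.py
```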
3. Hacking
In one last desperate attempt to get things working, I modified the /usr/lib/spark/python/pyspark/rdd.py module to statically define the env var, changing line 74 to this:
os.environ['PYTHONHASHSEED'] = '0'  # os.environ values must be strings
warnings.warn('Environment variable PYTHONHASHSEED not detected, set to 0')
#raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED")
This, however, had no effect, which was totally unexpected until I realized that code base wasn't being called. Instead, the traceback references a different path, /usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py, which indeed I had not modified.
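This makes sense in hindsight: executors import PySpark from the pyspark.zip archive shipped into each YARN container, and Python happily imports modules straight out of a zip archive on sys.path, so the unpacked copy I edited never gets loaded. A toy demonstration of that import behavior (module and path names here are hypothetical):

```python
import os
import sys
import tempfile
import zipfile

tmp = tempfile.mkdtemp()

# A module packed into a zip archive, standing in for pyspark.zip ...
zip_path = os.path.join(tmp, "lib.zip")
with zipfile.ZipFile(zip_path, "w") as zf:
    zf.writestr("fakerdd.py", "SOURCE = 'zip'\n")

# ... and an edited, unpacked copy of the "same" module elsewhere.
unpacked = os.path.join(tmp, "unpacked")
os.makedirs(unpacked)
with open(os.path.join(unpacked, "fakerdd.py"), "w") as f:
    f.write("SOURCE = 'unpacked'\n")

# The zip sits earlier on sys.path, just as in the executor containers,
# so any edit to the unpacked copy is invisible to the import system:
sys.path.insert(0, unpacked)
sys.path.insert(0, zip_path)
import fakerdd
print(fakerdd.SOURCE)  # prints "zip"
```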
4. Fin
I've exhausted all leads on my side and wanted to hand this off; I hope this helps in identifying and resolving the issue. I appreciate your time, and please let me know how else I can help.
Many thanks. :)