
pyspark-cassandra's People

Contributors

anguenot, dalonso82, frensjan, gfronza, mdinesh9, msukmanowsky, ndamclean, nivye, philipppahl, sathiyarajanm, thesebas, vanyadndz, vntzy


pyspark-cassandra's Issues

Unable to select cql date type

Using CQL:

CREATE TABLE temp (id int PRIMARY KEY , date date);
INSERT INTO temp (id, date) VALUES (2, '2017-12-01');
INSERT INTO temp (id, date) VALUES (3, '2017-12-02');
INSERT INTO temp (id, date) VALUES (4, '2017-12-03');
INSERT INTO temp (id, date) VALUES (5, '2017-12-04');
INSERT INTO temp (id, date) VALUES (6, '2017-12-05');
INSERT INTO temp (id, date) VALUES (7, '2017-12-06');
INSERT INTO temp (id, date) VALUES (8, '2017-12-04');

In pyspark, launched with pyspark --packages anguenot/pyspark-cassandra:0.6.0:

d = sc.cassandraTable('mykeyspace', 'tmp') \
  .select('id', 'date') \
  .where('date=?', '2017-12-01') \
  .collect()

It crashes with a long error; here's the head of it:

17/12/12 19:21:38 ERROR Executor: Exception in task 7.0 in stage 1.0 (TID 16)
net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments
        at net.razorvine.pickle.Pickler.put_javabean(Pickler.java:727)
        at net.razorvine.pickle.Pickler.dispatch(Pickler.java:328)
        at net.razorvine.pickle.Pickler.save(Pickler.java:141)

I also noticed that the date type does not appear in the list of CQL types in the README. Does that mean it is not supported?
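Not an answer from the project docs, but as a hedged workaround sketch (assuming the connector's Spark SQL source handles the CQL date type, which I have not verified here), reading through the DataFrame source avoids the RDD pickling path. Keyspace and table names follow the CQL above.

# Hedged workaround sketch: read via the Spark SQL Cassandra source instead of
# sc.cassandraTable(); assumes the connector maps CQL `date` to Spark's DateType.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

df = (spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(keyspace="mykeyspace", table="temp")
      .load()
      .where(col("date") == "2017-12-01")
      .select("id", "date"))
df.show()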

Timestamp fields are automatically transformed to local timezone

When loading a table from pyspark using pyspark-cassandra with the following code

df_test = spark\
    .read\
    .format("org.apache.spark.sql.cassandra")\
    .option("keyspace", conf.keyspace)\
    .option("table", conf.table)\
    .load()

it automatically converts all timestamp fields to local timezone.

For example, although when selecting the entry through cqlsh the timestamp remains

2019-05-02 07:57:32.263000+0000

running the above code from Greece (+03:00) will lead to timestamp:

2019-05-02 10:57:32.263000+0000

running the above code from Spain (+02:00) will lead to timestamp:

2019-05-02 09:57:32.263000+0000

Is there any way to avoid this behaviour? Or do I have to manually get the local timezone and add/subtract it from the downloaded timestamp?
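Not part of this library, but a hedged sketch that may help: Spark SQL renders timestamps in the session time zone, so pinning spark.sql.session.timeZone (available since Spark 2.2) to UTC should stop the shift to the local zone. Keyspace and table names below are placeholders for conf.keyspace / conf.table.

# Sketch: force the session time zone to UTC so timestamp columns are not rendered
# in the machine's local zone (assumes Spark 2.2+ where this option exists).
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.session.timeZone", "UTC")
         .getOrCreate())

df_test = (spark.read
           .format("org.apache.spark.sql.cassandra")
           .option("keyspace", "my_keyspace")  # placeholder for conf.keyspace
           .option("table", "my_table")        # placeholder for conf.table
           .load())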

Can't connect to multiple cassandra clusters

Hi,

I am not able to connect to multiple Cassandra clusters through this package, even when passing the host parameters in conf as conf.set('spark.cassandra.connection.host','192.168.1.xx,192.168.1.yy')

Specifying multiple clusters seems to be available in the original spark-cassandra-connector package. Can you tell me how to use it with this one?

Thank You
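For reference, the underlying spark-cassandra-connector documents cluster-scoped settings for its DataFrame source; a rough sketch of how that might look from pyspark is below. The cluster labels, hosts, keyspaces and tables are illustrative, and I have not verified whether pyspark-cassandra's RDD API honours the same mechanism.

# Sketch of the connector's documented multi-cluster mechanism via the DataFrame API.
# "ClusterA"/"ClusterB" are arbitrary labels; keyspace/table names are placeholders.
spark.conf.set("ClusterA/spark.cassandra.connection.host", "192.168.1.xx")
spark.conf.set("ClusterB/spark.cassandra.connection.host", "192.168.1.yy")

df_a = (spark.read.format("org.apache.spark.sql.cassandra")
        .options(cluster="ClusterA", keyspace="ks_a", table="table_a")
        .load())
df_b = (spark.read.format("org.apache.spark.sql.cassandra")
        .options(cluster="ClusterB", keyspace="ks_b", table="table_b")
        .load())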

Support for version 3?

DataStax is already working on a version 3 of the connector for Spark 3.

Are you going to release a new version?

Thanks for the great work done here! 😄

Regards.

Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.writer.RowWriterFactory

My code is:

from pyspark_cassandra import CassandraSparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
from pyspark import SparkContext


conf = SparkConf() \
	.setAppName("PySpark Cassandra Test") \
	.setMaster("spark://192.192.141.21:7077") \
	.set("spark.cassandra.connection.host", "192.192.141.26:9042")

sc = CassandraSparkContext(conf=conf)
# sc = SparkContext(conf=conf)
# sqlContext = SQLContext(sc)
#sc = SQLContext(conf=conf)
dd = sc.cassandraTable("oltpdb", "XiangWan")\
	.select("dt", "wid") \
	.where("wid='XiangWan001'") \
	.collect()

And the command line is:
spark-submit --jars /root/model/pyspark-cassandra-0.10.1.jar /root/model/connect_cannandra_via_spark.py

And I got the error:

19/04/12 14:14:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/04/12 14:14:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/04/12 14:14:56 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
19/04/12 14:14:56 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
Traceback (most recent call last):
  File "/root/model/connect_cannandra_via_spark.py", line 17, in <module>
    dd = sc.cassandraTable("oltpdb", "XiangWan")\
  File "/root/anaconda3/lib/python3.6/site-packages/pyspark_cassandra-0.9.0-py3.6.egg/pyspark_cassandra/context.py", line 33, in cassandraTable
  File "/root/anaconda3/lib/python3.6/site-packages/pyspark_cassandra-0.9.0-py3.6.egg/pyspark_cassandra/rdd.py", line 324, in __init__
  File "/root/anaconda3/lib/python3.6/site-packages/pyspark_cassandra-0.9.0-py3.6.egg/pyspark_cassandra/rdd.py", line 213, in _helper
  File "/root/anaconda3/lib/python3.6/site-packages/pyspark_cassandra-0.9.0-py3.6.egg/pyspark_cassandra/util.py", line 99, in helper
  File "/root/spark-2.2.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/root/spark-2.2.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o26.newInstance.
: java.lang.NoClassDefFoundError: com/datastax/spark/connector/writer/RowWriterFactory
        at java.lang.Class.getDeclaredConstructors0(Native Method)
        at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
        at java.lang.Class.getConstructor0(Class.java:3075)
        at java.lang.Class.newInstance(Class.java:412)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.writer.RowWriterFactory
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 15 more

So what should I do?
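Two observations, offered as hints rather than a definitive answer: RowWriterFactory lives in the spark-cassandra-connector, which the plain pyspark-cassandra-0.10.1.jar passed via --jars does not bundle (the README's --packages route or the assembly jar pulls it in), and the traceback shows the Python side running a pyspark_cassandra-0.9.0 egg against the 0.10.1 jar, so the versions are mixed. Below is a hedged sketch of letting Spark resolve the package and its transitive dependencies instead; whether setting this programmatically (rather than via spark-submit --packages) is early enough I have not verified, and the coordinates and version are assumptions.

# Sketch: resolve pyspark-cassandra plus its transitive dependencies (including the
# spark-cassandra-connector) instead of shipping a single jar with --jars.
# The package coordinates and version below are assumptions.
from pyspark import SparkConf
from pyspark_cassandra import CassandraSparkContext

conf = SparkConf() \
    .setAppName("PySpark Cassandra Test") \
    .setMaster("spark://192.192.141.21:7077") \
    .set("spark.jars.packages", "anguenot:pyspark-cassandra:0.10.1") \
    .set("spark.cassandra.connection.host", "192.192.141.26")

sc = CassandraSparkContext(conf=conf)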

Usage as maven dependency

Hi !

What I'm trying to do
Add pyspark-cassandra to the pom.xml of a Spark 2.2 project,
meaning running mvn install, so I've added this to the pom:

<dependency>
        <groupId>anguenot</groupId>
        <artifactId>pyspark-cassandra</artifactId>
        <version>0.5.0</version>
</dependency>

What I know
I know that the example you provide shows that the package is installed via

spark-submit \
	--packages anguenot/pyspark-cassandra:<version> \
	--conf spark.cassandra.connection.host=your,cassandra,node,names

or

spark-submit \
	--jars /path/to/pyspark-cassandra-assembly-<version>.jar \
	--py-files /path/to/pyspark-cassandra-assembly-<version>.jar \
	--conf spark.cassandra.connection.host=your,cassandra,node,names \
	--master spark://spark-master:7077 \
	yourscript.py

The question

I get the following error:

Failed to execute goal on project POC_Crawler_Worker: Could not resolve dependencies for project fr.blowlitik:POC_Crawler_Worker:jar:1.0-SNAPSHOT: Failure to find anguenot:pyspark-cassandra:jar:0.5.0 in http://maven.cm-cloud.fr/artifactory/libs-release was cached in the local repository, resolution will not be reattempted until the update interval of central has elapsed or updates are forced -> [Help 1]

I just want to understand if the failure is normal or if we missed something.

Reproduce
Simply add

<dependency>
     <groupId>anguenot</groupId>
     <artifactId>pyspark-cassandra</artifactId>
      <version>0.5.0</version>
</dependency>

to any pom.xml you have and try mvn install.

Thank you :) !

Random failures on .saveToCassandra()

Hi all,

Before anything, thanks for your efforts on this library. We've been using it for ~2.5 years and it works like a charm.

Recently we've started to migrate the entire stack to Python 3, Cassandra 3.11 and Spark 2.4. I recompiled the library with the proper modifications to the build script and it works fine. But sometimes I get an error on .saveToCassandra(), I suspect. I say I suspect because I've checked everywhere and cannot find where the error might be. Sometimes it works, sometimes it doesn't.

Code is as follows:

def process(dateToQuery, sc, field_pos):

    buckets = [(dateToQuery, i) for i in range(64)]
    bucketlist = sc.parallelize(buckets)
    query = bucketlist\
        .joinWithCassandraTable(opts.get('keyspace'), opts.get('srctable'))\
        .select(*select)\
        .map(lambda r: mapper(r[1], field_pos))\
        .reduceByKey(reducer)\
        .map(flattener)\
        .saveToCassandra(opts.get('keyspace'), opts.get('dsttable'))

Nothing particularly interesting there.

The error I get sometimes is:

19/07/28 00:30:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/07/28 00:30:02 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).

    doing 2019-07-27 16:00:00

    done 2019-07-27 16:00:00

    doing 2019-07-27 17:00:00
19/07/28 00:30:21 WARN TaskSetManager: Lost task 110.0 in stage 2.0 (TID 114, 10.103.96.104, executor 58): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for datetime.datetime)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:139)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:137)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$Leading$1.hasNext(Iterator.scala:668)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:464)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
        at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)

After getting some of those, sometimes the process finishes successfully. Some other times not. When it doesn't, this is the output:

19/07/28 00:30:21 WARN TaskSetManager: Lost task 110.0 in stage 2.0 (TID 114, 10.103.96.104, executor 58): net.razorvine.pickle.PickleException: expected zero arguments for
 construction of ClassDict (for datetime.datetime)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:139)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:137)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$Leading$1.hasNext(Iterator.scala:668)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:464)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
        at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
        at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:557)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)

19/07/28 00:30:23 WARN TaskSetManager: Lost task 3.0 in stage 3.0 (TID 135, 10.103.96.104, executor 58): net.razorvine.pickle.PickleException: expected zero arguments for c
onstruction of ClassDict (for datetime.datetime)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:139)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:137)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.hasNext(GroupingBatchBuilder.scala:101)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
        at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
        at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
        at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

19/07/28 00:30:23 ERROR TaskSetManager: Task 52 in stage 3.0 failed 4 times; aborting job
19/07/28 00:30:23 WARN TaskSetManager: Lost task 120.0 in stage 3.0 (TID 252, 10.103.97.185, executor 0): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 90.0 in stage 3.0 (TID 222, 10.103.97.227, executor 39): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 14.0 in stage 3.0 (TID 146, 10.103.99.178, executor 34): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 27.0 in stage 3.0 (TID 159, 10.103.97.227, executor 39): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 34.0 in stage 3.0 (TID 166, 10.103.96.104, executor 57): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 99.0 in stage 3.0 (TID 231, 10.103.99.217, executor 42): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 79.0 in stage 3.0 (TID 211, 10.103.101.4, executor 26): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 114.0 in stage 3.0 (TID 246, 10.103.98.157, executor 50): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 60.0 in stage 3.0 (TID 192, 10.103.100.32, executor 52): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 1.0 in stage 3.0 (TID 133, 10.103.100.102, executor 62): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 15.0 in stage 3.0 (TID 147, 10.103.100.37, executor 22): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 100.0 in stage 3.0 (TID 232, 10.103.98.231, executor 3): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 47.0 in stage 3.0 (TID 179, 10.103.100.32, executor 51): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 10.0 in stage 3.0 (TID 142, 10.103.100.95, executor 15): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 36.0 in stage 3.0 (TID 168, 10.103.99.217, executor 42): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 78.0 in stage 3.0 (TID 210, 10.103.100.37, executor 22): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 63.0 in stage 3.0 (TID 195, 10.103.96.26, executor 27): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 61.0 in stage 3.0 (TID 193, 10.103.98.157, executor 49): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 3.3 in stage 3.0 (TID 264, 10.103.101.77, executor 9): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 76.0 in stage 3.0 (TID 208, 10.103.98.186, executor 45): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 16.0 in stage 3.0 (TID 148, 10.103.101.4, executor 26): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 13.0 in stage 3.0 (TID 145, 10.103.98.186, executor 45): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 125.0 in stage 3.0 (TID 257, 10.103.96.26, executor 29): TaskKilled (Stage cancelled)
Traceback (most recent call last):
  File "/home/sparkuser/prod/scripts/request_summaries/scripts/python3/summarize_requests_hourly_od_data.py", line 281, in <module>
    main(args)
  File "/home/sparkuser/prod/scripts/request_summaries/scripts/python3/summarize_requests_hourly_od_data.py", line 261, in main
    process(dateToQuery, sc, field_pos)
  File "/home/sparkuser/prod/scripts/request_summaries/scripts/python3/summarize_requests_hourly_od_data.py", line 241, in process
    .saveToCassandra(opts.get('keyspace'), opts.get('dsttable'))
  File "/home/sparkuser/prod/scripts/request_summaries/scripts/python3/pyspark-cassandra-assembly-0.11.0.jar/pyspark_cassandra/rdd.py", line 99, in saveToCassandra
  File "/home/sparkuser/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/home/sparkuser/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
19/07/28 00:30:23 WARN TaskSetManager: Lost task 80.0 in stage 3.0 (TID 212, 10.103.97.252, executor 37): TaskKilled (Stage cancelled)
py4j.protocol.Py4JJavaError: An error occurred while calling o46.saveToCassandra.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 52 in stage 3.0 failed 4 times, most recent failure: Lost task 52.3 in stage 3.0 (TID 265, 10.103.
100.102, executor 60): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for datetime.datetime)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:139)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:137)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.hasNext(GroupingBatchBuilder.scala:101)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
        at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
        at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
        at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
        at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
        at pyspark_cassandra.PythonHelper.saveToCassandra(PythonHelper.scala:94)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for datetime.datetime)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:139)
        at pyspark_util.BatchUnpickler.apply(Pickling.scala:137)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.hasNext(GroupingBatchBuilder.scala:101)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
        at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
        at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
        at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more

19/07/28 00:30:23 WARN TaskSetManager: Lost task 113.0 in stage 3.0 (TID 245, 10.103.100.95, executor 17): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 124.0 in stage 3.0 (TID 256, 10.103.98.157, executor 49): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 123.0 in stage 3.0 (TID 255, 10.103.100.32, executor 52): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 110.0 in stage 3.0 (TID 242, 10.103.100.32, executor 51): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 89.0 in stage 3.0 (TID 221, 10.103.100.95, executor 16): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 83.0 in stage 3.0 (TID 215, 10.103.97.185, executor 1): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 103.0 in stage 3.0 (TID 235, 10.103.96.124, executor 56): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 73.0 in stage 3.0 (TID 205, 10.103.100.95, executor 15): TaskKilled (Stage cancelled)
19/07/28 00:30:23 WARN TaskSetManager: Lost task 50.0 in stage 3.0 (TID 182, 10.103.100.95, executor 17): TaskKilled (Stage cancelled)
19/07/28 00:30:23 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.

and then some more.

When I use .collect() just to print some debug information or counts, everything works perfectly.

Thanks in advance for any guidance you can provide.
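In case it helps with debugging, a hedged workaround sketch that others have used for this pyrolite unpickling error is to hand the writer values that pickle trivially, e.g. converting datetime objects in the flattened rows before saveToCassandra(). The helper below is hypothetical, and whether the connector accepts the resulting strings for your column types needs verifying.

# Hypothetical sketch: turn datetime values into ISO-8601 strings so the JVM-side
# unpickler never has to reconstruct datetime.datetime objects. Assumes the flattened
# rows are dicts; adapt to tuples/Rows as needed.
from datetime import datetime

def stringify_datetimes(row):
    return {k: (v.isoformat() if isinstance(v, datetime) else v)
            for k, v in row.items()}

# ... .map(flattener).map(stringify_datetimes).saveToCassandra(keyspace, dsttable)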

Can't insert date type into cassandra

This is my error when I try to insert m_date=datetime.date.today() or datetime.datetime.now().date():

Caused by: com.datastax.spark.connector.types.TypeConversionException: Cannot convert object java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Asia/Ho_Chi_Minh",offset=25200000,dstSavings=0,useDaylight=false,transitions=11,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2017,MONTH=11,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=11,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=?,ZONE_OFFSET=?,DST_OFFSET=?] of type class java.util.GregorianCalendar to com.datastax.driver.core.LocalDate.
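A hedged workaround sketch (not from the project docs): pass the date column as a 'YYYY-MM-DD' string rather than a datetime.date object, so nothing has to be converted from a GregorianCalendar to com.datastax.driver.core.LocalDate on the JVM side. Keyspace, table and column names are placeholders, and whether the connector's converter accepts the string depends on the connector version.

# Hypothetical sketch: write the CQL date column as an ISO string. Assumes `sc` is the
# CassandraSparkContext / monkey-patched context provided by this package.
import datetime

row = {"id": 1, "m_date": datetime.date.today().isoformat()}  # e.g. '2017-12-11'
sc.parallelize([row]).saveToCassandra("my_keyspace", "my_table")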

cassandraTable("pramod","history","ROW")

Using, pyspark-cassandra-0.9.0.jar

I see the error below; please help.

sc.cassandraTable("pramod","history","ROW").where("sent_date >='2024-06-11T00:00:00Z' and sent_date <='2027-11-15T00:00:00Z'").cassandraCount()
Traceback (most recent call last):
File "", line 1, in
File "/var/lib/spark/rdd/spark-ee8d74f8-6d9e-4283-9dba-957cd9d83d8c/userFiles-4e5d6454-e784-4d00-875a-e7c7acf3d566/pyspark-cassandra-0.9.0.jar/pyspark_cassandra/context.py", line 33, in cassandraTable
File "/var/lib/spark/rdd/spark-ee8d74f8-6d9e-4283-9dba-957cd9d83d8c/userFiles-4e5d6454-e784-4d00-875a-e7c7acf3d566/pyspark-cassandra-0.9.0.jar/pyspark_cassandra/rdd.py", line 318, in init
File "/var/lib/spark/rdd/spark-ee8d74f8-6d9e-4283-9dba-957cd9d83d8c/userFiles-4e5d6454-e784-4d00-875a-e7c7acf3d566/pyspark-cassandra-0.9.0.jar/pyspark_cassandra/rdd.py", line 192, in init
ValueError: invalid row_format ROW

Also, although I gave 6 arguments, it's throwing an error saying 8 were given.

sc.cassandraTable("pramod","history","ROW",100000,1000,"LOCAL_QUORUM").where("sent_date >='2024-06-11T00:00:00Z' and sent_date <='2027-11-15T00:00:00Z'").cassandraCount()
Traceback (most recent call last):
File "", line 1, in
File "/var/lib/spark/rdd/spark-ee8d74f8-6d9e-4283-9dba-957cd9d83d8c/userFiles-4e5d6454-e784-4d00-875a-e7c7acf3d566/pyspark-cassandra-0.9.0.jar/pyspark_cassandra/context.py", line 33, in cassandraTable
TypeError: init() takes at most 6 arguments (8 given)
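For reference, my reading of the README is that row_format expects a constant from pyspark_cassandra.RowFormat rather than the string "ROW", and that the remaining tuning values are meant to be keyword arguments. A sketch is below, with the caveat that the exact keyword names should be checked against the README rather than taken from here.

# Sketch: pass a RowFormat constant instead of the string "ROW"; further options such
# as split size or consistency level should go in as keyword arguments per the README
# (their exact names are not reproduced here to avoid guessing).
from pyspark_cassandra import RowFormat

rdd = sc.cassandraTable("pramod", "history", row_format=RowFormat.ROW)
count = rdd.where(
    "sent_date >= '2024-06-11T00:00:00Z' and sent_date <= '2027-11-15T00:00:00Z'"
).cassandraCount()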


spark-submit gives TypeError

bash-4.4# ./spark-submit --jars ./pyspark-cassandra-2.4.0.jar --py-files ./pyspark-cassandra-2.4.0.jar --conf spark.cassandra.connection.host=192.168.158.133 --conf spark.port.maxRetries=50 t.py

2020-01-20 09:54:07 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
File "/spark/bin/t.py", line 3, in
import pyspark_cassandra
File "/spark/bin/pyspark-cassandra-2.4.0.jar/pyspark_cassandra/init.py", line 54, in
File "/spark/bin/pyspark-cassandra-2.4.0.jar/pyspark_cassandra/context.py", line 21, in monkey_patch_sc
TypeError: class assignment: only for heap types
2020-01-20 09:54:08 INFO ShutdownHookManager:54 - Shutdown hook called
2020-01-20 09:54:08 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-246b1a89-fb6d-40eb-9de9-2b43b625bf49

The pyspark shell works, but when submitting a job it throws the above error.

Initial lines of the t.py script:

from pyspark import SparkContext as sc
from pyspark.sql import SQLContext

import pyspark_cassandra

sqlctx = SQLContext(sc)
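One thing that stands out in t.py, though I can't be certain it is the root cause of the heap-types error: from pyspark import SparkContext as sc binds the name sc to the class itself, not to a running context, so anything that later patches or wraps sc operates on the class. A sketch of the more usual setup, with the app name as a placeholder:

# Sketch: create a SparkContext instance and hand that to SQLContext / pyspark_cassandra,
# rather than binding the SparkContext class itself to the name `sc`.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(conf=SparkConf().setAppName("cassandra-job"))  # placeholder app name

import pyspark_cassandra  # imported once a real SparkContext instance exists
pyspark_cassandra.monkey_patch_sc(sc)

sqlctx = SQLContext(sc)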

Compatibility issues with Spark 2.3.0

Hi,
Great library! I ran into this error while trying to save to Cassandra:

ERROR Executor:91 - Exception in task 24.0 in stage 0.0 (TID 24)
java.io.IOException: Exception during preparation of SELECT "patient_id", "encounter_datetime", "date_created", "encounter_id", "encounter_type", "location_id", "obs", "visit_id" FROM "etl"."flat_obs" WHERE token("patient_id") > ? AND token("patient_id") <= ?   ALLOW FILTERING: Could not initialize class com.datastax.spark.connector.types.TypeConverter$
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:323)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.com$datastax$spark$connector$rdd$CassandraTableScanRDD$$fetchTokenRange(CassandraTableScanRDD.scala:339)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:367)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$17.apply(CassandraTableScanRDD.scala:367)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.hasNext(InMemoryRelation.scala:139)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class com.datastax.spark.connector.types.TypeConverter$
	at com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:50)
	at com.datastax.spark.connector.types.BigIntType$.converterToCassandra(PrimitiveColumnType.scala:46)
	at com.datastax.spark.connector.types.ColumnType$.converterToCassandra(ColumnType.scala:231)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$11.apply(CassandraTableScanRDD.scala:312)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD$$anonfun$11.apply(CassandraTableScanRDD.scala:312)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:312)
	... 38 more

Is this due to the connector's incompatibility with Spark 2.3.0?

Compatibility with Spark 2.4.5?

I am trying to use this package (latest version, 2.4.0) with Spark 2.4.5, but I get the same error every time:

Python in worker has different version 2.7 than that in driver 3.7, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

Is it not compatible with version 2.4.5 yet?
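That particular error is about the driver and the workers running different Python interpreters (2.7 vs 3.7), not about the connector version; the message itself points at PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON. A sketch of pinning both before the context is created; the interpreter path is an assumption and must also exist on the worker machines.

# Sketch: make driver and executors use the same Python interpreter. The path below is
# an assumption for your environment and has to be valid on every worker node as well.
import os
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.7"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.7"

from pyspark import SparkConf, SparkContext
sc = SparkContext(conf=SparkConf().setAppName("version-check"))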

Import Pyspark-cassandra in spark workers failing.

Hi,
I am using pyspark + Cassandra to work on an issue.
Setup Details:

  1. Spark is downloaded and kept in D:\spark\spark
  2. Scala is in D:\spark\scala
  3. Hadoop is in D:\spark\hadoop
  4. I got Cassandra, ran the Cassandra.bat file, and it works fine.
  5. The Spark, Scala and Hadoop paths are set in the environment variables, and the PATH variable is updated with the Cassandra bin and Spark bin directories.
  6. I downloaded the pyspark_cassandra Spark package and kept it in the D:\spark\spark\jars folder.
    Running import pyspark_cassandra in the Python environment throws an import error.
    How do I make pyspark_cassandra available to the Python environment globally?

Error:


Py4JJavaError Traceback (most recent call last)
in ()
----> 1 rdd.take(2)

D:\spark\spark\python\pyspark\rdd.py in take(self, num)
1341
1342 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343 res = self.context.runJob(self, takeUpToNumLeft, p)
1344
1345 items += res

D:\spark\spark\python\pyspark\context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
990 # SparkContext#runJob.
991 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 992 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
993 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
994

~\AppData\Local\Continuum\anaconda3\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
1158 answer = self.gateway_client.send_command(command)
1159 return_value = get_return_value(
-> 1160 answer, self.gateway_client, self.target_id, self.name)
1161
1162 for temp_arg in temp_args:

~\AppData\Local\Continuum\anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
318 raise Py4JJavaError(
319 "An error occurred while calling {0}{1}{2}.\n".
--> 320 format(target_id, ".", name), value)
321 else:
322 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 4.0 failed 1 times, most recent failure: Lost task 9.0 in stage 4.0 (TID 55, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "D:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line 177, in main
File "D:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line 172, in process
File "D:\spark\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 138, in dump_stream
for obj in iterator:
File "D:\spark\spark\python\pyspark\rdd.py", line 1752, in add_shuffle_key
for k, v in iterator:
File "D:\spark\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 144, in load_stream
yield self._read_with_length(stream)
File "D:\spark\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 169, in _read_with_length
return self.loads(obj)
File "D:\spark\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 451, in loads
return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'pyspark_cassandra'

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:395)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:446)
at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "D:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line 177, in main
File "D:\spark\spark\python\lib\pyspark.zip\pyspark\worker.py", line 172, in process
File "D:\spark\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 138, in dump_stream
for obj in iterator:
File "D:\spark\spark\python\pyspark\rdd.py", line 1752, in add_shuffle_key
for k, v in iterator:
File "D:\spark\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 144, in load_stream
yield self._read_with_length(stream)
File "D:\spark\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 169, in _read_with_length
return self.loads(obj)
File "D:\spark\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 451, in loads
return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'pyspark_cassandra'

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:395)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
... 1 more
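For what it's worth, dropping the jar into D:\spark\spark\jars only puts it on the JVM classpath; the Python package inside it also has to reach the executors' Python path, which is what --py-files (or sc.addPyFile) does. A hedged sketch, with the jar name and path below being an assumption about the file you downloaded:

# Sketch: ship the pyspark-cassandra assembly jar to the executors so the Python module
# can be imported there; the exact jar file name and path are assumptions.
from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setAppName("cassandra-on-windows"))
sc.addPyFile(r"D:\spark\spark\jars\pyspark-cassandra-assembly-<version>.jar")

import pyspark_cassandra  # import after the jar is on the Python path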

import not working

Cannot import pyspark_cassandra

> pyspark --packages anguenot/pyspark-cassandra:0.7.0 --conf spark.cassandra.connection.host=12.34.56.78
SPARK_MAJOR_VERSION is set to 2, using Spark2
Python 2.7.12 |Anaconda custom (64-bit)| (default, Jul  2 2016, 17:42:40)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Anaconda is brought to you by Continuum Analytics.
Please check out: http://continuum.io/thanks and https://anaconda.org
Ivy Default Cache set to: /home/opnf/.ivy2/cache
The jars for the packages stored in: /home/opnf/.ivy2/jars
:: loading settings :: url = jar:file:/usr/hdp/2.5.5.0-157/spark2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
anguenot#pyspark-cassandra added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
        confs: [default]
        found anguenot#pyspark-cassandra;0.7.0 in spark-packages
        found com.datastax.spark#spark-cassandra-connector_2.11;2.0.6 in central
        found org.joda#joda-convert;1.2 in central
        found commons-beanutils#commons-beanutils;1.9.3 in central
        found commons-collections#commons-collections;3.2.2 in central
        found com.twitter#jsr166e;1.1.0 in central
        found io.netty#netty-all;4.0.33.Final in central
        found joda-time#joda-time;2.3 in central
        found org.scala-lang#scala-reflect;2.11.8 in central
        found net.razorvine#pyrolite;4.10 in central
        found net.razorvine#serpent;1.12 in central
:: resolution report :: resolve 710ms :: artifacts dl 33ms
        :: modules in use:
        anguenot#pyspark-cassandra;0.7.0 from spark-packages in [default]
        com.datastax.spark#spark-cassandra-connector_2.11;2.0.6 from central in [default]
        com.twitter#jsr166e;1.1.0 from central in [default]
        commons-beanutils#commons-beanutils;1.9.3 from central in [default]
        commons-collections#commons-collections;3.2.2 from central in [default]
        io.netty#netty-all;4.0.33.Final from central in [default]
        joda-time#joda-time;2.3 from central in [default]
        net.razorvine#pyrolite;4.10 from central in [default]
        net.razorvine#serpent;1.12 from central in [default]
        org.joda#joda-convert;1.2 from central in [default]
        org.scala-lang#scala-reflect;2.11.8 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   11  |   0   |   0   |   0   ||   11  |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
        confs: [default]
        0 artifacts copied, 11 already retrieved (0kB/40ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
18/04/17 14:52:39 WARN Client: Same path resource file:/home/opnf/.ivy2/jars/anguenot_pyspark-cassandra-0.7.0.jar added multiple times to distributed cache.
18/04/17 14:52:39 WARN Client: Same path resource file:/home/opnf/.ivy2/jars/com.datastax.spark_spark-cassandra-connector_2.11-2.0.6.jar added multiple times to distributed cache.
18/04/17 14:52:39 WARN Client: Same path resource file:/home/opnf/.ivy2/jars/net.razorvine_pyrolite-4.10.jar added multiple times to distributed cache.
18/04/17 14:52:39 WARN Client: Same path resource file:/home/opnf/.ivy2/jars/org.joda_joda-convert-1.2.jar added multiple times to distributed cache.
18/04/17 14:52:39 WARN Client: Same path resource file:/home/opnf/.ivy2/jars/commons-beanutils_commons-beanutils-1.9.3.jar added multiple times to distributed cache.
18/04/17 14:52:39 WARN Client: Same path resource file:/home/opnf/.ivy2/jars/com.twitter_jsr166e-1.1.0.jar added multiple times to distributed cache.
18/04/17 14:52:39 WARN Client: Same path resource file:/home/opnf/.ivy2/jars/io.netty_netty-all-4.0.33.Final.jar added multiple times to distributed cache.
18/04/17 14:52:39 WARN Client: Same path resource file:/home/opnf/.ivy2/jars/joda-time_joda-time-2.3.jar added multiple times to distributed cache.
18/04/17 14:52:39 WARN Client: Same path resource file:/home/opnf/.ivy2/jars/org.scala-lang_scala-reflect-2.11.8.jar added multiple times to distributed cache.
18/04/17 14:52:39 WARN Client: Same path resource file:/home/opnf/.ivy2/jars/commons-collections_commons-collections-3.2.2.jar added multiple times to distributed cache.
18/04/17 14:52:39 WARN Client: Same path resource file:/home/opnf/.ivy2/jars/net.razorvine_serpent-1.12.jar added multiple times to distributed cache.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.2.2.5.5.0-157
      /_/

Using Python version 2.7.12 (default, Jul  2 2016 17:42:40)
SparkSession available as 'spark'.
>>> import pyspark_cassandra
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ImportError: No module named pyspark_cassandra

The environment is Azure Data Lake with Spark 2 installed.
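One hedged thing to try from the same shell: --packages puts the resolved jars (listed in the log above under /home/opnf/.ivy2/jars) on the JVM classpath, but on some setups the Python path does not pick them up; adding the cached jar explicitly makes the module importable.

# Sketch: add the Ivy-cached pyspark-cassandra jar to the Python path by hand.
# The path is taken from the "Same path resource" lines in the log above.
sc.addPyFile("/home/opnf/.ivy2/jars/anguenot_pyspark-cassandra-0.7.0.jar")

import pyspark_cassandra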

WRITETIME access at cell level?

I've been trying to find a way to access a field's WRITETIME from Cassandra in pyspark. Am I looking in the right place? I can't seem to find a way to do this.
Thanks

Unable to read empty collections

Steps to reproduce issue:

  1. create table with frozen collection column
    CREATE TABLE test_frozen_collection (
        key int,
        collection frozen<list<text>>,
        PRIMARY KEY (key)
    );
    
  2. insert row with empty collection
    INSERT INTO test_frozen_collection (key, collection) VALUES (1, []);
    
  3. attempt to read row in pyspark
    import pyspark_cassandra
    pyspark_cassandra.monkey_patch_sc(sc)
    rdd = sc.cassandraTable('test_keyspace', 'test_frozen_collection')
    rdd.where('key=?', 1).collect()
    
  4. error:
    net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments

Note that if the collection column contains a null value instead of an empty collection, pyspark reads it without error:

INSERT INTO test_frozen_collection (key, collection) VALUES (1, null);
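
As a hedged workaround (this does not fix the pickling issue itself), reading the same table through the DataFrame reader of the underlying spark-cassandra-connector avoids the pyrolite pickling path that trips over the empty frozen collection. The keyspace and table names below are the ones from the repro; spark is the SparkSession available in the shell.

    # Hedged workaround sketch: read via the DataFrame API instead of cassandraTable().
    df = spark.read \
        .format("org.apache.spark.sql.cassandra") \
        .options(keyspace="test_keyspace", table="test_frozen_collection") \
        .load()
    df.filter("key = 1").show()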

Facing issue running as flat jar [py4j.protocol.Py4JJavaError: An error occurred while calling o38.loadClass. : java.lang.ClassNotFoundException: pyspark_cassandra.PythonHelper]

I am facing this issue while running a pyspark program (pyspark-cassandra flat jar version 0.7.0) on spark-2.0.1 (Scala 2.11) with Python 2.7.
The following error occurs while trying to save to a Cassandra table (as in the example):
I also tried this command: bin/spark-submit --jars ~/software/spark-cassandra-connector_2.10-2.0.1.jar --driver-class-path ~/software/spark-cassandra-connector_2.10-2.0.1.jar --py-files ~/software/pyspark-cassandra-0.7.0.jar --conf spark.cassandra.connection.host=127 --master spark://127.0.0.1:7077 ~/pipeline/CassandraModel.py (a possible variation of this command is sketched after the stack trace below).

File "/home/phenodoop/pipeline/CassandraModel.py", line 20, in
rdd.saveToCassandra("moviedb", "movies", ('id','title','year'))
File "/home/phenodoop/software/pyspark-cassandra-0.7.0.jar/pyspark_cassandra/rdd.py", line 85, in saveToCassandra
File "/home/phenodoop/software/pyspark-cassandra-0.7.0.jar/pyspark_cassandra/util.py", line 99, in helper
File "/home/phenodoop/software/pyspark-cassandra-0.7.0.jar/pyspark_cassandra/util.py", line 88, in load_class
File "/home/phenodoop/spark-2.0.1/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in call
File "/home/phenodoop/spark-2.0.1/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o38.loadClass.
: java.lang.ClassNotFoundException: pyspark_cassandra.PythonHelper
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
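
A hedged suggestion rather than a confirmed fix: --py-files only puts the flat jar on the Python path, so the JVM never sees pyspark_cassandra.PythonHelper, which lives in that same jar. Adding the pyspark-cassandra jar to --jars and --driver-class-path as well (or resolving it with --packages as shown earlier in this document) may resolve the ClassNotFoundException. For example, reusing the paths from the report above and a placeholder for the Cassandra host:

    bin/spark-submit \
        --jars ~/software/pyspark-cassandra-0.7.0.jar,~/software/spark-cassandra-connector_2.10-2.0.1.jar \
        --driver-class-path ~/software/pyspark-cassandra-0.7.0.jar:~/software/spark-cassandra-connector_2.10-2.0.1.jar \
        --py-files ~/software/pyspark-cassandra-0.7.0.jar \
        --conf spark.cassandra.connection.host=<cassandra-host> \
        --master spark://127.0.0.1:7077 \
        ~/pipeline/CassandraModel.py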

saveToCassandra Failed.

Trying to save an RDD to Cassandra with pyspark_cassandra is failing. Writing the same data through the DataFrame API worked (a possible workaround is sketched after the traceback below):

    df.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode('append') \
        .options(table="train", keyspace="kaggle") \
        .save()

Py4JJavaError Traceback (most recent call last)
in ()
----> 1 df.rdd.saveToCassandra("kaggle", "train")

~\AppData\Local\Continuum\anaconda3\lib\site-packages\pyspark_cassandra\rdd.py in saveToCassandra(rdd, keyspace, table, columns, row_format, keyed, write_conf, **write_conf_kwargs)
91 row_format,
92 keyed,
---> 93 write_conf,
94 )
95

~\AppData\Local\Continuum\anaconda3\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
1158 answer = self.gateway_client.send_command(command)
1159 return_value = get_return_value(
-> 1160 answer, self.gateway_client, self.target_id, self.name)
1161
1162 for temp_arg in temp_args:

c:\spark\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

~\AppData\Local\Continuum\anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
318 raise Py4JJavaError(
319 "An error occurred while calling {0}{1}{2}.\n".
--> 320 format(target_id, ".", name), value)
321 else:
322 raise Py4JError(

Py4JJavaError: An error occurred while calling o57.saveToCassandra.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 11.0 failed 1 times, most recent failure: Lost task 0.0 in stage 11.0 (TID 23, localhost, executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._parse_datatype_json_string)
at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
at pyspark_util.BatchUnpickler.apply(Pickling.scala:139)
at pyspark_util.BatchUnpickler.apply(Pickling.scala:137)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.hasNext(GroupingBatchBuilder.scala:101)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:198)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:175)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175)
at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2075)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
at pyspark_cassandra.PythonHelper.saveToCassandra(PythonHelper.scala:84)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Unknown Source)
Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._parse_datatype_json_string)
at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
at pyspark_util.BatchUnpickler.apply(Pickling.scala:139)
at pyspark_util.BatchUnpickler.apply(Pickling.scala:137)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.hasNext(GroupingBatchBuilder.scala:101)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:198)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:175)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175)
at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
... 1 more
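
Not a confirmed fix, but the "expected zero arguments for construction of ClassDict" error typically shows up when the JVM side receives pickled pyspark.sql.Row objects (which is what df.rdd yields) that the pyspark_cassandra unpickler cannot rebuild. A hedged workaround sketch is to turn the rows into plain dicts before saving:

    # Hedged workaround sketch: convert Row objects to plain dicts so the
    # Scala-side unpickler receives simple key/value pairs.
    df.rdd \
        .map(lambda row: row.asDict()) \
        .saveToCassandra("kaggle", "train")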
