pyspark_dl_pipeline's People
Forkers
hsharmatech bmrmartins abanerji stjordanis naswiz deeplakkad92 cristm yanjiasun pflashgary shineloveyc jormungand-a naveedafzal ondiec davidhhss tarunsingh272 pushkarsinha elhmadany winsc1ence fbranda tanay0nspark henintsoaraza pravinkr atkachivska dustyymelody7 asif-ejaz-blt john2408 lakshmiya paolocristini82 gilbertcarey zsoftwarerepository ahmedxomar101 rexche jlveramendi
pyspark_dl_pipeline's Issues
Error while running the new deep learning pipeline and helper function on both data sets
What should I do here to fix the error?
RuntimeError Traceback (most recent call last)
~\AppData\Local\Temp/ipykernel_6652/107061413.py in <module>
----> 1 dl_pipeline_fit_score_results(dl_pipeline=dl_pipeline,
2 train_data=train_data,
3 test_data=test_data,
4 label='label_index');
~\AppData\Local\Temp/ipykernel_6652/3033932891.py in dl_pipeline_fit_score_results(dl_pipeline, train_data, test_data, label)
4 label='label_index'):
5
----> 6 fit_dl_pipeline = dl_pipeline.fit(train_data)
7 pred_train = fit_dl_pipeline.transform(train_data)
8 pred_test = fit_dl_pipeline.transform(test_data)
~\anaconda3\envs\sama\lib\site-packages\pyspark\ml\base.py in fit(self, dataset, params)
159 return self.copy(params)._fit(dataset)
160 else:
--> 161 return self._fit(dataset)
162 else:
163 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
~\anaconda3\envs\sama\lib\site-packages\pyspark\ml\pipeline.py in _fit(self, dataset)
112 dataset = stage.transform(dataset)
113 else: # must be an Estimator
--> 114 model = stage.fit(dataset)
115 transformers.append(model)
116 if i < indexOfLastEstimator:
~\anaconda3\envs\sama\lib\site-packages\pyspark\ml\base.py in fit(self, dataset, params)
159 return self.copy(params)._fit(dataset)
160 else:
--> 161 return self._fit(dataset)
162 else:
163 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
~\anaconda3\envs\sama\lib\site-packages\elephas\ml_model.py in _fit(self, df)
84 features_col=self.getFeaturesCol(), label_col=self.getLabelCol())
85 simple_rdd = simple_rdd.repartition(self.get_num_workers())
---> 86 keras_model = model_from_yaml(self.get_keras_model_config(), self.get_custom_objects())
87 metrics = self.get_metrics()
88 loss = self.get_loss()
~\anaconda3\envs\sama\lib\site-packages\tensorflow\python\keras\saving\model_config.py in model_from_yaml(yaml_string, custom_objects)
76 RuntimeError: announces that the method poses a security risk
77 """
---> 78 raise RuntimeError(
79 'Method `model_from_yaml()` has been removed due to security risk of '
80 'arbitrary code execution. Please use `Model.to_json()` and '
RuntimeError: Method `model_from_yaml()` has been removed due to security risk of arbitrary code execution. Please use `Model.to_json()` and `model_from_json()` instead.
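A possible workaround, not from the original report: TensorFlow 2.6 removed model_from_yaml(), which older elephas releases call internally, so either pin tensorflow below 2.6, upgrade to an elephas version that serializes models with JSON, or use the JSON round trip the error message suggests. A minimal sketch of that round trip, assuming a tf.keras model object named model:

# Sketch only: JSON-based serialization that replaces the removed YAML API.
# Assumes `model` is an already-built tf.keras model; elephas itself must also
# use JSON internally (recent releases do), otherwise pin tensorflow < 2.6.
from tensorflow.keras.models import model_from_json

model_json = model.to_json()               # architecture as a JSON string
restored = model_from_json(model_json)     # rebuild the same architecture
restored.set_weights(model.get_weights())  # carry the trained weights across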
cPickle.PicklingError: Could not serialize object: NotImplementedError
I am getting this error when running the example without any modifications:
pyspark_1 | Traceback (most recent call last):
pyspark_1 | File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/serializers.py", line 590, in dumps
pyspark_1 | return cloudpickle.dumps(obj, 2)
pyspark_1 | File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 863, in dumps
pyspark_1 | cp.dump(obj)
pyspark_1 | File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 260, in dump
pyspark_1 | return Pickler.dump(self, obj)
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 224, in dump
pyspark_1 | self.save(obj)
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1 | f(self, obj) # Call unbound method with explicit self
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 568, in save_tuple
pyspark_1 | save(element)
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1 | f(self, obj) # Call unbound method with explicit self
pyspark_1 | File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 406, in save_function
pyspark_1 | self.save_function_tuple(obj)
pyspark_1 | File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 549, in save_function_tuple
pyspark_1 | save(state)
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1 | f(self, obj) # Call unbound method with explicit self
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
pyspark_1 | self._batch_setitems(obj.iteritems())
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
pyspark_1 | save(v)
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1 | f(self, obj) # Call unbound method with explicit self
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 606, in save_list
pyspark_1 | self._batch_appends(iter(obj))
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 642, in _batch_appends
pyspark_1 | save(tmp[0])
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1 | f(self, obj) # Call unbound method with explicit self
pyspark_1 | File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 660, in save_instancemethod
pyspark_1 | obj=obj)
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 401, in save_reduce
pyspark_1 | save(args)
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1 | f(self, obj) # Call unbound method with explicit self
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 554, in save_tuple
pyspark_1 | save(element)
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 331, in save
pyspark_1 | self.save_reduce(obj=obj, *rv)
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
pyspark_1 | save(state)
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1 | f(self, obj) # Call unbound method with explicit self
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
pyspark_1 | self._batch_setitems(obj.iteritems())
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
pyspark_1 | save(v)
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1 | f(self, obj) # Call unbound method with explicit self
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 606, in save_list
pyspark_1 | self._batch_appends(iter(obj))
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 642, in _batch_appends
pyspark_1 | save(tmp[0])
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 331, in save
pyspark_1 | self.save_reduce(obj=obj, *rv)
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
pyspark_1 | save(state)
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1 | f(self, obj) # Call unbound method with explicit self
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
pyspark_1 | self._batch_setitems(obj.iteritems())
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
pyspark_1 | save(v)
pyspark_1 | File "/usr/lib/python2.7/pickle.py", line 306, in save
pyspark_1 | rv = reduce(self.proto)
pyspark_1 | File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/resource_variable_ops.py", line 1152, in __reduce__
pyspark_1 | initial_value=self.numpy(),
pyspark_1 | File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/resource_variable_ops.py", line 906, in numpy
pyspark_1 | "numpy() is only available when eager execution is enabled.")
pyspark_1 | NotImplementedError: numpy() is only available when eager execution is enabled.
pyspark_1 | Traceback (most recent call last):
pyspark_1 | File "/home/ubuntu/./spark.py", line 262, in <module>
pyspark_1 | label='label_index')
pyspark_1 | File "/home/ubuntu/./spark.py", line 237, in dl_pipeline_fit_score_results
pyspark_1 | fit_dl_pipeline = dl_pipeline.fit(train_data)
pyspark_1 | File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/ml/base.py", line 132, in fit
pyspark_1 | return self._fit(dataset)
pyspark_1 | File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/ml/pipeline.py", line 109, in _fit
pyspark_1 | model = stage.fit(dataset)
pyspark_1 | File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/ml/base.py", line 132, in fit
pyspark_1 | return self._fit(dataset)
pyspark_1 | File "/usr/local/lib/python2.7/dist-packages/elephas/ml_model.py", line 92, in _fit
pyspark_1 | validation_split=self.get_validation_split())
pyspark_1 | File "/usr/local/lib/python2.7/dist-packages/elephas/spark_model.py", line 151, in fit
pyspark_1 | self._fit(rdd, epochs, batch_size, verbose, validation_split)
pyspark_1 | File "/usr/local/lib/python2.7/dist-packages/elephas/spark_model.py", line 188, in _fit
pyspark_1 | gradients = rdd.mapPartitions(worker.train).collect()
pyspark_1 | File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/rdd.py", line 816, in collect
pyspark_1 | sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
pyspark_1 | File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/rdd.py", line 2532, in _jrdd
pyspark_1 | self._jrdd_deserializer, profiler)
pyspark_1 | File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/rdd.py", line 2434, in _wrap_function
pyspark_1 | pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
pyspark_1 | File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
pyspark_1 | pickled_command = ser.dumps(command)
pyspark_1 | File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/serializers.py", line 600, in dumps
pyspark_1 | raise pickle.PicklingError(msg)
pyspark_1 | cPickle.PicklingError: Could not serialize object: NotImplementedError: numpy() is only available when eager execution is enabled.
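A possible cause, not stated in the original report: while cloudpickle serializes the worker closure it calls .numpy() on a tf.Variable, which only works in eager mode, and this Python 2.7 / TF 1.x stack runs in graph mode by default. A minimal sketch of one workaround, assuming eager execution is acceptable for the rest of the script (on very old TensorFlow the call is tf.enable_eager_execution() instead):

# Sketch only: turn on eager execution before any Keras model or graph is built,
# so tf.Variable.numpy() is available when Spark pickles the worker closure.
import tensorflow as tf

tf.compat.v1.enable_eager_execution()
print(tf.executing_eagerly())  # should print True before the elephas pipeline is defined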
ValueError: Could not interpret optimizer identifier: False
Hi,
On running the final cell, i.e. the dl_pipeline_fit_score_results() method, I am getting the following error:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-54-cb5b6595cf8f> in <module>
2 train_data=train_data,
3 test_data=test_data,
----> 4 label='label_index');
<ipython-input-53-1c2301ef586e> in dl_pipeline_fit_score_results(dl_pipeline, train_data, test_data, label)
4 label='label_index'):
5
----> 6 fit_dl_pipeline = dl_pipeline.fit(train_data)
7 pred_train = fit_dl_pipeline.transform(train_data)
8 pred_test = fit_dl_pipeline.transform(test_data)
/opt/conda/lib/python3.6/site-packages/pyspark/ml/base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/opt/conda/lib/python3.6/site-packages/pyspark/ml/pipeline.py in _fit(self, dataset)
107 dataset = stage.transform(dataset)
108 else: # must be an Estimator
--> 109 model = stage.fit(dataset)
110 transformers.append(model)
111 if i < indexOfLastEstimator:
/opt/conda/lib/python3.6/site-packages/pyspark/ml/base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/opt/conda/lib/python3.6/site-packages/elephas/ml_model.py in _fit(self, df)
90 batch_size=self.get_batch_size(),
91 verbose=self.get_verbosity(),
---> 92 validation_split=self.get_validation_split())
93
94 model_weights = spark_model.master_network.get_weights()
/opt/conda/lib/python3.6/site-packages/elephas/spark_model.py in fit(self, rdd, epochs, batch_size, verbose, validation_split)
149
150 if self.mode in ['asynchronous', 'synchronous', 'hogwild']:
--> 151 self._fit(rdd, epochs, batch_size, verbose, validation_split)
152 else:
153 raise ValueError(
/opt/conda/lib/python3.6/site-packages/elephas/spark_model.py in _fit(self, rdd, epochs, batch_size, verbose, validation_split)
159 self._master_network.compile(optimizer=self.master_optimizer,
160 loss=self.master_loss,
--> 161 metrics=self.master_metrics)
162 if self.mode in ['asynchronous', 'hogwild']:
163 self.start_server()
/opt/conda/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py in compile(self, optimizer, loss, metrics, loss_weights, weighted_metrics, run_eagerly, **kwargs)
539 self._run_eagerly = run_eagerly
540
--> 541 self.optimizer = self._get_optimizer(optimizer)
542 self.compiled_loss = compile_utils.LossesContainer(
543 loss, loss_weights, output_names=self.output_names)
/opt/conda/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py in _get_optimizer(self, optimizer)
565 return opt
566
--> 567 return nest.map_structure(_get_single_optimizer, optimizer)
568
569 @trackable.no_automatic_dependency_tracking
/opt/conda/lib/python3.6/site-packages/tensorflow/python/util/nest.py in map_structure(func, *structure, **kwargs)
633
634 return pack_sequence_as(
--> 635 structure[0], [func(*x) for x in entries],
636 expand_composites=expand_composites)
637
/opt/conda/lib/python3.6/site-packages/tensorflow/python/util/nest.py in <listcomp>(.0)
633
634 return pack_sequence_as(
--> 635 structure[0], [func(*x) for x in entries],
636 expand_composites=expand_composites)
637
/opt/conda/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py in _get_single_optimizer(opt)
559
560 def _get_single_optimizer(opt):
--> 561 opt = optimizers.get(opt)
562 if (self._dtype_policy.loss_scale is not None and
563 not isinstance(opt, lso.LossScaleOptimizer)):
/opt/conda/lib/python3.6/site-packages/tensorflow/python/keras/optimizers.py in get(identifier)
901 else:
902 raise ValueError(
--> 903 'Could not interpret optimizer identifier: {}'.format(identifier))
ValueError: Could not interpret optimizer identifier: False
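A hedged guess at the cause: elephas compiles the master network with optimizer=self.master_optimizer, and the value False suggests the estimator was never given an optimizer configuration. A minimal sketch of setting one explicitly, using the setter names from the elephas README (these, and whether to_json() or to_yaml() is expected, may differ between elephas versions):

# Sketch only: give the estimator an explicit, serialized optimizer so elephas
# does not compile the master network with optimizer=False.
from tensorflow.keras import optimizers
from elephas.ml_model import ElephasEstimator

adam_conf = optimizers.serialize(optimizers.Adam(learning_rate=0.01))

estimator = ElephasEstimator()
estimator.set_keras_model_config(model.to_json())  # `model` is your compiled Keras model
estimator.set_optimizer_config(adam_conf)          # explicit optimizer instead of the False default
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(["acc"])
estimator.set_mode("synchronous")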
PicklingError: Could not serialize object: ValueError: substring not found
While fitting train_data, I am getting a PicklingError. Can you help with this? Thanks.
fit_dl_pipeline = dl_pipeline.fit(train_data)
/usr/lib/python3.7/pickle.py in save_tuple(self, obj)
785 for element in obj:
--> 786 save(element)
787
/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
/databricks/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
405 if klass is None or klass is not obj:
--> 406 self.save_function_tuple(obj)
407 return
/databricks/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
548 state['qualname'] = func.__qualname__
--> 549 save(state)
550 write(pickle.TUPLE)
/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
/usr/lib/python3.7/pickle.py in save_dict(self, obj)
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
/usr/lib/python3.7/pickle.py in _batch_setitems(self, items)
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
/usr/lib/python3.7/pickle.py in save_list(self, obj)
815 self.memoize(obj)
--> 816 self._batch_appends(obj)
817
/usr/lib/python3.7/pickle.py in _batch_appends(self, items)
842 elif n:
--> 843 save(tmp[0])
844 write(APPEND)
/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
/databricks/spark/python/pyspark/cloudpickle.py in save_instancemethod(self, obj)
656 if PY3:
--> 657 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
658 else:
/usr/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
637 save(func)
--> 638 save(args)
639 write(REDUCE)
/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
/usr/lib/python3.7/pickle.py in save_tuple(self, obj)
770 for element in obj:
--> 771 save(element)
772 # Subtle. Same as in the big comment below.
/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
548 # Save the reduce() output and finally memoize the object
--> 549 self.save_reduce(obj=obj, *rv)
550
/usr/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
661 if state is not None:
--> 662 save(state)
663 write(BUILD)
/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
/usr/lib/python3.7/pickle.py in save_dict(self, obj)
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
/usr/lib/python3.7/pickle.py in _batch_setitems(self, items)
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
/usr/lib/python3.7/pickle.py in save_list(self, obj)
815 self.memoize(obj)
--> 816 self._batch_appends(obj)
817
/usr/lib/python3.7/pickle.py in _batch_appends(self, items)
842 elif n:
--> 843 save(tmp[0])
844 write(APPEND)
/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
548 # Save the reduce() output and finally memoize the object
--> 549 self.save_reduce(obj=obj, *rv)
550
/usr/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
661 if state is not None:
--> 662 save(state)
663 write(BUILD)
/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
/usr/lib/python3.7/pickle.py in save_dict(self, obj)
855 self.memoize(obj)
--> 856 self._batch_setitems(obj.items())
857
/usr/lib/python3.7/pickle.py in _batch_setitems(self, items)
881 save(k)
--> 882 save(v)
883 write(SETITEMS)
/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
/usr/lib/python3.7/pickle.py in save_list(self, obj)
815 self.memoize(obj)
--> 816 self._batch_appends(obj)
817
/usr/lib/python3.7/pickle.py in _batch_appends(self, items)
839 for x in tmp:
--> 840 save(x)
841 write(APPENDS)
/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
523 if reduce is not None:
--> 524 rv = reduce(self.proto)
525 else:
/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/ops/resource_variable_ops.py in __reduce__(self)
831 trainable=self.trainable,
--> 832 name=self._shared_name,
833 dtype=self.dtype,
/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/ops/variables.py in _shared_name(self)
1149 """
-> 1150 return self.name[:self.name.index(":")]
1151
ValueError: substring not found
During handling of the above exception, another exception occurred:
PicklingError Traceback (most recent call last)
<command-2475319972266768> in <module>
----> 1 fit_dl_pipeline = dl_pipeline.fit(train_data)
2 pred_train = fit_dl_pipeline.transform(train_data)
3 pred_test = fit_dl_pipeline.transform(test_data)
/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/databricks/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset)
107 dataset = stage.transform(dataset)
108 else: # must be an Estimator
--> 109 model = stage.fit(dataset)
110 transformers.append(model)
111 if i < indexOfLastEstimator:
/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/databricks/python/lib/python3.7/site-packages/elephas/ml_model.py in _fit(self, df)
90 batch_size=self.get_batch_size(),
91 verbose=self.get_verbosity(),
---> 92 validation_split=self.get_validation_split())
93
94 model_weights = spark_model.master_network.get_weights()
/databricks/python/lib/python3.7/site-packages/elephas/spark_model.py in fit(self, rdd, epochs, batch_size, verbose, validation_split)
149
150 if self.mode in ['asynchronous', 'synchronous', 'hogwild']:
--> 151 self._fit(rdd, epochs, batch_size, verbose, validation_split)
152 else:
153 raise ValueError(
/databricks/python/lib/python3.7/site-packages/elephas/spark_model.py in _fit(self, rdd, epochs, batch_size, verbose, validation_split)
186 worker = SparkWorker(yaml, parameters, train_config,
187 optimizer, loss, metrics, custom)
--> 188 gradients = rdd.mapPartitions(worker.train).collect()
189 new_parameters = self._master_network.get_weights()
190 for grad in gradients: # simply accumulate gradients one by one
/databricks/spark/python/pyspark/rdd.py in collect(self)
829 # Default path used in OSS Spark / for non-credential passthrough clusters:
830 with SCCallSiteSync(self.context) as css:
--> 831 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
832 return list(_load_from_socket(sock_info, self._jrdd_deserializer))
833
/databricks/spark/python/pyspark/rdd.py in _jrdd(self)
2573
2574 wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
-> 2575 self._jrdd_deserializer, profiler)
2576 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
2577 self.preservesPartitioning, self.is_barrier)
/databricks/spark/python/pyspark/rdd.py in _wrap_function(sc, func, deserializer, serializer, profiler)
2475 assert serializer, "serializer should not be empty"
2476 command = (func, profiler, deserializer, serializer)
-> 2477 pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
2478 return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
2479 sc.pythonVer, broadcast_vars, sc._javaAccumulator)
/databricks/spark/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
2461 # the serialized command will be compressed by broadcast
2462 ser = CloudPickleSerializer()
-> 2463 pickled_command = ser.dumps(command)
2464 if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc): # Default 1M
2465 # The broadcast will have same life cycle as created PythonRDD
/databricks/spark/python/pyspark/serializers.py in dumps(self, obj)
713 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
714 cloudpickle.print_exec(sys.stderr)
--> 715 raise pickle.PicklingError(msg)
716
717
PicklingError: Could not serialize object: ValueError: substring not found
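A hedged observation, not from the original report: cloudpickle is walking the objects captured by the SparkWorker closure, reaches a live tf.Variable, and ResourceVariable.__reduce__ fails while parsing the variable's name. A small diagnostic sketch for checking locally which object drags TensorFlow variables into the closure (my_optimizer is a hypothetical stand-in for whatever optimizer object is handed to elephas); passing plain serialized configs instead of live Keras objects usually avoids the problem:

# Sketch only: try to cloudpickle the objects Spark will have to ship. A live
# Keras optimizer or model reproduces the ResourceVariable failure seen above.
from pyspark import cloudpickle

def check_picklable(name, obj):
    try:
        cloudpickle.dumps(obj)
        print(name, "OK")
    except Exception as exc:  # diagnostic only: report and continue
        print(name, "FAILED:", exc)

check_picklable("optimizer config (plain dict)", {"class_name": "Adam", "config": {"learning_rate": 0.01}})
check_picklable("live optimizer object", my_optimizer)  # hypothetical: the optimizer instance given to elephas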