pyspark_dl_pipeline's Issues

Error while running the new deep learning pipeline and helper function on both data sets

What should I do here to fix the error?

RuntimeError                              Traceback (most recent call last)
~\AppData\Local\Temp/ipykernel_6652/107061413.py in <module>
----> 1 dl_pipeline_fit_score_results(dl_pipeline=dl_pipeline,
      2                               train_data=train_data,
      3                               test_data=test_data,
      4                               label='label_index');

~\AppData\Local\Temp/ipykernel_6652/3033932891.py in dl_pipeline_fit_score_results(dl_pipeline, train_data, test_data, label)
      4                                   label='label_index'):
      5 
----> 6     fit_dl_pipeline = dl_pipeline.fit(train_data)
      7     pred_train = fit_dl_pipeline.transform(train_data)
      8     pred_test = fit_dl_pipeline.transform(test_data)

~\anaconda3\envs\sama\lib\site-packages\pyspark\ml\base.py in fit(self, dataset, params)
    159                 return self.copy(params)._fit(dataset)
    160             else:
--> 161                 return self._fit(dataset)
    162         else:
    163             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

~\anaconda3\envs\sama\lib\site-packages\pyspark\ml\pipeline.py in _fit(self, dataset)
    112                     dataset = stage.transform(dataset)
    113                 else:  # must be an Estimator
--> 114                     model = stage.fit(dataset)
    115                     transformers.append(model)
    116                     if i < indexOfLastEstimator:

~\anaconda3\envs\sama\lib\site-packages\pyspark\ml\base.py in fit(self, dataset, params)
    159                 return self.copy(params)._fit(dataset)
    160             else:
--> 161                 return self._fit(dataset)
    162         else:
    163             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

~\anaconda3\envs\sama\lib\site-packages\elephas\ml_model.py in _fit(self, df)
     84                                       features_col=self.getFeaturesCol(), label_col=self.getLabelCol())
     85         simple_rdd = simple_rdd.repartition(self.get_num_workers())
---> 86         keras_model = model_from_yaml(self.get_keras_model_config(), self.get_custom_objects())
     87         metrics = self.get_metrics()
     88         loss = self.get_loss()

~\anaconda3\envs\sama\lib\site-packages\tensorflow\python\keras\saving\model_config.py in model_from_yaml(yaml_string, custom_objects)
     76       RuntimeError: announces that the method poses a security risk
     77   """
---> 78   raise RuntimeError(
     79       'Method `model_from_yaml()` has been removed due to security risk of '
     80       'arbitrary code execution. Please use `Model.to_json()` and '

RuntimeError: Method `model_from_yaml()` has been removed due to security risk of arbitrary code execution. Please use `Model.to_json()` and `model_from_json()` instead.
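
A possible workaround, since the error message itself points at the JSON API: serialize the Keras model with to_json()/model_from_json() instead of the removed YAML pair. The sketch below uses a minimal stand-in network, not the notebook's actual model, and only demonstrates the round trip that still works on TF 2.6+:

from tensorflow.keras.models import Sequential, model_from_json
from tensorflow.keras.layers import Dense

# Hypothetical stand-in network; substitute the notebook's real architecture.
model = Sequential([
    Dense(10, activation="relu", input_shape=(20,)),
    Dense(2, activation="softmax"),
])

# to_yaml()/model_from_yaml() were removed in TF 2.6; the JSON pair still works.
config_json = model.to_json()
restored = model_from_json(config_json)

Because the YAML call happens inside elephas itself (ml_model.py line 86 in the traceback), the practical fixes are either pinning tensorflow below 2.6, where model_from_yaml() still exists, or moving to an elephas release that serializes the model as JSON, if one is available for your environment.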

cPickle.PicklingError: Could not serialize object: NotImplementedError

I get the following error when running this example without modifications:

pyspark_1      | Traceback (most recent call last):
pyspark_1      |   File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/serializers.py", line 590, in dumps
pyspark_1      |     return cloudpickle.dumps(obj, 2)
pyspark_1      |   File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 863, in dumps
pyspark_1      |     cp.dump(obj)
pyspark_1      |   File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 260, in dump
pyspark_1      |     return Pickler.dump(self, obj)
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 224, in dump
pyspark_1      |     self.save(obj)
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1      |     f(self, obj) # Call unbound method with explicit self
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 568, in save_tuple
pyspark_1      |     save(element)
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1      |     f(self, obj) # Call unbound method with explicit self
pyspark_1      |   File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 406, in save_function
pyspark_1      |     self.save_function_tuple(obj)
pyspark_1      |   File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 549, in save_function_tuple
pyspark_1      |     save(state)
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1      |     f(self, obj) # Call unbound method with explicit self
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
pyspark_1      |     self._batch_setitems(obj.iteritems())
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
pyspark_1      |     save(v)
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1      |     f(self, obj) # Call unbound method with explicit self
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 606, in save_list
pyspark_1      |     self._batch_appends(iter(obj))
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 642, in _batch_appends
pyspark_1      |     save(tmp[0])
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1      |     f(self, obj) # Call unbound method with explicit self
pyspark_1      |   File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/cloudpickle.py", line 660, in save_instancemethod
pyspark_1      |     obj=obj)
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 401, in save_reduce
pyspark_1      |     save(args)
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1      |     f(self, obj) # Call unbound method with explicit self
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 554, in save_tuple
pyspark_1      |     save(element)
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 331, in save
pyspark_1      |     self.save_reduce(obj=obj, *rv)
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
pyspark_1      |     save(state)
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1      |     f(self, obj) # Call unbound method with explicit self
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
pyspark_1      |     self._batch_setitems(obj.iteritems())
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
pyspark_1      |     save(v)
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1      |     f(self, obj) # Call unbound method with explicit self
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 606, in save_list
pyspark_1      |     self._batch_appends(iter(obj))
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 642, in _batch_appends
pyspark_1      |     save(tmp[0])
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 331, in save
pyspark_1      |     self.save_reduce(obj=obj, *rv)
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 425, in save_reduce
pyspark_1      |     save(state)
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 286, in save
pyspark_1      |     f(self, obj) # Call unbound method with explicit self
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 655, in save_dict
pyspark_1      |     self._batch_setitems(obj.iteritems())
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 687, in _batch_setitems
pyspark_1      |     save(v)
pyspark_1      |   File "/usr/lib/python2.7/pickle.py", line 306, in save
pyspark_1      |     rv = reduce(self.proto)
pyspark_1      |   File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/resource_variable_ops.py", line 1152, in __reduce__
pyspark_1      |     initial_value=self.numpy(),
pyspark_1      |   File "/usr/local/lib/python2.7/dist-packages/tensorflow/python/ops/resource_variable_ops.py", line 906, in numpy
pyspark_1      |     "numpy() is only available when eager execution is enabled.")
pyspark_1      | NotImplementedError: numpy() is only available when eager execution is enabled.
pyspark_1      | Traceback (most recent call last):
pyspark_1      |   File "/home/ubuntu/./spark.py", line 262, in <module>
pyspark_1      |     label='label_index')
pyspark_1      |   File "/home/ubuntu/./spark.py", line 237, in dl_pipeline_fit_score_results
pyspark_1      |     fit_dl_pipeline = dl_pipeline.fit(train_data)
pyspark_1      |   File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/ml/base.py", line 132, in fit
pyspark_1      |     return self._fit(dataset)
pyspark_1      |   File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/ml/pipeline.py", line 109, in _fit
pyspark_1      |     model = stage.fit(dataset)
pyspark_1      |   File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/ml/base.py", line 132, in fit
pyspark_1      |     return self._fit(dataset)
pyspark_1      |   File "/usr/local/lib/python2.7/dist-packages/elephas/ml_model.py", line 92, in _fit
pyspark_1      |     validation_split=self.get_validation_split())
pyspark_1      |   File "/usr/local/lib/python2.7/dist-packages/elephas/spark_model.py", line 151, in fit
pyspark_1      |     self._fit(rdd, epochs, batch_size, verbose, validation_split)
pyspark_1      |   File "/usr/local/lib/python2.7/dist-packages/elephas/spark_model.py", line 188, in _fit
pyspark_1      |     gradients = rdd.mapPartitions(worker.train).collect()
pyspark_1      |   File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/rdd.py", line 816, in collect
pyspark_1      |     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
pyspark_1      |   File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/rdd.py", line 2532, in _jrdd
pyspark_1      |     self._jrdd_deserializer, profiler)
pyspark_1      |   File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/rdd.py", line 2434, in _wrap_function
pyspark_1      |     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
pyspark_1      |   File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/rdd.py", line 2420, in _prepare_for_python_RDD
pyspark_1      |     pickled_command = ser.dumps(command)
pyspark_1      |   File "/home/ubuntu/spark-2.4.4-bin-hadoop2.7/python/pyspark/serializers.py", line 600, in dumps
pyspark_1      |     raise pickle.PicklingError(msg)
pyspark_1      | cPickle.PicklingError: Could not serialize object: NotImplementedError: numpy() is only available when eager execution is enabled.
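
The NotImplementedError at the bottom is the root cause: cloudpickle reaches a TensorFlow ResourceVariable and calls its numpy() method, which only works while eager execution is enabled, and on this Python 2.7 / TF 1.x-era stack eager mode is off by default. A minimal check, assuming the TF 1.x build shown in the paths above:

import tensorflow as tf

# ResourceVariable.__reduce__ calls self.numpy(), which requires eager mode,
# so confirm it is on before any graph ops or Keras layers are created.
if not tf.executing_eagerly():
    tf.enable_eager_execution()  # TF 1.x API; must run at program start

print(tf.executing_eagerly())  # expect True before building the Keras model

If enabling eager mode is not an option in this container, the other commonly reported route is moving to Python 3 with a TF 2.x build (where eager execution is the default) and an elephas version that supports it.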

ValueError: Could not interpret optimizer identifier: False

Hi,
When running the final cell, i.e. the call to dl_pipeline_fit_score_results(), I get the following error:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-54-cb5b6595cf8f> in <module>
      2                               train_data=train_data,
      3                               test_data=test_data,
----> 4                               label='label_index');

<ipython-input-53-1c2301ef586e> in dl_pipeline_fit_score_results(dl_pipeline, train_data, test_data, label)
      4                                   label='label_index'):
      5 
----> 6     fit_dl_pipeline = dl_pipeline.fit(train_data)
      7     pred_train = fit_dl_pipeline.transform(train_data)
      8     pred_test = fit_dl_pipeline.transform(test_data)

/opt/conda/lib/python3.6/site-packages/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/opt/conda/lib/python3.6/site-packages/pyspark/ml/pipeline.py in _fit(self, dataset)
    107                     dataset = stage.transform(dataset)
    108                 else:  # must be an Estimator
--> 109                     model = stage.fit(dataset)
    110                     transformers.append(model)
    111                     if i < indexOfLastEstimator:

/opt/conda/lib/python3.6/site-packages/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/opt/conda/lib/python3.6/site-packages/elephas/ml_model.py in _fit(self, df)
     90                         batch_size=self.get_batch_size(),
     91                         verbose=self.get_verbosity(),
---> 92                         validation_split=self.get_validation_split())
     93 
     94         model_weights = spark_model.master_network.get_weights()

/opt/conda/lib/python3.6/site-packages/elephas/spark_model.py in fit(self, rdd, epochs, batch_size, verbose, validation_split)
    149 
    150         if self.mode in ['asynchronous', 'synchronous', 'hogwild']:
--> 151             self._fit(rdd, epochs, batch_size, verbose, validation_split)
    152         else:
    153             raise ValueError(

/opt/conda/lib/python3.6/site-packages/elephas/spark_model.py in _fit(self, rdd, epochs, batch_size, verbose, validation_split)
    159         self._master_network.compile(optimizer=self.master_optimizer,
    160                                      loss=self.master_loss,
--> 161                                      metrics=self.master_metrics)
    162         if self.mode in ['asynchronous', 'hogwild']:
    163             self.start_server()

/opt/conda/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py in compile(self, optimizer, loss, metrics, loss_weights, weighted_metrics, run_eagerly, **kwargs)
    539       self._run_eagerly = run_eagerly
    540 
--> 541       self.optimizer = self._get_optimizer(optimizer)
    542       self.compiled_loss = compile_utils.LossesContainer(
    543           loss, loss_weights, output_names=self.output_names)

/opt/conda/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py in _get_optimizer(self, optimizer)
    565       return opt
    566 
--> 567     return nest.map_structure(_get_single_optimizer, optimizer)
    568 
    569   @trackable.no_automatic_dependency_tracking

/opt/conda/lib/python3.6/site-packages/tensorflow/python/util/nest.py in map_structure(func, *structure, **kwargs)
    633 
    634   return pack_sequence_as(
--> 635       structure[0], [func(*x) for x in entries],
    636       expand_composites=expand_composites)
    637 

/opt/conda/lib/python3.6/site-packages/tensorflow/python/util/nest.py in <listcomp>(.0)
    633 
    634   return pack_sequence_as(
--> 635       structure[0], [func(*x) for x in entries],
    636       expand_composites=expand_composites)
    637 

/opt/conda/lib/python3.6/site-packages/tensorflow/python/keras/engine/training.py in _get_single_optimizer(opt)
    559 
    560     def _get_single_optimizer(opt):
--> 561       opt = optimizers.get(opt)
    562       if (self._dtype_policy.loss_scale is not None and
    563           not isinstance(opt, lso.LossScaleOptimizer)):

/opt/conda/lib/python3.6/site-packages/tensorflow/python/keras/optimizers.py in get(identifier)
    901   else:
    902     raise ValueError(
--> 903         'Could not interpret optimizer identifier: {}'.format(identifier))

ValueError: Could not interpret optimizer identifier: False
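
The final frame shows compile() receiving optimizer=False, i.e. elephas's master_optimizer was never populated with a real optimizer before the pipeline was fitted. One way to rule that out is to pass a serialized Keras optimizer to the estimator explicitly; set_optimizer_config is the elephas setter name I am assuming here, so verify it against your installed version:

from tensorflow.keras import optimizers

# Build and serialize a concrete optimizer instead of leaving the parameter
# unset, which is what surfaces as `False` by the time compile() runs.
adam = optimizers.Adam(learning_rate=0.01)
adam_conf = optimizers.serialize(adam)

# Assumed elephas API -- check the exact setter name in your installed release:
# estimator.set_optimizer_config(adam_conf)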

PicklingError: Could not serialize object: ValueError: substring not found

While fitting train_data I get a PicklingError. Can you help with this? Thanks.

fit_dl_pipeline = dl_pipeline.fit(train_data)
/usr/lib/python3.7/pickle.py in save_tuple(self, obj)
    785         for element in obj:
--> 786             save(element)
    787 

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/databricks/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
    405             if klass is None or klass is not obj:
--> 406                 self.save_function_tuple(obj)
    407                 return

/databricks/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
    548             state['qualname'] = func.__qualname__
--> 549         save(state)
    550         write(pickle.TUPLE)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/usr/lib/python3.7/pickle.py in save_dict(self, obj)
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 

/usr/lib/python3.7/pickle.py in _batch_setitems(self, items)
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/usr/lib/python3.7/pickle.py in save_list(self, obj)
    815         self.memoize(obj)
--> 816         self._batch_appends(obj)
    817 

/usr/lib/python3.7/pickle.py in _batch_appends(self, items)
    842             elif n:
--> 843                 save(tmp[0])
    844                 write(APPEND)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/databricks/spark/python/pyspark/cloudpickle.py in save_instancemethod(self, obj)
    656             if PY3:
--> 657                 self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
    658             else:

/usr/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    637             save(func)
--> 638             save(args)
    639             write(REDUCE)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/usr/lib/python3.7/pickle.py in save_tuple(self, obj)
    770             for element in obj:
--> 771                 save(element)
    772             # Subtle.  Same as in the big comment below.

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 

/usr/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/usr/lib/python3.7/pickle.py in save_dict(self, obj)
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 

/usr/lib/python3.7/pickle.py in _batch_setitems(self, items)
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/usr/lib/python3.7/pickle.py in save_list(self, obj)
    815         self.memoize(obj)
--> 816         self._batch_appends(obj)
    817 

/usr/lib/python3.7/pickle.py in _batch_appends(self, items)
    842             elif n:
--> 843                 save(tmp[0])
    844                 write(APPEND)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    548         # Save the reduce() output and finally memoize the object
--> 549         self.save_reduce(obj=obj, *rv)
    550 

/usr/lib/python3.7/pickle.py in save_reduce(self, func, args, state, listitems, dictitems, obj)
    661         if state is not None:
--> 662             save(state)
    663             write(BUILD)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/usr/lib/python3.7/pickle.py in save_dict(self, obj)
    855         self.memoize(obj)
--> 856         self._batch_setitems(obj.items())
    857 

/usr/lib/python3.7/pickle.py in _batch_setitems(self, items)
    881                     save(k)
--> 882                     save(v)
    883                 write(SETITEMS)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return

/usr/lib/python3.7/pickle.py in save_list(self, obj)
    815         self.memoize(obj)
--> 816         self._batch_appends(obj)
    817 

/usr/lib/python3.7/pickle.py in _batch_appends(self, items)
    839                 for x in tmp:
--> 840                     save(x)
    841                 write(APPENDS)

/usr/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    523             if reduce is not None:
--> 524                 rv = reduce(self.proto)
    525             else:

/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/ops/resource_variable_ops.py in __reduce__(self)
    831         trainable=self.trainable,
--> 832         name=self._shared_name,
    833         dtype=self.dtype,

/databricks/python/lib/python3.7/site-packages/tensorflow_core/python/ops/variables.py in _shared_name(self)
   1149     """
-> 1150     return self.name[:self.name.index(":")]
   1151 

ValueError: substring not found

During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
<command-2475319972266768> in <module>
----> 1 fit_dl_pipeline = dl_pipeline.fit(train_data)
      2 pred_train = fit_dl_pipeline.transform(train_data)
      3 pred_test = fit_dl_pipeline.transform(test_data)

/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/databricks/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset)
    107                     dataset = stage.transform(dataset)
    108                 else:  # must be an Estimator
--> 109                     model = stage.fit(dataset)
    110                     transformers.append(model)
    111                     if i < indexOfLastEstimator:

/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/databricks/python/lib/python3.7/site-packages/elephas/ml_model.py in _fit(self, df)
     90                         batch_size=self.get_batch_size(),
     91                         verbose=self.get_verbosity(),
---> 92                         validation_split=self.get_validation_split())
     93 
     94         model_weights = spark_model.master_network.get_weights()

/databricks/python/lib/python3.7/site-packages/elephas/spark_model.py in fit(self, rdd, epochs, batch_size, verbose, validation_split)
    149 
    150         if self.mode in ['asynchronous', 'synchronous', 'hogwild']:
--> 151             self._fit(rdd, epochs, batch_size, verbose, validation_split)
    152         else:
    153             raise ValueError(

/databricks/python/lib/python3.7/site-packages/elephas/spark_model.py in _fit(self, rdd, epochs, batch_size, verbose, validation_split)
    186             worker = SparkWorker(yaml, parameters, train_config,
    187                                  optimizer, loss, metrics, custom)
--> 188             gradients = rdd.mapPartitions(worker.train).collect()
    189             new_parameters = self._master_network.get_weights()
    190             for grad in gradients:  # simply accumulate gradients one by one

/databricks/spark/python/pyspark/rdd.py in collect(self)
    829         # Default path used in OSS Spark / for non-credential passthrough clusters:
    830         with SCCallSiteSync(self.context) as css:
--> 831             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    832         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    833 

/databricks/spark/python/pyspark/rdd.py in _jrdd(self)
   2573 
   2574         wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
-> 2575                                       self._jrdd_deserializer, profiler)
   2576         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
   2577                                              self.preservesPartitioning, self.is_barrier)

/databricks/spark/python/pyspark/rdd.py in _wrap_function(sc, func, deserializer, serializer, profiler)
   2475     assert serializer, "serializer should not be empty"
   2476     command = (func, profiler, deserializer, serializer)
-> 2477     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
   2478     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
   2479                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)

/databricks/spark/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
   2461     # the serialized command will be compressed by broadcast
   2462     ser = CloudPickleSerializer()
-> 2463     pickled_command = ser.dumps(command)
   2464     if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc):  # Default 1M
   2465         # The broadcast will have same life cycle as created PythonRDD

/databricks/spark/python/pyspark/serializers.py in dumps(self, obj)
    713                 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
    714             cloudpickle.print_exec(sys.stderr)
--> 715             raise pickle.PicklingError(msg)
    716 
    717 

PicklingError: Could not serialize object: ValueError: substring not found
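
The root ValueError comes from ResourceVariable.__reduce__, which derives a shared name via self.name[:self.name.index(":")] and fails because the variable's name carries no ":" suffix. The sketch below only reproduces that expression with a plain eagerly-created variable for illustration; the missing suffix in the Databricks run most likely points to a mismatch between the installed tensorflow and elephas versions rather than to the notebook code itself:

import tensorflow as tf

# A variable created in eager mode normally ends in ":0", so the expression
# used by __reduce__ succeeds here; the issue's error means the variables
# being pickled were created without that suffix.
v = tf.Variable(1.0, name="w")
print(v.name)                       # typically "w:0"
print(v.name[:v.name.index(":")])   # "w" -- the line that raises in the trace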
