In [1]: import os
In [2]: os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "True"  # set before prefect is imported in the next cells; presumably turns on result checkpointing - TODO confirm
In [3]: from prefect.executors import DaskExecutor
In [4]: from prefect import Flow, task
In [5]: from funnel import SQLMetadataStore, CacheStore
In [6]: from funnel.prefect.result import FunnelResult
In [8]: r = FunnelResult(SQLMetadataStore(CacheStore("/tmp/test"), serializer='xarray.zarr'))  # passed to @task(result=r) below; NOTE(review): the sqlalchemy pickling error in the traceback presumably comes from this SQL-backed store - confirm
In [9]: @task(target="bar.zarr", result=r)
...: def xarray_task():
...: time.sleep(10)
...: ds = xr.tutorial.open_dataset('rasm').isel(time=0)
...: return ds
...:
In [11]: with Flow("hello") as flow:
...: xarray_task()
...:
In [13]: flow.run(executor = DaskExecutor(cluster_kwargs={"processes": False, "threads_per_worker": 8}, debu
...: g=True))
[2021-11-18 14:10:50-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'hello'
[2021-11-18 14:10:50-0700] INFO - prefect.DaskExecutor | Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`...
[2021-11-18 14:10:50-0700] INFO - prefect.DaskExecutor | The Dask dashboard is available at http://10.0.0.209:8787/status
distributed.comm.inproc - WARNING - Closing dangling queue in <InProc local=inproc://10.0.0.209/18039/8 remote=inproc://10.0.0.209/18039/1>
[2021-11-18 14:10:50-0700] ERROR - prefect.FlowRunner | Unexpected error: TypeError("cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object")
Traceback (most recent call last):
File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 49, in dumps
result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function xarray_task at 0x10c4a5550>: it's not the same object as __main__.xarray_task
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 618, in get_flow_run_state
task_states[task] = executor.submit(
File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/prefect/executors/dask.py", line 421, in submit
fut = self.client.submit(
File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/distributed/client.py", line 1576, in submit
futures = self._graph_to_futures(
File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/distributed/client.py", line 2627, in _graph_to_futures
dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/dask/highlevelgraph.py", line 1076, in __dask_distributed_pack__
"state": layer.__dask_distributed_pack__(
File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/dask/highlevelgraph.py", line 434, in __dask_distributed_pack__
dsk = toolz.valmap(dumps_task, dsk)
File "cytoolz/dicttoolz.pyx", line 178, in cytoolz.dicttoolz.valmap
File "cytoolz/dicttoolz.pyx", line 203, in cytoolz.dicttoolz.valmap
File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/distributed/worker.py", line 4138, in dumps_task
d["kwargs"] = warn_dumps(task[3])
File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/distributed/worker.py", line 4150, in warn_dumps
b = dumps(obj, protocol=4)
File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 60, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object
[2021-11-18 14:10:50-0700] ERROR - prefect.hello | Unexpected error occured in FlowRunner: TypeError("cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object")
Out[13]: <Failed: "Unexpected error: TypeError("cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object")">