
esds-funnel's Introduction

esds-funnel

[Badges: CI status · code coverage · documentation status · Conda/PyPI packages · license]

Overview

esds-funnel is a Python package that aims to facilitate the synthesis and analysis of Earth system data.

Installation (COMING SOON!)

esds-funnel can be installed from PyPI with pip:

python -m pip install esds-funnel

esds-funnel is also available from conda-forge for conda installations:

conda install -c conda-forge esds-funnel

See documentation for more information.

esds-funnel's People

Contributors

andersy005 · dependabot[bot] · mgrover1 · pre-commit-ci[bot]


esds-funnel's Issues

Dask serialization error when using FunnelResult within Prefect + Dask executor

When using funnel.prefect.result.FunnelResult without Dask, everything seems to work fine:

In [1]: import os

In [2]: os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "True"

In [3]: from prefect.executors import DaskExecutor

In [4]: from prefect import Flow, task

In [5]: from funnel import SQLMetadataStore, CacheStore

In [6]: from funnel.prefect.result import FunnelResult

In [7]: import time

In [8]: r = FunnelResult(SQLMetadataStore(CacheStore("/tmp/test"), serializer='xarray.zarr'))

In [9]: @task(target="bar.zarr", result=r)
   ...: def xarray_task():
   ...:     time.sleep(10)
   ...:     ds = xr.tutorial.open_dataset('rasm').isel(time=0)
   ...:     return ds
   ...: 

In [10]: import xarray as xr

In [11]: with Flow("hello") as flow:
    ...:     xarray_task()
    ...: 
In [12]: flow.run()
[2021-11-18 14:10:42-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'hello'
[2021-11-18 14:10:42-0700] INFO - prefect.TaskRunner | Task 'xarray_task': Starting task run...
[2021-11-18 14:10:43-0700] INFO - prefect.TaskRunner | Task 'xarray_task': Finished task run for task with final state: 'Cached'
[2021-11-18 14:10:43-0700] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Out[12]: <Success: "All reference tasks succeeded.">

However, running the flow with Prefect's Dask executor fails:

In [13]: flow.run(executor=DaskExecutor(cluster_kwargs={"processes": False, "threads_per_worker": 8}, debug=True))
[2021-11-18 14:10:50-0700] INFO - prefect.FlowRunner | Beginning Flow run for 'hello'
[2021-11-18 14:10:50-0700] INFO - prefect.DaskExecutor | Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`...
[2021-11-18 14:10:50-0700] INFO - prefect.DaskExecutor | The Dask dashboard is available at http://10.0.0.209:8787/status
distributed.comm.inproc - WARNING - Closing dangling queue in <InProc  local=inproc://10.0.0.209/18039/8 remote=inproc://10.0.0.209/18039/1>
[2021-11-18 14:10:50-0700] ERROR - prefect.FlowRunner | Unexpected error: TypeError("cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object")
Traceback (most recent call last):
  File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 49, in dumps
    result = pickle.dumps(x, **dump_kwargs)
_pickle.PicklingError: Can't pickle <function xarray_task at 0x10c4a5550>: it's not the same object as __main__.xarray_task

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 618, in get_flow_run_state
    task_states[task] = executor.submit(
  File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/prefect/executors/dask.py", line 421, in submit
    fut = self.client.submit(
  File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/distributed/client.py", line 1576, in submit
    futures = self._graph_to_futures(
  File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/distributed/client.py", line 2627, in _graph_to_futures
    dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
  File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/dask/highlevelgraph.py", line 1076, in __dask_distributed_pack__
    "state": layer.__dask_distributed_pack__(
  File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/dask/highlevelgraph.py", line 434, in __dask_distributed_pack__
    dsk = toolz.valmap(dumps_task, dsk)
  File "cytoolz/dicttoolz.pyx", line 178, in cytoolz.dicttoolz.valmap
  File "cytoolz/dicttoolz.pyx", line 203, in cytoolz.dicttoolz.valmap
  File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/distributed/worker.py", line 4138, in dumps_task
    d["kwargs"] = warn_dumps(task[3])
  File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/distributed/worker.py", line 4150, in warn_dumps
    b = dumps(obj, protocol=4)
  File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/abanihi/.mambaforge/envs/esds-funnel-dev/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object
[2021-11-18 14:10:50-0700] ERROR - prefect.hello | Unexpected error occured in FlowRunner: TypeError("cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object")
Out[13]: <Failed: "Unexpected error: TypeError("cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object")">
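
The traceback suggests the result object is dragging a live SQLAlchemy engine into the task graph, and engines are not picklable. I haven't verified this against funnel's internals, but a generic workaround for objects like this is to drop the engine from the pickled state and re-create it lazily on the worker (illustrative sketch; PicklableMetadataStore is a made-up name):

import sqlalchemy


class PicklableMetadataStore:
    def __init__(self, db_url: str):
        self.db_url = db_url
        self._engine = None  # created lazily, never pickled

    @property
    def engine(self):
        # Re-created on first use, including after unpickling on a worker.
        if self._engine is None:
            self._engine = sqlalchemy.create_engine(self.db_url)
        return self._engine

    def __getstate__(self):
        # Exclude the unpicklable engine; keep only the connection URL.
        state = self.__dict__.copy()
        state['_engine'] = None
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)

With that pattern, cloudpickle only ships the connection URL, and each Dask worker opens its own engine on first use.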

API Design

Classes

  • Collection
    • Parameters
      • json - an intake-esm catalog
      • query - specifies which part of the dataset you are looking for (warn users not to be too specific)
    • Not parameters
      • Cache - specified via the config

API (see the sketch after this list):

  • Cache class - cache module
    • get()
    • make()
    • read()
    • exists()
    • remove()
    • identity()
  • Collection class - collection module
    • to_dataset_dict()
    • _generate_dsets()
    • compute() - warn users to be careful: this loads everything into memory
    • cache() - logic lives in the cache module; applies to all of our datasets
  • DerivedVariable class - derived_variable module
    • __call__()
    • _ensure_variables()
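
A minimal skeleton of the interfaces listed above, with placeholder signatures rather than a final design:

class Cache:
    """cache module: store and retrieve computed datasets."""

    def get(self, key): ...
    def make(self, key, value): ...
    def read(self, key): ...
    def exists(self, key): ...
    def remove(self, key): ...
    def identity(self, obj): ...


class Collection:
    """collection module: an intake-esm catalog plus a query."""

    def __init__(self, json, query):
        self.json = json    # intake-esm catalog
        self.query = query  # which part of the dataset you are looking for

    def to_dataset_dict(self): ...
    def _generate_dsets(self): ...
    def compute(self): ...  # careful: loads everything into memory
    def cache(self): ...    # delegates to the cache module for all datasets


class DerivedVariable:
    """derived_variable module."""

    def __call__(self, ds): ...
    def _ensure_variables(self, ds): ...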

Best practices

  • Use pydantic (see the example below)
  • Write as many unit tests as possible to cover these core pieces
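
For instance, a pydantic model validates configuration at construction time (a generic illustration; CacheConfig is not funnel's actual config schema):

import pydantic


class CacheConfig(pydantic.BaseModel):
    # Hypothetical config model, shown only to illustrate the pattern.
    path: str
    serializer: str = 'xarray.zarr'
    readonly: bool = False


config = CacheConfig(path='/tmp/test')  # types are validated on construction
# CacheConfig(path='/tmp/test', readonly='not-a-bool') raises a ValidationError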

Add cache database functionality

@andersy005

I have been playing around with a potential class for this functionality... would we want the database to look something like this?

[Screenshot: proposed cache database rendered as a pandas DataFrame]

Then, when someone accesses this, they would use

# Look up the first cached artifact in the database,
# then re-open it through a CacheStore:
test = df.iloc[0]
new_cache = funnel.CacheStore(test.cache_dir)
read_ds = new_cache.get(test.name, test.serializer)
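
For reference, a hypothetical sketch of the database side that the snippet above would read from (CacheDatabase and its methods are guesses, not an agreed design; the column names come from the snippet):

import datetime

import pandas as pd


class CacheDatabase:
    # One row per cached artifact, so that df.iloc[0] exposes
    # .name, .cache_dir, and .serializer as used above.
    def __init__(self):
        self._records = []

    def record(self, name, cache_dir, serializer):
        self._records.append(
            {
                'name': name,
                'cache_dir': cache_dir,
                'serializer': serializer,
                'created': datetime.datetime.now(datetime.timezone.utc),
            }
        )

    @property
    def df(self) -> pd.DataFrame:
        return pd.DataFrame(self._records)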

Consolidate `CacheStore` and `MetadataStore` into a single object

Currently, funnel's caching mechanism consists of two objects:

  • CacheStore: used for caching artifacts
  • MetadataStore: used for caching metadata associated with artifacts

The MetadataStore serves as the main entry point. With #51, it has become clear that we need neither (1) two separate objects nor (2) a SQL-like database. Once #51 is merged, we should be able to consolidate the CacheStore and MetadataStore into a single object that takes care of both artifacts and sidecar metadata files.
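
For illustration, the consolidated object might look roughly like this (Store, put, and get are placeholder names, and the JSON-sidecar layout is just one option, not a settled design):

import json
import pathlib


class Store:
    # A single object handling both artifacts and sidecar metadata files,
    # replacing the CacheStore + MetadataStore pair.
    def __init__(self, root: str):
        self.root = pathlib.Path(root)
        self.root.mkdir(parents=True, exist_ok=True)

    def put(self, key: str, data: bytes, **metadata):
        (self.root / key).write_bytes(data)
        # Metadata lives in a JSON sidecar next to the artifact,
        # so no SQL-like database is needed.
        (self.root / f'{key}.json').write_text(json.dumps(metadata))

    def get(self, key: str):
        data = (self.root / key).read_bytes()
        metadata = json.loads((self.root / f'{key}.json').read_text())
        return data, metadata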

Variable Database

Would it make sense to have a variable database alongside the cache database? The collection object could expose a pandas DataFrame of the available variables (taken directly from the catalog) as well as the list of derived variables; when someone adds another derived variable to the derived_variables registry, it would be added here too. This could fall within the derive module.
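
Something along these lines, perhaps (a hypothetical sketch; DerivedVariableRegistry, register, and the column names are made up for illustration):

import pandas as pd


class DerivedVariableRegistry:
    # Registering a derived variable also adds a row to the variable
    # database, alongside the variables that come from the catalog.
    def __init__(self):
        self._entries = []

    def register(self, name, dependencies):
        def decorator(func):
            self._entries.append(
                {'variable': name, 'dependencies': dependencies, 'derived': True}
            )
            return func
        return decorator

    @property
    def df(self) -> pd.DataFrame:
        return pd.DataFrame(self._entries)


registry = DerivedVariableRegistry()


@registry.register('TEMP_degC', dependencies=['TEMP'])
def temp_in_celsius(ds):
    return ds['TEMP'] - 273.15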

Add Documentation

  • Set up documentation build on RTD
  • Add template for docs
  • Add demos on CacheStore and FunnelResult
