
intake-parquet's Introduction

Intake: Take 2

A general python package for describing, loading and processing data


Taking the pain out of data access and distribution

Intake is an open-source package to:

  • describe your data declaratively
  • gather data sets into catalogs
  • search catalogs and services to find the right data you need
  • load, transform and output data in many formats
  • work with third party remote storage and compute platforms

Documentation is available at Read the Docs.

Please report issues at https://github.com/intake/intake/issues

Install

Recommended method using conda:

conda install -c conda-forge intake

You can also install using pip, in which case you can choose how many of the optional dependencies to install; the simplest installation has the fewest requirements:

pip install intake

Note that you may well need specific drivers and other plugins, which usually have additional dependencies of their own.

Development

  • Create development Python environment with the required dependencies, ideally with conda. The requirements can be found in the yml files in the scripts/ci/ directory of this repo.
    • e.g. conda env create -f scripts/ci/environment-py311.yml and then conda activate test_env
  • Install intake using pip install -e .
  • Use pytest to run tests.
  • Create a fork on github to be able to submit PRs.
  • We respect, but do not enforce, PEP 8 standards; all new code should be covered by tests.

intake-parquet's People

Contributors

danielballan, jbcrail, martindurant, maximlt, remche, seibert, talebzeghmi, zaneselvans


intake-parquet's Issues

Support for new fsspec-style caching

As far as I can tell, this driver doesn't support fsspec-style caching at the moment:

import intake_parquet

path = "https://github.com/apache/parquet-testing/raw/master/data/alltypes_plain.parquet"
intake_parquet.ParquetSource(urlpath=path, engine="pyarrow").read()  # works
intake_parquet.ParquetSource(urlpath="simplecache::"+path, engine="pyarrow").read()  # fails

I'm unsure whether this would be more appropriate to fix at the dask layer or here. Is there a roadmap for rolling out fsspec caching to the various drivers?
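A hedged workaround sketch while the driver lacks this: dask.dataframe.read_parquet understands fsspec URL chaining, so the caching layer can be applied there instead; the cache directory below is a hypothetical choice.

import dask.dataframe as dd

path = "https://github.com/apache/parquet-testing/raw/master/data/alltypes_plain.parquet"

# apply simplecache at the dask/fsspec level rather than through intake_parquet
df = dd.read_parquet(
    "simplecache::" + path,
    engine="pyarrow",
    storage_options={"simplecache": {"cache_storage": "/tmp/parquet-cache"}},
).compute()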

gather_statistics to be an intake.open_parquet() argument

dask/dask#5272 forced me to consider surfacing gather_statistics as another intake.open_parquet() parameter passed along to dd.read_parquet().

The use case is working with ~2k parquet files written by Spark, which by default does not write a _metadata file. With gather_statistics=False, Dask won't read every file when gathering metadata, but will instead rely on the metadata of the 0th file.
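For illustration, a hedged sketch of the requested usage, assuming extra keyword arguments given to intake.open_parquet are forwarded on to dd.read_parquet; the path here is hypothetical.

import intake

source = intake.open_parquet(
    "s3://bucket/spark-output/*.parquet",  # hypothetical location of the ~2k Spark-written files
    engine="fastparquet",
    gather_statistics=False,  # skip reading every file's footer; rely on the 0th file's metadata
)
df = source.to_dask()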

Allowing public catalog access and efficient file caching

I'm trying to provide convenient public access to data stored in Parquet files in Google cloud storage through an Intake catalog (see this repo) but have run into a few snags. I suspect that some of them may be easy to fix, but I don't have enough context on how the system works and trial-and-error seems slow. I haven't found many public examples of how catalogs are typically set up to learn from.

One big file, or many smaller files?

  • Should we use one big parquet file with many row-groups, or lots of small parquet files each with a single row-group? Right now each row-group contains one state-year of data (e.g. Colorado 2015), since users will often only be interested in data from a particular region or time period.
  • I thought that using lots of smaller files along with local file caching would let users automatically cache only the parts of the data they need rather than everything, but all of the files still get accessed (and thus cached) when filtering by state and year, since the metadata indicating which files contain the requested states/years is stored inside the files themselves.
  • Is there an easy way to avoid this while still treating all of the files as a single table? Can the metadata be stored outside the files using PyArrow? (A sketch follows this list.)
  • It also seems like querying a bunch of smaller files remotely slows things down in a way that doesn't happen locally (where one big file and many small files seem to perform equally well).
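A hedged sketch of writing a _metadata sidecar with PyArrow (assuming a pyarrow version that supports metadata_collector), so readers can plan filtered reads without opening every data file; the table contents and paths below are illustrative only.

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"state": ["CO"], "year": [2015], "so2_mass_lbs": [1.0]})  # toy stand-in data

# collect per-file metadata while writing the dataset, then write it out as a
# single _metadata sidecar next to the data files
collector = []
pq.write_to_dataset(table, "epacems_dataset", metadata_collector=collector)
pq.write_metadata(table.schema, "epacems_dataset/_metadata", metadata_collector=collector)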

Conditionally disable caching

  • Is there an easy way to disable file caching when the data is already local? We often work with local data in development, where caching just slows everything down and wastes disk space, but it would be nice to use the same method of access (the intake catalog) in different contexts, with caching enabled or disabled depending on what kind of path is used as the base location for the catalogs.
  • Is there some way for a user to explicitly turn off caching when accessing a data source in the catalog? E.g. if we're working on a JupyterHub in the same region as the data, with a very fast network connection but not much local disk, local caching may be more trouble than it's worth. (A sketch follows this list.)
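One hedged option for the local-development case is simply to bypass the cached catalog entry and open the same data directly, without the simplecache:: prefix; the path below is hypothetical.

import intake

source = intake.open_parquet(
    "/local/path/hourly_emissions_epacems.parquet",  # hypothetical local copy; no caching layer
    engine="pyarrow",
)
df = source.to_dask()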

Allowing anonymous public access

  • Accessing the parquet data via https://storage.googleapis.com seems much less performant than using the gs:// protocol.
  • Trying to filter the Parquet data over https:// doesn't seem to work at all -- it downloads everything no matter what.
  • Wildcards also don't work with https:// URLs (I get a 403 Forbidden error), and neither does providing the URL of the "folder" that all the parquet files are in.
  • The only option that seems to work at all is providing a complete path to a single monolithic parquet file, but then with file caching the entire thing gets downloaded every time. Under some circumstances it also seems to try to read the entire Parquet file into memory, which doesn't work since it's 50-100 GB uncompressed.
  • Is it possible to provide public access using gs://? It seems like we just get access denied even when all the objects are public. (A sketch follows this list.)
  • Is there some other way to enumerate all of the different valid paths using user parameters and templating in the catalog paths, and still have them be treated as a single coherent table / dataset?
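A hedged sketch of anonymous access through gcsfs, which accepts token="anon"; the bucket path is hypothetical and the objects must be publicly readable.

import intake

source = intake.open_parquet(
    "gs://example-public-bucket/hourly_emissions_epacems/*.parquet",  # hypothetical public path
    engine="pyarrow",
    storage_options={"token": "anon"},  # gcsfs anonymous access
)
df = source.to_dask()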

Current catalog

description: A catalog of open energy system data for use by climate advocates,
  policymakers, journalists, researchers, and other members of civil society.

plugins:
  source:
    - module: intake_parquet

metadata:
  creator:
    title: "Catalyst Cooperative"
    email: "[email protected]"
    path: "https://catalyst.coop"

sources:

  hourly_emissions_epacems:
    driver: parquet
    description: Hourly pollution emissions and plant operational data reported via
      Continuous Emissions Monitoring Systems (CEMS) as required by 40 CFR Part 75.
      Includes CO2, NOx, and SO2, as well as the heat content of fuel consumed and
      gross power output. Hourly values reported by US EIA ORISPL code and emissions
      unit (smokestack) ID.
    metadata:
      title: Continuous Emissions Monitoring System (CEMS) Hourly Data
      type: application/parquet
      provider: US Environmental Protection Agency Air Markets Program
      path: "https://ampd.epa.gov/ampd"
      license:
        name: "CC-BY-4.0"
        title: "Creative Commons Attribution 4.0"
        path: "https://creativecommons.org/licenses/by/4.0"
    args: # These arguments are for dask.dataframe.read_parquet()
      engine: 'pyarrow'
      urlpath: 'simplecache::{{ env(PUDL_INTAKE_PATH) }}/hourly_emissions_epacems.parquet'
      storage_options:
        simplecache:
          cache_storage: '{{ env(PUDL_INTAKE_CACHE) }}'

  hourly_emissions_epacems_partitioned:
    driver: parquet
    description: Hourly pollution emissions and plant operational data reported via
      Continuous Emissions Monitoring Systems (CEMS) as required by 40 CFR Part 75.
      Includes CO2, NOx, and SO2, as well as the heat content of fuel consumed and
      gross power output. Hourly values reported by US EIA ORISPL code and emissions
      unit (smokestack) ID.
    metadata:
      title: Continuous Emissions Monitoring System (CEMS) Hourly Data
      type: application/parquet
      provider: US Environmental Protection Agency Air Markets Program
      path: "https://ampd.epa.gov/ampd"
      license:
        name: "CC-BY-4.0"
        title: "Creative Commons Attribution 4.0"
        path: "https://creativecommons.org/licenses/by/4.0"
    args: # These arguments are for dask.dataframe.read_parquet()
      engine: 'pyarrow'
      urlpath: 'simplecache::{{ env(PUDL_INTAKE_PATH) }}/hourly_emissions_epacems/*.parquet'
      storage_options:
        simplecache:
          cache_storage: '{{ env(PUDL_INTAKE_CACHE) }}'

open_parquet + remote server

Hi,

This may be a usage question rather than a bug, and I apologize if this is not the right place to ask. I am struggling with how to limit columns on a parquet file served from an intake server. Based on the docs and the example notebook, I think this should work:

>>> import intake
>>> import intake_parquet
>>> intake_parquet.__version__, intake.__version__
('0.2.3', '0.6.3')
>>> cat = intake.open_catalog('intake://localhost:5555')
>>> type(cat.big_parquet)
intake.container.dataframe.RemoteDataFrame
>>> pq = intake.open_parquet(cat.big_parquet, columns=['Column 1'])
>>> type(pq) 
intake_parquet.source.ParquetSource
>>> pq.read()
...
TypeError: argument of type 'RemoteDataFrame' is not iterable

If I read the file directly without using intake.open_parquet, it works fine, but I am precluded from limiting the columns.

>>> cat.big_parquet.read()

Is this the expected behavior? Apologies in advance if I missed it in the docs.
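A hedged workaround sketch, assuming the remote dataframe container supports to_dask(): read through the server-backed source lazily and select the columns afterwards, rather than wrapping it in open_parquet.

>>> ddf = cat.big_parquet.to_dask()   # lazy dask dataframe backed by the server
>>> ddf[['Column 1']].compute()       # only materialize the wanted column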

Filter Documentation

I'm having difficulty implementing filtering through parameters in a catalog entry; are there any examples of this?

The example given uses the open_[plugin_name] syntax; is this the only way to pass non-string parameters? (A sketch follows the example below.)

from intake import open_parquet
source = open_parquet('test.parq', columns=['f', 'i32', 'cat'])
source.read().head()
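For non-string parameters such as filters, one hedged option is the same open_parquet path, relying on extra keyword arguments being forwarded to dask.dataframe.read_parquet; the filter value below is hypothetical.

from intake import open_parquet
source = open_parquet('test.parq', columns=['f', 'i32', 'cat'],
                      filters=[('cat', '==', 'fred')])  # hypothetical filter value
source.read().head()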

parquet with s3 uri?

Hi! I'm sort of new to intake, so I apologize if this question has an obvious answer.

I have a bunch of parquet files sitting in s3 that aren't public. They are accessible with my AWS credentials using s3-style URIs (e.g. s3://bucket-name/data.parquet). Is it possible to use intake-parquet with this data? I see how to do this with public URLs, but I'm stumped with s3 URIs.

Thanks!
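For reference, a hedged sketch: s3:// paths are handled by s3fs, which picks up AWS credentials from the environment, or from an explicit profile passed through storage_options (the profile name below is hypothetical).

import intake

source = intake.open_parquet(
    "s3://bucket-name/data.parquet",
    engine="pyarrow",
    storage_options={"profile": "my-aws-profile"},  # omit to use the default AWS credentials
)
df = source.to_dask()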

Working with partitions

We are working with a multi-file catalog, e.g.:

plugins:
  source:
    - module: intake_parquet
sources:
  test:
    description: Short example parquet data
    driver: parquet
    args:
      urlpath: 
        - s3://bucket/path/file.parquet
        - s3://bucket/path/file2.parquet
        - s3://bucket/path/file3.parquet
      storage_options:
        anon: True
        client_kwargs:
          endpoint_url: https://example.com
  1. With only two entries, discover() is fine and we can read_partition(0) and read_partition(1), but a full read() fails with ValueError: storage_options passed with buffer, or non-supported URL, probably because ParquetSource.read() does not handle a list in urlpath. (A workaround sketch follows this list.)
  2. With more than two entries, discover() fails with a KeyError.
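A hedged workaround sketch: dask.dataframe.read_parquet accepts a list of paths directly, so reading via to_dask() (or dask itself) sidesteps the single-URL assumption in read(); the paths and endpoint are the placeholder ones from the catalog above.

import dask.dataframe as dd

df = dd.read_parquet(
    ["s3://bucket/path/file.parquet",
     "s3://bucket/path/file2.parquet",
     "s3://bucket/path/file3.parquet"],
    storage_options={"anon": True, "client_kwargs": {"endpoint_url": "https://example.com"}},
).compute()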

Thanks for maintaining this intake plugin!

reading parquet folder hangs indefinitely

When I read the following intake catalog

plugins:
  source:
    - module: intake_parquet
sources:
  out_2:
    driver: parquet
    args:
      urlpath: gs://bucket_name/out_2.parq

cat = intake.open_catalog("catalog.yaml")
cat.out_2.read()

I get

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-28-2831bbb95f63> in <module>()
----> 1 cat.out_2.read()

/home/user/miniconda3/envs/intake/lib/python3.7/site-packages/intake_parquet/source.py in read(self)
     75         Create single pandas dataframe from the whole data-set
     76         """
---> 77         self._load_metadata()
     78         return self._df.compute()
     79 

/home/user/miniconda3/envs/intake/lib/python3.7/site-packages/intake/source/base.py in _load_metadata(self)
    124         """load metadata only if needed"""
    125         if self._schema is None:
--> 126             self._schema = self._get_schema()
    127             self.datashape = self._schema.datashape
    128             self.dtype = self._schema.dtype

/home/user/miniconda3/envs/intake/lib/python3.7/site-packages/intake_parquet/source.py in _get_schema(self)
     58     def _get_schema(self):
     59         if self._df is None:
---> 60             self._df = self._to_dask()
     61         dtypes = {k: str(v) for k, v in self._df._meta.dtypes.items()}
     62         self._schema = base.Schema(datashape=None,

/home/user/miniconda3/envs/intake/lib/python3.7/site-packages/intake_parquet/source.py in _to_dask(self)
    107         urlpath = self._get_cache(self._urlpath)[0]
    108         self._df = dd.read_parquet(urlpath,
--> 109                                    storage_options=self._storage_options, **self._kwargs)
    110         self._load_metadata()
    111         return self._df

/home/user/miniconda3/envs/intake/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, **kwargs)
    343         read_from_paths=read_from_paths,
    344         engine=engine,
--> 345         **kwargs,
    346     )
    347 

/home/user/miniconda3/envs/intake/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, **kwargs)
    206         # correspond to a row group (populated below).
    207         parts, pf, gather_statistics, fast_metadata, base_path = _determine_pf_parts(
--> 208             fs, paths, gather_statistics, **kwargs
    209         )
    210 

/home/user/miniconda3/envs/intake/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in _determine_pf_parts(fs, paths, gather_statistics, **kwargs)
    145                 pf.cats = paths_to_cats(fns, scheme)
    146                 parts = paths.copy()
--> 147     elif fs.isdir(paths[0]):
    148         # This is a directory, check for _metadata, then _common_metadata
    149         paths = fs.glob(paths[0] + fs.sep + "*")

/home/user/miniconda3/envs/intake/lib/python3.7/site-packages/fsspec/asyn.py in wrapper(*args, **kwargs)
    116     def wrapper(*args, **kwargs):
    117         self = obj or args[0]
--> 118         return maybe_sync(func, self, *args, **kwargs)
    119 
    120     return wrapper

/home/user/miniconda3/envs/intake/lib/python3.7/site-packages/fsspec/asyn.py in maybe_sync(func, self, *args, **kwargs)
     95         if inspect.iscoroutinefunction(func):
     96             # run the awaitable on the loop
---> 97             return sync(loop, func, *args, **kwargs)
     98         else:
     99             # just call the blocking function

/home/user/miniconda3/envs/intake/lib/python3.7/site-packages/fsspec/asyn.py in sync(loop, func, callback_timeout, *args, **kwargs)
     63     else:
     64         while not e.is_set():
---> 65             e.wait(10)
     66     if error[0]:
     67         typ, exc, tb = error[0]

/home/user/miniconda3/envs/intake/lib/python3.7/threading.py in wait(self, timeout)
    550             signaled = self._flag
    551             if not signaled:
--> 552                 signaled = self._cond.wait(timeout)
    553             return signaled
    554 

/home/user/miniconda3/envs/intake/lib/python3.7/threading.py in wait(self, timeout)
    298             else:
    299                 if timeout > 0:
--> 300                     gotit = waiter.acquire(True, timeout)
    301                 else:
    302                     gotit = waiter.acquire(False)

KeyboardInterrupt: 

here is the directory structure

out_2.parq
├── _common_metadata
├── _metadata
├── part.0.parquet
├── part.1.parquet
├── part.2.parquet
├── part.3.parquet
└── part.4.parquet

Python 3.7.9
intake: 0.6.0
intake_parquet: 0.2.3
fsspec: 0.8.3
fastparquet: 0.4.2
dask: 2020.12.0
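A hedged diagnostic sketch: the traceback stops inside fs.isdir(), so exercising the filesystem call on its own may show whether the hang lives in gcsfs/fsspec rather than in intake-parquet (the bucket path is the one from the catalog above).

import gcsfs

fs = gcsfs.GCSFileSystem()  # uses default Google credentials
print(fs.isdir("gs://bucket_name/out_2.parq"))
print(fs.ls("gs://bucket_name/out_2.parq"))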

Add a to_cudf method for reading directly into GPU memory

Hi there,

Just wondering if there's scope for to_cudf-type functionality so that users can read Parquet files directly into GPU memory (bypassing the CPU). This would use the cudf.read_parquet function.

Happy to submit a Pull Request for this, but I'd like to discuss the implementation first: whether it should be handled as a to_cudf method, or via something like engine="cudf" (though cudf also has a "pyarrow" engine, like pandas).

One issue, though, is that cudf cannot yet read multi-file Parquet folders (see rapidsai/cudf#1688), only single binary parquet files. This might be implemented in a future (v0.16?) cudf release.
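A hypothetical sketch of what the proposed method might look like on ParquetSource; it is not part of intake-parquet today, and the attribute names simply mirror those already used in the plugin's source (_urlpath, _get_cache, _kwargs).

import cudf

def to_cudf(self):
    """Hypothetical: read a single parquet file straight into GPU memory."""
    # cudf currently reads single files only (see rapidsai/cudf#1688)
    urlpath = self._get_cache(self._urlpath)[0]
    return cudf.read_parquet(urlpath, columns=self._kwargs.get("columns"))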

Add `datashader.SpatialDataFrame` reader

There is a special spatially optimized dataframe that we've been working on for datashader. It is a parquet file organized in such a way that datashader can ignore certain partitions when zoomed in. It would be nice to add support for reading parquet files into this class if datashader is in the environment.

Does that seem like a reasonable addition to this plugin? If so I could work on it.
