
partd's Introduction

PartD


Key-value byte store with appendable values

Partd stores key-value pairs. Values are raw bytes. New bytes are appended to whatever is already stored under a key.

Partd excels at shuffling operations.

Operations

PartD has two main operations, append and get.

Example

  1. Create a Partd backed by a directory:

    >>> import partd
    >>> p = partd.File('/path/to/new/dataset/')
    
  2. Append key-byte pairs to dataset:

    >>> p.append({'x': b'Hello ', 'y': b'123'})
    >>> p.append({'x': b'world!', 'y': b'456'})
    
  3. Get bytes associated to keys:

    >>> p.get('x')         # One key
    b'Hello world!'
    
    >>> p.get(['y', 'x'])  # List of keys
    [b'123456', b'Hello world!']
    
  4. Destroy partd dataset:

    >>> p.drop()
    

That's it.
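
To see why appendable values suit shuffling, here is a minimal pure-Python sketch. `TinyStore` and `shuffle` are hypothetical names that only model the append/get semantics shown above; they are not partd's implementation.

```python
from collections import defaultdict


class TinyStore:
    """Toy in-memory stand-in for an append-only byte store (not partd itself)."""

    def __init__(self):
        self._data = defaultdict(bytes)

    def append(self, pairs):
        # Appending concatenates new bytes onto whatever is stored under the key.
        for key, value in pairs.items():
            self._data[key] += value

    def get(self, keys):
        # Accept one key or a list of keys, as in the example above.
        if isinstance(keys, list):
            return [self._data.get(k, b"") for k in keys]
        return self._data.get(keys, b"")


def shuffle(chunks, store):
    """Route every (key, payload) record of every chunk to its key's value."""
    for chunk in chunks:
        grouped = defaultdict(bytes)
        for key, payload in chunk:
            grouped[key] += payload
        store.append(dict(grouped))  # one append call per input chunk


store = TinyStore()
chunks = [
    [("a", b"1"), ("b", b"2"), ("a", b"3")],
    [("b", b"4"), ("a", b"5")],
]
shuffle(chunks, store)
print(store.get(["a", "b"]))  # [b'135', b'24']
```

Each input chunk is split by key and appended, so after all chunks are processed every key holds all of its records in arrival order.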

Implementations

We can back a partd by an in-memory dictionary:

>>> p = Dict()

For larger amounts of data, or to share data between processes, we back a partd by a directory of files. This uses file-based locks for consistency:

>>> p = File('/path/to/dataset/')

However, this can fail for many small writes. In these cases you may wish to buffer one partd with another, keeping a fixed maximum amount of data in the buffering partd. This writes the larger elements of the first partd to the second partd when space runs low:

>>> p = Buffer(Dict(), File(), available_memory=2e9)  # 2GB memory buffer
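
The buffering policy described above can be sketched in plain Python. This is an illustration under stated assumptions, not partd's code: `ToyBuffer`, its `slow_writes` counter, and the "evict the largest value first" loop are hypothetical and only model the behavior the paragraph describes.

```python
class ToyBuffer:
    """Toy model of a fast store buffering writes to a slow one (hypothetical)."""

    def __init__(self, fast, slow, available_memory):
        self.fast = fast          # dict: key -> bytes held in memory
        self.slow = slow          # dict standing in for the on-disk store
        self.available_memory = available_memory
        self.used = 0             # bytes currently held in the fast store
        self.slow_writes = 0      # how often the slow store was touched

    def append(self, pairs):
        for key, value in pairs.items():
            self.fast[key] = self.fast.get(key, b"") + value
            self.used += len(value)
        # Evict the largest values first until we are back under budget.
        while self.used > self.available_memory:
            largest = max(self.fast, key=lambda k: len(self.fast[k]))
            self._spill(largest)

    def _spill(self, key):
        value = self.fast.pop(key)
        self.slow[key] = self.slow.get(key, b"") + value
        self.used -= len(value)
        self.slow_writes += 1

    def flush(self):
        for key in list(self.fast):
            self._spill(key)

    def get(self, key):
        # Older bytes live in the slow store, newer ones in the buffer.
        return self.slow.get(key, b"") + self.fast.get(key, b"")


buf = ToyBuffer({}, {}, available_memory=8)
for _ in range(10):
    buf.append({"x": b"ab", "y": b"c"})

print(buf.get("x") == b"ab" * 10, buf.slow_writes)
```

Twenty small per-key writes reach the slow store as only a handful of larger ones, which is the point of the buffer.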

You might also want to have many distributed processes write to a single partd consistently. This can be done with a server:

  • Server Process:

    >>> p = Buffer(Dict(), File(), available_memory=2e9)  # 2GB memory buffer
    >>> s = Server(p, address='ipc://server')
    
  • Worker processes:

    >>> p = Client('ipc://server')  # Client machine talks to remote server
    

Encodings and Compression

Once we can robustly and efficiently append bytes to a partd we consider compression and encodings. This is generally available with the Encode partd, which accepts three functions, one to apply on bytes as they are written, one to apply to bytes as they are read, and one to join bytestreams. Common configurations already exist for common data and compression formats.

We may wish to compress and decompress data transparently as we interact with a partd. Objects like BZ2, Blosc, ZLib and Snappy exist and take another partd as an argument:

>>> p = File(...)
>>> p = ZLib(p)

These work exactly as before; the (de)compression happens automatically.

Common data formats like Python lists, numpy arrays, and pandas dataframes are also supported out of the box:

>>> p = File(...)
>>> p = NumPy(p)
>>> p.append({'x': np.array([...])})

This lets us forget about bytes and think instead in our normal data types.

Composition

In principle we want to compose all of these choices together:

  1. Write policy: Dict, File, Buffer, Client
  2. Encoding: Pickle, Numpy, Pandas, ...
  3. Compression: Blosc, Snappy, ...

Partd objects compose by nesting. Here we make a partd that writes pickle encoded BZ2 compressed bytes directly to disk:

>>> p = Pickle(BZ2(File('foo')))

We could construct more complex systems that include compression, serialization, buffering, and remote access:

>>> server = Server(Buffer(Dict(), File(), available_memory=2e9))

>>> client = Pickle(Snappy(Client(server.address)))
>>> client.append({'x': [1, 2, 3]})
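
The three-function Encode idea (encode on write, decode on read, join on get) and composition by nesting can be sketched together with the standard library. Every name here (`ToyDict`, `ToyEncode`, `ToyZLib`, `ToyPickle`, `frame`, `framesplit`) is a hypothetical stand-in; the length-prefix framing is an assumption chosen so that independently encoded chunks can be separated again after being appended.

```python
import pickle
import struct
import zlib


def frame(payload):
    # Prefix each chunk with its length so appended chunks can be split later.
    return struct.pack("<Q", len(payload)) + payload


def framesplit(blob):
    offset = 0
    while offset < len(blob):
        (size,) = struct.unpack_from("<Q", blob, offset)
        offset += 8
        yield blob[offset:offset + size]
        offset += size


class ToyDict:
    """Innermost layer: appends raw bytes per key."""

    def __init__(self):
        self.data = {}

    def append(self, pairs):
        for key, value in pairs.items():
            self.data[key] = self.data.get(key, b"") + value

    def get(self, key):
        return self.data.get(key, b"")


class ToyEncode:
    """Wraps any object with append/get, applying encode, decode and join."""

    def __init__(self, encode, decode, join, inner):
        self.encode, self.decode, self.join = encode, decode, join
        self.inner = inner

    def append(self, pairs):
        self.inner.append({k: frame(self.encode(v)) for k, v in pairs.items()})

    def get(self, key):
        chunks = framesplit(self.inner.get(key))
        return self.join([self.decode(chunk) for chunk in chunks])


def concat_lists(lists):
    return [item for lst in lists for item in lst]


def ToyZLib(inner):
    return ToyEncode(zlib.compress, zlib.decompress, b"".join, inner)


def ToyPickle(inner):
    return ToyEncode(pickle.dumps, pickle.loads, concat_lists, inner)


p = ToyPickle(ToyZLib(ToyDict()))
p.append({"x": [1, 2, 3]})
p.append({"x": [4, 5]})
print(p.get("x"))  # [1, 2, 3, 4, 5]
```

Because each layer only relies on the `append`/`get` interface of the layer beneath it, the layers can be stacked in any combination.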

partd's People

Contributors

cclauss, charlesbluca, chdoig, danielballan, del82, edwardbetts, eriknw, gdementen, graingert, jacobtomlinson, jcrist, jrbourbeau, jsignell, markcbell, martindurant, mindw, mrocklin, phofl, qulogic, thomasjpfan, tirkarthi, tomaugspurger

partd's Issues

Support for pandas DataFrame subclasses

When dask uses partd for e.g. shuffle operations, the dataframes always come back as a pandas.DataFrame, even if a subclass was stored (xref geopandas/dask-geopandas#59 (comment)).

For example:

import geopandas
gdf = geopandas.read_file(geopandas.datasets.get_path("naturalearth_lowres"))

import partd
# dask.dataframe shuffle operations use PandasBlocks
p = partd.PandasBlocks(partd.Dict())

p.append({"gdf": gdf})
res = p.get("gdf")

>>> type(gdf)
geopandas.geodataframe.GeoDataFrame
>>> type(res)
pandas.core.frame.DataFrame

To be able to use dask's shuffle operations with dask_geopandas, which uses a pandas subclass as the partition type, the subclass should be preserved in the partd roundtrip (or are there other ways that you can override / dispatch this operation in dask?).
I was wondering how other dask.dataframe subclasses handle this, but e.g. dask_cudf doesn't seem to support "disk"-based shuffling.

Pandas 1.3.0 compatibility

The partd test suite fails with the nightly version of pandas, which you can install with

python -m pip install --no-deps --pre -i https://pypi.anaconda.org/scipy-wheels-nightly/simple pandas

I've included the test failure tracebacks below.

Test failures:
================================================================================== FAILURES ===================================================================================
____________________________________________________________________________ test_serialize[base0] ____________________________________________________________________________

base = Timestamp('1987-03-03 01:01:01+0001', tz='pytz.FixedOffset(1)')

    @pytest.mark.parametrize('base', [
        pd.Timestamp('1987-03-3T01:01:01+0001'),
        pd.Timestamp('1987-03-03 01:01:01-0600', tz='US/Central'),
    ])
    def test_serialize(base):
        df = pd.DataFrame({'x': [
            base + pd.Timedelta(seconds=i)
            for i in np.random.randint(0, 1000, size=10)],
                           'y': list(range(10)),
                           'z': pd.date_range('2017', periods=10)})
>       df2 = deserialize(serialize(df))

partd/tests/test_pandas.py:110:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
partd/pandas.py:175: in serialize
    h, b = block_to_header_bytes(block)
partd/pandas.py:141: in block_to_header_bytes
    bytes = pnp.compress(pnp.serialize(values), values.dtype)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

x = <DatetimeArray>
[
['2017-01-01 00:00:00', '2017-01-02 00:00:00', '2017-01-03 00:00:00',
 '2017-01-04 00:00:00', '2017-...0:00:00', '2017-01-08 00:00:00', '2017-01-09 00:00:00',
 '2017-01-10 00:00:00']
]
Shape: (1, 10), dtype: datetime64[ns]

    def serialize(x):
        if x.dtype == 'O':
            l = x.flatten().tolist()
            with ignoring(Exception):  # Try msgpack (faster on strings)
                return frame(msgpack.packb(l, use_bin_type=True))
            return frame(pickle.dumps(l, protocol=pickle.HIGHEST_PROTOCOL))
        else:
>           return x.tobytes()
E           AttributeError: 'DatetimeArray' object has no attribute 'tobytes'

partd/numpy.py:101: AttributeError
____________________________________________________________________________ test_serialize[base1] ____________________________________________________________________________

base = Timestamp('1987-03-03 01:01:01-0600', tz='US/Central')

    @pytest.mark.parametrize('base', [
        pd.Timestamp('1987-03-3T01:01:01+0001'),
        pd.Timestamp('1987-03-03 01:01:01-0600', tz='US/Central'),
    ])
    def test_serialize(base):
        df = pd.DataFrame({'x': [
            base + pd.Timedelta(seconds=i)
            for i in np.random.randint(0, 1000, size=10)],
                           'y': list(range(10)),
                           'z': pd.date_range('2017', periods=10)})
>       df2 = deserialize(serialize(df))

partd/tests/test_pandas.py:110:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
partd/pandas.py:175: in serialize
    h, b = block_to_header_bytes(block)
partd/pandas.py:141: in block_to_header_bytes
    bytes = pnp.compress(pnp.serialize(values), values.dtype)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

x = <DatetimeArray>
[
['2017-01-01 00:00:00', '2017-01-02 00:00:00', '2017-01-03 00:00:00',
 '2017-01-04 00:00:00', '2017-...0:00:00', '2017-01-08 00:00:00', '2017-01-09 00:00:00',
 '2017-01-10 00:00:00']
]
Shape: (1, 10), dtype: datetime64[ns]

    def serialize(x):
        if x.dtype == 'O':
            l = x.flatten().tolist()
            with ignoring(Exception):  # Try msgpack (faster on strings)
                return frame(msgpack.packb(l, use_bin_type=True))
            return frame(pickle.dumps(l, protocol=pickle.HIGHEST_PROTOCOL))
        else:
>           return x.tobytes()
E           AttributeError: 'DatetimeArray' object has no attribute 'tobytes'

partd/numpy.py:101: AttributeError

From digging around a bit it looks like we've been relying on some pandas internals, specifically DataFrame._data.blocks, which has experienced some changes upstream (xref pandas-dev/pandas#39146).

cc @jorisvandenbossche in case you have any thoughts. I started making a few changes locally which are similar to dask/dask#7318, but haven't gotten things to fully work yet.

Error with datetime while running tests

I'm getting this error while running the tests (docstring of tz_localize elided for brevity)

partd/tests/test_pandas.py:110: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
partd/pandas.py:175: in deserialize
    for (h, b) in zip(headers[2:], bytes[2:])]
partd/pandas.py:175: in <listcomp>
    for (h, b) in zip(headers[2:], bytes[2:])]
partd/pandas.py:143: in block_from_header_bytes
    values = pd.DatetimeIndex(values).tz_localize('utc').tz_convert(
/usr/lib/python3.7/site-packages/pandas/core/accessor.py:91: in f
    return self._delegate_method(name, *args, **kwargs)
/usr/lib/python3.7/site-packages/pandas/core/indexes/datetimelike.py:721: in _delegate_method
    result = operator.methodcaller(name, *args, **kwargs)(self._data)

E               TypeError: Already tz-aware, use tz_convert to convert.

/usr/lib/python3.7/site-packages/pandas/core/arrays/datetimes.py:1049: TypeError

This is with pandas 0.24.1.
Tests run fine if I replace line 143 in pandas.py with just values = pd.DatetimeIndex(values). It seems pandas restores the timezone on its own, though I'm not sure in which version that started to work.

Migrate CI to GitHub Actions

Due to changes in the Travis CI billing, the Dask org is migrating CI to GitHub Actions.

This repo contains a .travis.yml file which needs to be replaced with an equivalent .github/workflows/ci.yml file.

See dask/community#107 for more details.

Add zstd as a compression option?

Would there be interest in adding zstd to partd? At lowish compression levels I've found it to compress better than snappy and to be around twice as fast.

1.2.0: pytest is failing

I'm trying to package your module as an rpm package, so I'm using the typical PEP517-based build, install and test cycle used when building packages from a non-root account.

  • python3 -sBm build -w --no-isolation
  • because build is called with --no-isolation, only locally installed modules are used throughout the process
  • install .whl file in </install/prefix>
  • run pytest with PYTHONPATH pointing to sitearch and sitelib inside </install/prefix>

Here is pytest output:

+ PYTHONPATH=/home/tkloczko/rpmbuild/BUILDROOT/python-partd-1.2.0-5.fc35.x86_64/usr/lib64/python3.8/site-packages:/home/tkloczko/rpmbuild/BUILDROOT/python-partd-1.2.0-5.fc35.x86_64/usr/lib/python3.8/site-packages
+ /usr/bin/pytest -ra
=========================================================================== test session starts ============================================================================
platform linux -- Python 3.8.13, pytest-7.1.2, pluggy-1.0.0
rootdir: /home/tkloczko/rpmbuild/BUILD/partd-1.2.0
plugins: forked-1.4.0, xdist-2.5.0, rerunfailures-10.2
collected 50 items

partd/tests/test_buffer.py ...                                                                                                                                       [  6%]
partd/tests/test_compressed.py ..                                                                                                                                    [ 10%]
partd/tests/test_dict.py ....                                                                                                                                        [ 18%]
partd/tests/test_encode.py ..                                                                                                                                        [ 22%]
partd/tests/test_file.py .......                                                                                                                                     [ 36%]
partd/tests/test_numpy.py .......                                                                                                                                    [ 50%]
partd/tests/test_pandas.py .........                                                                                                                                 [ 68%]
partd/tests/test_partd.py ....                                                                                                                                       [ 76%]
partd/tests/test_pickle.py ..                                                                                                                                        [ 80%]
partd/tests/test_python.py .                                                                                                                                         [ 82%]
partd/tests/test_utils.py ..                                                                                                                                         [ 86%]
partd/tests/test_zmq.py .......                                                                                                                                      [100%]

============================================================================= warnings summary =============================================================================
partd/tests/test_pandas.py:8
  /home/tkloczko/rpmbuild/BUILD/partd-1.2.0/partd/tests/test_pandas.py:8: FutureWarning: pandas.util.testing is deprecated. Use the functions in the public API at pandas.testing instead.
    import pandas.util.testing as tm

partd/tests/test_pandas.py: 20 warnings
  /home/tkloczko/rpmbuild/BUILDROOT/python-partd-1.2.0-5.fc35.x86_64/usr/lib/python3.8/site-packages/partd/pandas.py:113: DeprecationWarning: The Index._get_attributes_dict method is deprecated, and will be removed in a future version
    header = (type(ind), ind._get_attributes_dict(), values.dtype, cat)

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
===================================================================== 50 passed, 21 warnings in 1.48s ======================================================================

Missing requirements

Hi,

 % pew mktmpenv
Using base prefix '/usr'
New python executable in /home/fr/.local/share/virtualenvs/e5ecf64498bb9ee/bin/python3
Also creating executable in /home/fr/.local/share/virtualenvs/e5ecf64498bb9ee/bin/python
Installing setuptools, pip, wheel...done.
This is a temporary environment. It will be deleted when you exit
Launching subshell in virtual environment. Type 'exit' or 'Ctrl+D' to return.
 % pip install partd
Collecting partd
Collecting locket (from partd)
Installing collected packages: locket, partd
Successfully installed locket-0.2.0 partd-0.3.2
 % python -c 'import partd'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/fr/.local/share/virtualenvs/e5ecf64498bb9ee/lib/python3.5/site-packages/partd/__init__.py", line 3, in <module>
    from .file import File
  File "/home/fr/.local/share/virtualenvs/e5ecf64498bb9ee/lib/python3.5/site-packages/partd/file.py", line 3, in <module>
    from .core import Interface
  File "/home/fr/.local/share/virtualenvs/e5ecf64498bb9ee/lib/python3.5/site-packages/partd/core.py", line 7, in <module>
    from toolz import memoize
ImportError: No module named 'toolz'

and after I install toolz, pandas is missing.

With toolz and pandas, the import goes well.

`import partd` fails if run in a location where `log` exists and is a directory

If the current working directory is one in which the path log already exists and is a directory, import partd fails:

$ ls -l
total 0
drwxr-xr-x  2 somebody  wheel  68 Oct 14 11:48 log
$ python
Python 3.5.2 |Anaconda custom (x86_64)| (default, Jul  2 2016, 17:52:12)
[GCC 4.2.1 Compatible Apple LLVM 4.2 (clang-425.0.28)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import partd
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/path/to/anaconda3/lib/python3.5/site-packages/partd/__init__.py", line 16, in <module>
    from .zmq import Client, Server
  File "/path/to/anaconda3/lib/python3.5/site-packages/partd/zmq.py", line 33, in <module>
    log('Import zmq')
  File "/path/to/anaconda3/lib/python3.5/site-packages/partd/zmq.py", line 29, in log
    with open('log', 'a') as f:
IsADirectoryError: [Errno 21] Is a directory: 'log'
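
The failure can be reproduced without partd. The sketch below mirrors the `open('log', 'a')` pattern from the traceback inside a temporary directory; `naive_log` is a hypothetical stand-in for the helper in zmq.py, and the `logging`-based alternative at the end is one possible direction, not necessarily the fix the project adopted.

```python
import logging
import os
import tempfile


def naive_log(message, path="log"):
    # Mirrors the pattern in the traceback: append to a path named 'log'.
    with open(path, "a") as f:
        f.write(message + "\n")


with tempfile.TemporaryDirectory() as workdir:
    log_path = os.path.join(workdir, "log")
    os.mkdir(log_path)  # a directory called 'log' already exists
    try:
        naive_log("Import zmq", path=log_path)
        failed = False
    except OSError as exc:  # IsADirectoryError on POSIX systems
        failed = True
        print(type(exc).__name__)

# Alternative (an assumption): route messages through the logging module,
# which writes no file unless the application configures a handler.
logger = logging.getLogger("toy_partd.zmq")
logger.addHandler(logging.NullHandler())
logger.debug("Import zmq")
```

With the logging approach, importing the module has no side effects on the current working directory.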

Pandas extension array support

I'm writing a pandas ExtensionArray and using this inside a Dask DataFrame.

I'm getting a failure inside partd during dask shuffle operations, and it looks like it may be coming from the fact that partd is treating the extension array like a numpy array. Here is the end of an example stack trace.

~/anaconda3/envs/spatialpandas_dev/lib/python3.7/site-packages/dask/dataframe/shuffle.py in shuffle_group_3(df, col, npartitions, p)
    624     g = df.groupby(col)
    625     d = {i: g.get_group(i) for i in g.groups}
--> 626     p.append(d, fsync=True)
    627 
    628 

~/anaconda3/envs/spatialpandas_dev/lib/python3.7/site-packages/partd/encode.py in append(self, data, **kwargs)
     21 
     22     def append(self, data, **kwargs):
---> 23         data = valmap(self.encode, data)
     24         data = valmap(frame, data)
     25         self.partd.append(data, **kwargs)

~/anaconda3/envs/spatialpandas_dev/lib/python3.7/site-packages/toolz/dicttoolz.py in valmap(func, d, factory)
     81     """
     82     rv = factory()
---> 83     rv.update(zip(iterkeys(d), map(func, itervalues(d))))
     84     return rv
     85 

~/anaconda3/envs/spatialpandas_dev/lib/python3.7/site-packages/partd/pandas.py in serialize(df)
    156 
    157     for block in df._data.blocks:
--> 158         h, b = block_to_header_bytes(block)
    159         headers.append(h)
    160         bytes.append(b)

~/anaconda3/envs/spatialpandas_dev/lib/python3.7/site-packages/partd/pandas.py in block_to_header_bytes(block)
    126 
    127     header = (block.mgr_locs.as_array, values.dtype, values.shape, extension)
--> 128     bytes = pnp.compress(pnp.serialize(values), values.dtype)
    129     return header, bytes
    130 

~/anaconda3/envs/spatialpandas_dev/lib/python3.7/site-packages/partd/numpy.py in serialize(x)
     99         return frame(pickle.dumps(l, protocol=pickle.HIGHEST_PROTOCOL))
    100     else:
--> 101         return x.tobytes()
    102 
    103 

AttributeError: 'MultiLineArray' object has no attribute 'tobytes'

Here MultiLineArray is the name of my ExtensionArray type.

I tried adding a tobytes method, but then there are other numpy array/dtype methods that are expected as well.

Is ExtensionArray support something that would need to be explicitly supported in partd? Or is there something I can do in the ExtensionArray implementation to help partd understand how to serialize it?

cc @jorisvandenbossche and @TomAugspurger in case you have any quick thoughts from the ExtensionArray side of things

Thanks!
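
One possible direction, sketched here with the standard library only, is to tag each serialized value with the path that produced it and fall back to pickle for anything that does not expose a raw buffer. This is an assumption for illustration, not how partd handles extension arrays: `array.array` stands in for a numpy array, a plain list stands in for an array type without `tobytes`, and the one-byte tags `R` and `P` are made up.

```python
import array
import pickle


def serialize(x):
    if isinstance(x, array.array):
        # Raw-buffer path, standing in for the x.tobytes() branch.
        return b"R" + x.typecode.encode("ascii") + x.tobytes()
    # Fallback for everything else: let pickle handle it.
    return b"P" + pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)


def deserialize(blob):
    tag, body = blob[:1], blob[1:]
    if tag == b"R":
        out = array.array(body[:1].decode("ascii"))
        out.frombytes(body[1:])
        return out
    return pickle.loads(body)


numeric = array.array("q", [1, 2, 3])
other = ["LINESTRING (0 0, 1 1)", None]  # stand-in for an extension array

print(deserialize(serialize(numeric)) == numeric)  # True
print(deserialize(serialize(other)) == other)      # True
```

The tag keeps the reader from guessing which decoder to use, at the cost of one byte per value.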

Serializing `pandas` `Index` with extension dtypes fails

Serializing a DataFrame whose columns contain pandas extension dtypes (e.g. Int64, string[python], string[pyarrow], etc.) works as expected, but fails when the index contains extension dtypes.

For example

import pandas as pd
import partd

df = pd.DataFrame({"a": [1, 2, 3]}, index=[4, 5, 6])
df = df.astype({"a": "Int64"})
df_roundtrip = partd.pandas.deserialize(partd.pandas.serialize(df))
pd.testing.assert_frame_equal(df, df_roundtrip)  # this works fine

df.index = df.index.astype("Int64")
df_roundtrip = partd.pandas.deserialize(partd.pandas.serialize(df))  # this fails

fails with

Traceback (most recent call last):
  File "/Users/james/projects/dask/dask/test-partd-extension-dtypes.py", line 11, in <module>
    df_roundtrip = partd.pandas.deserialize(partd.pandas.serialize(df))  # this fails
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/partd/pandas.py", line 179, in serialize
    ind_header, ind_bytes = index_to_header_bytes(df.index)
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/partd/pandas.py", line 112, in index_to_header_bytes
    bytes = pnp.compress(pnp.serialize(values), values.dtype)
  File "/Users/james/mambaforge/envs/dask-py39/lib/python3.9/site-packages/partd/numpy.py", line 102, in serialize
    return x.tobytes()
AttributeError: 'IntegerArray' object has no attribute 'tobytes'

Release 1.2.0

Unless there are objections, I'm planning to release partd 1.2.0 in order to get #49 out before the pandas 1.3.0 release

xref dask/dask#7507

Release 1.4.0

I'd like to do a partd release later today in order to get the change in #64 included in a released version of partd (that particular change is needed for Dask's dataframe.convert-string functionality). Will wait for a bit for any feedback but will then start pushing out the release.

Bundled versioneer is broken on Python 3.12

Describe the issue:

This is due to using the removed configparser.SafeConfigParser:

Traceback (most recent call last):
  File "/usr/lib/rpm/redhat/pyproject_buildrequires.py", line 555, in main
    generate_requires(
  File "/usr/lib/rpm/redhat/pyproject_buildrequires.py", line 451, in generate_requires
    generate_build_requirements(backend, requirements)
  File "/usr/lib/rpm/redhat/pyproject_buildrequires.py", line 274, in generate_build_requirements
    new_reqs = get_requires(config_settings=requirements.config_settings)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/site-packages/setuptools/build_meta.py", line 341, in get_requires_for_build_wheel
    return self._get_build_requires(config_settings, requirements=['wheel'])
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/site-packages/setuptools/build_meta.py", line 323, in _get_build_requires
    self.run_setup()
  File "/usr/lib/python3.12/site-packages/setuptools/build_meta.py", line 488, in run_setup
    self).run_setup(setup_script=setup_script)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/site-packages/setuptools/build_meta.py", line 338, in run_setup
    exec(code, locals())
  File "<string>", line 8, in <module>
  File "/builddir/build/BUILD/partd-1.4.0/versioneer.py", line 1480, in get_version
    return get_versions()["version"]
           ^^^^^^^^^^^^^^
  File "/builddir/build/BUILD/partd-1.4.0/versioneer.py", line 1412, in get_versions
    cfg = get_config_from_root(root)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/builddir/build/BUILD/partd-1.4.0/versioneer.py", line 342, in get_config_from_root
    parser = configparser.SafeConfigParser()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: module 'configparser' has no attribute 'SafeConfigParser'. Did you mean: 'RawConfigParser'?

Environment:

  • Dask version: n/a
  • Python version: 3.12.0 b3
  • Operating System: Fedora
  • Install method (conda, pip, source): source
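
For reference, `configparser.SafeConfigParser` had been deprecated since Python 3.2 in favor of `configparser.ConfigParser` and was removed in Python 3.12. The sketch below parses a small config with the supported class; the `SETUP_CFG` contents are an illustrative assumption, not the project's actual setup.cfg.

```python
import configparser

# Illustrative config text (hypothetical values).
SETUP_CFG = """
[versioneer]
VCS = git
style = pep440
"""

# ConfigParser is available on every supported Python 3 version.
parser = configparser.ConfigParser()
parser.read_string(SETUP_CFG)
vcs = parser.get("versioneer", "VCS")
print(vcs)  # git
```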

create_block_manager_from_blocks and passing a BlockManager to DataFrame are deprecated in pandas 2.2.0

Running the following on pandas 2.2.0.dev0+394.g820d5a902f:

import pandas as pd

from partd import File, PandasColumns, PandasBlocks

p = PandasBlocks(File("foo"))
p.append({'x': pd.DataFrame({'x': [1]})})
p.get('x')

results in the following warnings:

$ python -Wall demo.py
/home/graingert/mambaforge/envs/partd/lib/python3.12/site-packages/dateutil/tz/tz.py:37: DeprecationWarning: datetime.datetime.utcfromtimestamp() is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC: datetime.datetime.fromtimestamp(timestamp, datetime.UTC).
  EPOCH = datetime.datetime.utcfromtimestamp(0)
/home/graingert/projects/partd/partd/pandas.py:6: DeprecationWarning: create_block_manager_from_blocks is deprecated and will be removed in a future version. Use public APIs instead.
  from pandas.core.internals import create_block_manager_from_blocks, make_block
/home/graingert/projects/partd/partd/pandas.py:6: DeprecationWarning: create_block_manager_from_blocks is deprecated and will be removed in a future version. Use public APIs instead.
  from pandas.core.internals import create_block_manager_from_blocks, make_block
/home/graingert/projects/partd/partd/pandas.py:198: DeprecationWarning: Passing a BlockManager to DataFrame is deprecated and will raise in a future version. Use public APIs instead.
  return pd.DataFrame(create_block_manager_from_blocks(blocks, axes))
/home/graingert/projects/partd/partd/pandas.py:198: DeprecationWarning: Passing a BlockManager to DataFrame is deprecated and will raise in a future version. Use public APIs instead.
  return pd.DataFrame(create_block_manager_from_blocks(blocks, axes))
/home/graingert/projects/partd/partd/pandas.py:198: DeprecationWarning: Passing a BlockManager to DataFrame is deprecated and will raise in a future version. Use public APIs instead.
  return pd.DataFrame(create_block_manager_from_blocks(blocks, axes))

Release 1.3.0

There have been several updates around compatibility and removing deprecation warnings. I'd like to push out a release to get those changes into users' hands. I'm not aware of any blocking issues, so if there aren't any objections I'll plan to push out partd=1.3.0 later in the week.

AttributeError: module 'partd' has no attribute 'file'

What happened:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-13-786241be789c> in <module>
----> 1 inner_join_df = inner_join.compute()

~/.pyenv/versions/3.6.2/envs/my-virtual-env-3.6.2/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    281         dask.base.compute
    282         """
--> 283         (result,) = compute(self, traverse=False, **kwargs)
    284         return result
    285 

~/.pyenv/versions/3.6.2/envs/my-virtual-env-3.6.2/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    563         postcomputes.append(x.__dask_postcompute__())
    564 
--> 565     results = schedule(dsk, keys, **kwargs)
    566     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    567 

~/.pyenv/versions/3.6.2/envs/my-virtual-env-3.6.2/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     82         get_id=_thread_get_id,
     83         pack_exception=pack_exception,
---> 84         **kwargs
     85     )
     86 

~/.pyenv/versions/3.6.2/envs/my-virtual-env-3.6.2/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    485                         _execute_task(task, data)  # Re-execute locally
    486                     else:
--> 487                         raise_exception(exc, tb)
    488                 res, worker_id = loads(res_info)
    489                 state["cache"][key] = res

~/.pyenv/versions/3.6.2/envs/my-virtual-env-3.6.2/lib/python3.6/site-packages/dask/local.py in reraise(exc, tb)
    315     if exc.__traceback__ is not tb:
    316         raise exc.with_traceback(tb)
--> 317     raise exc
    318 
    319 

~/.pyenv/versions/3.6.2/envs/my-virtual-env-3.6.2/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    220     try:
    221         task, data = loads(task_info)
--> 222         result = _execute_task(task, data)
    223         id = get_id()
    224         result = dumps((result, id))

~/.pyenv/versions/3.6.2/envs/my-virtual-env-3.6.2/lib/python3.6/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/.pyenv/versions/3.6.2/envs/my-virtual-env-3.6.2/lib/python3.6/site-packages/dask/dataframe/shuffle.py in __call__(self, *args, **kwargs)
    805             ) from e
    806         file = partd.File(path)
--> 807         partd.file.cleanup_files.append(path)
    808         # Envelope partd file with compression, if set and available
    809         if partd_compression:

AttributeError: module 'partd' has no attribute 'file'

What you expected to happen:

Successful conversion back to pandas dataframe

Minimal Complete Verifiable Example:

Please see this notebook

Environment:

  • Dask version: dask-2021.3.0
  • Python version: 3.6.2
  • Operating System: MacOSX
  • Install method (conda, pip, source): pip

Buffer should flush() on __del__

Since it doesn't, data loss can occur, especially if you're using partd from multiprocessing workers where the workers aren't responsible for dropping the partd buffer.

Alternatively, the documentation should really mention that users MUST call flush() or they can lose data.
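
The data-loss scenario, and one way to make the flush hard to forget, can be modeled with a toy class. `ToyWriteBuffer` and `worker` are hypothetical; this sketch does not describe how partd's Buffer behaves on exit, it only shows the pattern of flushing in `__exit__`.

```python
class ToyWriteBuffer:
    """Hypothetical write buffer: unflushed data never reaches the backing store."""

    def __init__(self, backing):
        self.backing = backing  # dict standing in for durable storage
        self.pending = {}

    def append(self, pairs):
        for key, value in pairs.items():
            self.pending[key] = self.pending.get(key, b"") + value

    def flush(self):
        for key, value in self.pending.items():
            self.backing[key] = self.backing.get(key, b"") + value
        self.pending.clear()

    # Context-manager support guarantees a flush when the block exits.
    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.flush()


def worker(backing, use_context_manager):
    if use_context_manager:
        with ToyWriteBuffer(backing) as buf:
            buf.append({"x": b"data"})
    else:
        buf = ToyWriteBuffer(backing)
        buf.append({"x": b"data"})  # never flushed, so the bytes are lost


lost, kept = {}, {}
worker(lost, use_context_manager=False)
worker(kept, use_context_manager=True)
print(lost, kept)  # {} {'x': b'data'}
```

An explicit `flush()` in a `try`/`finally` block achieves the same effect where a `with` statement is inconvenient.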

Release?

#67 was merged, but not released, so we're still having failures in CI with nightly pandas.

Can we have a new release of partd?

Generalizing pandas serialization methods

When doing shuffle operations on non-pandas dataframes, we run into issues in partd due to its pandas-specific serialization logic; for example, when trying to do groupby.apply operations with dask-cudf, we run into issues due to not implementing an internal pandas function:

import cudf
import dask_cudf

df = cudf.DataFrame()
df['key'] = [0,0,1,1,1]
df['val'] = range(5)

ddf = dask_cudf.from_cudf(df, npartitions=1)

ddf.groupby("key").val.apply(lambda x: x.sum()).compute()
/tmp/ipykernel_61671/1646734033.py:10: UserWarning: `meta` is not specified, inferred from partial data. Please provide `meta` if the result is unexpected.
  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  ddf.groupby("key").val.apply(lambda x: x.sum()).compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/tmp/ipykernel_61671/1646734033.py in <module>
      8 ddf = dask_cudf.from_cudf(df, npartitions=1)
      9 
---> 10 ddf.groupby("key").val.apply(lambda x: x.sum()).compute()

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    286         dask.base.compute
    287         """
--> 288         (result,) = compute(self, traverse=False, **kwargs)
    289         return result
    290 

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/base.py in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    569         postcomputes.append(x.__dask_postcompute__())
    570 
--> 571     results = schedule(dsk, keys, **kwargs)
    572     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    573 

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
    551     """
    552     kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 553     return get_async(
    554         synchronous_executor.submit,
    555         synchronous_executor._max_workers,

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    494             while state["waiting"] or state["ready"] or state["running"]:
    495                 fire_tasks(chunksize)
--> 496                 for key, res_info, failed in queue_get(queue).result():
    497                     if failed:
    498                         exc, tb = loads(res_info)

~/conda/envs/rapids-21.12/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    435                     raise CancelledError()
    436                 elif self._state == FINISHED:
--> 437                     return self.__get_result()
    438 
    439                 self._condition.wait(timeout)

~/conda/envs/rapids-21.12/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    387         if self._exception:
    388             try:
--> 389                 raise self._exception
    390             finally:
    391                 # Break a reference cycle with the exception in self._exception

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in submit(self, fn, *args, **kwargs)
    536         fut = Future()
    537         try:
--> 538             fut.set_result(fn(*args, **kwargs))
    539         except BaseException as e:
    540             fut.set_exception(e)

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in batch_execute_tasks(it)
    232     Batch computing of multiple tasks with `execute_task`
    233     """
--> 234     return [execute_task(*a) for a in it]
    235 
    236 

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in <listcomp>(.0)
    232     Batch computing of multiple tasks with `execute_task`
    233     """
--> 234     return [execute_task(*a) for a in it]
    235 
    236 

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    223         failed = False
    224     except BaseException as e:
--> 225         result = pack_exception(e, dumps)
    226         failed = True
    227     return key, result, failed

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    218     try:
    219         task, data = loads(task_info)
--> 220         result = _execute_task(task, data)
    221         id = get_id()
    222         result = dumps((result, id))

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    117         # temporaries by their reference count and can execute certain
    118         # operations in-place.
--> 119         return func(*(_execute_task(a, cache) for a in args))
    120     elif not ishashable(arg):
    121         return arg

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/dask/dataframe/shuffle.py in shuffle_group_3(df, col, npartitions, p)
    916         g = df.groupby(col)
    917         d = {i: g.get_group(i) for i in g.groups}
--> 918         p.append(d, fsync=True)
    919 
    920 

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/partd/encode.py in append(self, data, **kwargs)
     21 
     22     def append(self, data, **kwargs):
---> 23         data = valmap(self.encode, data)
     24         data = valmap(frame, data)
     25         self.partd.append(data, **kwargs)

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/toolz/dicttoolz.py in valmap(func, d, factory)
     81     """
     82     rv = factory()
---> 83     rv.update(zip(d.keys(), map(func, d.values())))
     84     return rv
     85 

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/partd/pandas.py in serialize(df)
    179     """
    180     col_header, col_bytes = index_to_header_bytes(df.columns)
--> 181     ind_header, ind_bytes = index_to_header_bytes(df.index)
    182     headers = [col_header, ind_header]
    183     bytes = [col_bytes, ind_bytes]

~/conda/envs/rapids-21.12/lib/python3.8/site-packages/partd/pandas.py in index_to_header_bytes(ind)
    111         values = ind.values
    112 
--> 113     header = (type(ind), ind._get_attributes_dict(), values.dtype, cat)
    114     bytes = pnp.compress(pnp.serialize(values), values.dtype)
    115     return header, bytes

AttributeError: 'Int64Index' object has no attribute '_get_attributes_dict'

It also looks like this has caused issues with pandas dataframe subclasses not being maintained through serialization roundtrips as noted in #52.

Looking through the serialization code itself, it seems like things haven't been modified significantly in a few years, which raises the question of whether we can do things in a different (ideally more flexible) way today. For example, we currently only use pickle.dumps for a small subset of pandas Index subclasses:

partd/partd/pandas.py

Lines 98 to 102 in 236a44b

# These have special `__reduce__` methods, just use pickle
if isinstance(ind, (pd.DatetimeIndex,
                    pd.MultiIndex,
                    pd.RangeIndex)):
    return None, dumps(ind)

This is despite the fact that all Index subclasses appear to support serialization through pickle. Additionally, it looks like we're opting to manually construct header/bytes from pandas-like objects during serialization when many already implement serialization/deserialization functions that could be used for this purpose.

Essentially, I'm wondering if we could refactor the serialization methods here to:

  • check for serialize/deserialize methods and use them if available
  • if not, fall back on pickle.dumps if __reduce__ is available
  • fall back on manual serialization as a last resort
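As a rough sketch of that fallback order (not partd's actual code): the function name `serialize_with_fallback`, the string tags, and the `manual_serialize` callback are all hypothetical names chosen for this example. One assumption differs slightly from the list above: every Python object inherits a `__reduce__`, so instead of checking for it the sketch simply attempts pickle.dumps and falls through if that fails.

```python
import pickle


def serialize_with_fallback(obj, manual_serialize):
    """Return (tag, payload) from the first strategy that applies."""
    # 1. Prefer the object's own serialize method, if it has one.
    method = getattr(obj, "serialize", None)
    if callable(method):
        return "custom", method()
    # 2. Otherwise try pickle; unpicklable objects raise one of these.
    try:
        return "pickle", pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        # 3. Last resort: the caller-supplied manual serializer.
        return "manual", manual_serialize(obj)


class WithSerialize:
    """Hypothetical object that exposes its own serialize method."""

    def serialize(self):
        return b"custom-bytes"


def to_bytes(obj):
    # Hypothetical manual serializer used only for this demonstration.
    return repr(obj).encode()


custom_result = serialize_with_fallback(WithSerialize(), to_bytes)
pickle_result = serialize_with_fallback([1, 2, 3], to_bytes)
manual_result = serialize_with_fallback(lambda: None, to_bytes)  # lambdas cannot be pickled
```

The tag returned alongside the payload is what a matching deserializer would dispatch on. A real implementation would also need to record enough type information to find the right deserialize function.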

Some other related goals could be to examine if something similar can be done for the NumPy serialization methods, and potentially adding testing for non-pandas / pandas subclass dataframes.

cc @quasiben @jakirkham

Move default branch from "master" -> "main"

@jrbourbeau and I are in the process of moving the default branch for this repo from master to main.

  • Changed in GitHub
  • Merged PR to change branch name in code (xref #45)

What you'll see

Once the name is changed on GitHub (the first box above is Xed, or this issue is closed), running git pull will give you:

Your configuration specifies to merge with the ref 'refs/heads/master'
from the remote, but no such ref was fetched.

What you need to do

First: head to your fork and rename the default branch there
Then:

git branch -m master main
git fetch origin
git branch -u origin/main main

RFE: is it possible to start making github releases?🤔

When a GitHub release entry is created, an email notification is sent to everyone who has set Watch->Releases for the repo in the web UI.
A gh release can contain additional comments (like a changelog) or additional assets such as release tarballs (by default it contains only assets from the git tag); however, none of those parts are obligatory.
In the simplest variant a gh release can be empty, because the subject of the sent email contains the git tag name.

I'm asking because my automation process uses those email notifications to attempt preliminary automated upgrades of package builds, which saves some time on maintaining packaging procedures.
Other people would probably also be interested in being informed instantly about the release of a new version.

Documentation and examples for generating gh releases:
https://docs.github.com/en/repositories/releasing-projects-on-github/managing-releases-in-a-repository
https://cli.github.com/manual/gh_release_upload/
https://github.com/marketplace/actions/github-release
https://pgjones.dev/blog/trusted-plublishing-2023/
jbms/sphinx-immaterial#281 (comment)
tox target to publish on pypi and make gh release https://github.com/jaraco/skeleton/blob/928e9a86d61d3a660948bcba7689f90216cc8243/tox.ini#L42-L58

test_serialization failure on ppc64el and s390x

Hello,

Rebecca Palmer noticed that test_serialization in the tests for partd version 1.2.0 occasionally fails on ppc64el and s390x. ( https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=1005045 )

=================================== FAILURES ===================================
______________________________ test_serialization ______________________________

     def test_serialization():
         with partd_server(hostname='localhost') as (p, server):
             p.append({'x': b'123'})
             q = pickle.loads(pickle.dumps(p))
 >           assert q.get('x') == b'123'
E           AssertionError: assert b'' == b'123'
E             Full diff:
E             - b'123'
E             + b''

test_zmq.py:114: AssertionError

The full logs are at:
https://ci.debian.net/data/autopkgtest/testing/ppc64el/p/partd/18902384/log.gz
https://ci.debian.net/data/autopkgtest/testing/s390x/p/partd/16889914/log.gz

That code failure really looks like some kind of race condition.

We were wondering what to do about this test failure.

Stick a brief sleep in between the append and the get? Block building on these architectures? Just ignore it?
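If the failure really is a race (which is only a guess at this point), one alternative to a fixed sleep would be to poll for the expected value with a timeout. The helper below is a hypothetical sketch: `wait_for`, its parameters, and the default timings are assumptions made for this example and are not part of partd's test suite.

```python
import time


def wait_for(predicate, timeout=5.0, interval=0.05):
    """Poll predicate() until it is true or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Stand-in for a store whose value only becomes visible after a few reads.
attempts = {"count": 0}


def value_visible():
    attempts["count"] += 1
    return attempts["count"] >= 3


became_visible = wait_for(value_visible, timeout=1.0, interval=0.01)
never_visible = wait_for(lambda: False, timeout=0.05, interval=0.01)
```

In the test above, the final assertion could then become `assert wait_for(lambda: q.get('x') == b'123')`. This keeps fast machines fast while giving slower ones time to catch up, although it would also hide a genuine ordering bug in the server if there is one.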

Environment:

  • Python version: 3.9.10
  • Operating System: Debian testing
  • Install method (conda, pip, source): Debian package version 1.2.0-1 CI tests.
