Code Monkey home page Code Monkey logo

castra's Issues

categories are not correctly inferred when passed in to Castra constructor

In [26]: df = pd.read_csv('small.csv', header=None, names=names, parse_dates=['pickup_datetime', 'dropoff_datetime'])

In [27]: c = Castra(template=df, path='small.castra', categories=[c for c, t in df.dtypes.iteritems() if t == 'object'])

In [28]: c
Out[28]: <castra.core.Castra at 0x112648810>

In [29]: c.dtypes
Out[29]:
medallion                     object
hack_license                  object
vendor_id                     object
rate_code                      int64
store_and_fwd_flag            object
pickup_datetime       datetime64[ns]
dropoff_datetime      datetime64[ns]
passenger_count                int64
trip_time_in_secs              int64
trip_distance                float64
pickup_longitude             float64
pickup_latitude              float64
dropoff_longitude            float64
dropoff_latitude             float64
tolls_amount                 float64
tip_amount                   float64
total_amount                 float64
mta_tax                      float64
fare_amount                  float64
payment_type                  object
surcharge                    float64
dtype: object

"." columns

I read your post "Efficient Tabular Storage" and decided to to give Castra a whirl. As my luck would have it, I got an error on the very first data set I tried.

The problem is that the dataframe has a column that is labeled "." (period, without quotes) and, of course, on *nix this is an invalid name for regular files. I don't know if/how you would want to fix this. I've written a test case nontheless.

def test_column_with_period():                                                                      
    df = pd.DataFrame({'x': [10, 20],                                                               
                       '.': [10., 20.]},                                                            
                      columns=['x', '.'],                                                           
                      index=[10, 20])                                                               

    with Castra(template=df) as c:                                                                  
        c.extend(df) 

Which yields:

>       with open(filename, 'wb') as fp:
E       IOError: [Errno 21] Is a directory: '/tmp/castra-uUz3Gr/10--20/.'

castra 'TypeError: data type not understood' when storing categoricals

dd = df.from_pandas(d, npartitions=2)

dd.dtypes
Out[49]: 
a             int64
b    datetime64[ns]
c            object
dtype: object

c = dd.to_castra('delme0.castra')

c = None

c = dd.to_castra('delme1.castra', categories=True)

c
Out[54]: <castra.core.Castra at 0x10983eb00>

ee = df.from_castra('delme1.castra')

ee
Out[56]: dd.DataFrame<from-castra-7c5f3b6d9b74449a9e27408736e8859a, divisions=(0, 4, 9)>

ee.dtypes
Out[57]: 
a             int64
b    datetime64[ns]
c          category
dtype: object

c = ee.to_castra('delme2.castra')
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-58-62c09f024c21> in <module>()
----> 1 c = ee.to_castra('delme2.castra')

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/dataframe/core.py in to_castra(self, fn, categories, sorted_index_column, compute)
   1409         from .io import to_castra
   1410         return to_castra(self, fn, categories, sorted_index_column,
-> 1411                          compute=compute)
   1412 
   1413     def to_bag(self, index=False):

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/dataframe/io.py in to_castra(df, fn, categories, sorted_index_column, compute)
    769     keys = [(name, -1), (name, df.npartitions - 1)]
    770     if compute:
--> 771         c, _ = DataFrame._get(dsk, keys, get=get_sync)
    772         return c
    773     else:

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/base.py in _get(cls, dsk, keys, get, **kwargs)
     41         get = get or _globals['get'] or cls._default_get
     42         dsk2 = cls._optimize(dsk, keys, **kwargs)
---> 43         return get(dsk2, keys, **kwargs)
     44 
     45     @classmethod

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in get_sync(dsk, keys, **kwargs)
    514     queue = Queue()
    515     return get_async(apply_sync, 1, dsk, keys, queue=queue,
--> 516                      raise_on_exception=True, **kwargs)
    517 
    518 

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
    485             f(key, res, dsk, state, worker_id)
    486         while state['ready'] and len(state['running']) < num_workers:
--> 487             fire_task()
    488 
    489     # Final reporting

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in fire_task()
    456         # Submit
    457         apply_async(execute_task, args=[key, dsk[key], data, queue,
--> 458                                         get_id, raise_on_exception])
    459 
    460     # Seed initial tasks into the thread pool

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in apply_sync(func, args, kwds)
    506 def apply_sync(func, args=(), kwds={}):
    507     """ A naive synchronous version of apply_async """
--> 508     return func(*args, **kwds)
    509 
    510 

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in execute_task(key, task, data, queue, get_id, raise_on_exception)
    262     """
    263     try:
--> 264         result = _execute_task(task, data)
    265         id = get_id()
    266         result = key, result, None, id

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in _execute_task(arg, cache, dsk)
    243     elif istask(arg):
    244         func, args = arg[0], arg[1:]
--> 245         args2 = [_execute_task(a, cache) for a in args]
    246         return func(*args2)
    247     elif not ishashable(arg):

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in <listcomp>(.0)
    243     elif istask(arg):
    244         func, args = arg[0], arg[1:]
--> 245         args2 = [_execute_task(a, cache) for a in args]
    246         return func(*args2)
    247     elif not ishashable(arg):

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in _execute_task(arg, cache, dsk)
    244         func, args = arg[0], arg[1:]
    245         args2 = [_execute_task(a, cache) for a in args]
--> 246         return func(*args2)
    247     elif not ishashable(arg):
    248         return arg

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/castra/core.py in extend(self, df)
    217         # Store columns
    218         for col in df.columns:
--> 219             pack_file(df[col].values, self.dirname(partition_name, col))
    220 
    221         # Store index

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/castra/core.py in pack_file(x, fn, encoding)
    390     if x.dtype != 'O':
    391         bloscpack.pack_ndarray_file(x, fn, bloscpack_args=bp_args,
--> 392                 blosc_args=blosc_args(x.dtype))
    393     else:
    394         bytes = blosc.compress(msgpack.packb(x.tolist(), encoding=encoding), 1)

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/castra/core.py in blosc_args(dt)
     28 
     29 def blosc_args(dt):
---> 30     if np.issubdtype(dt, int):
     31         return bloscpack.BloscArgs(dt.itemsize, clevel=3, shuffle=True)
     32     if np.issubdtype(dt, np.datetime64):

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/numpy/core/numerictypes.py in issubdtype(arg1, arg2)
    759     else:
    760         val = mro[0]
--> 761     return issubclass(dtype(arg1).type, val)
    762 
    763 

TypeError: data type not understood

utf-8 encoding problem(?)

Hi, apologies if this isn't the right place for this!

I'm trying to create a Castra with:

import dask.dataframe as dd
df = dd.read_csv('/home/jacob/av_files/*.csv', names = ['property_id', 'date', 'available', 'minimum_stay', 'price', 'destination', 'primary_geo_unit', 'capacity', 'tp_rev_ct', 'snapshot_date'])
df.set_index('snapshot_date', compute=False).to_castra('av_test.castra', categories = T)

But I am seeing this error, my data has plenty of non english characters.

UnicodeDecodeError: 'utf8' codec can't decode byte 0x8e in position 424: invalid start byte

Thanks Jacob

Loading a column in castra slower in parallel

castra.to_dask('col').compute() is slower than castra[:, 'col'], and user significantly more RAM. For my sample dataset, loading ~400 MB of text used a peak of 2 GB when loaded straight from castra, and a peak of 12 GB when loaded through dask. This was not seen when using `.compute(get=get_sync).

I was unable to determine if this was specific to object dtype, as the numeric data loaded in ms (compared to several seconds for strings). An intermediate solution (as discussed with @mrocklin) might be to put a lock around the object branch in unpack_file, thus serializing these requests.

castra doesn't store the index name

In [28]: import castra

In [29]: castra.__version__
Out[29]: '0.1.1'

In [30]: from castra import Castra

In [31]: df = pd.DataFrame(dict(a=list('aabbcc'), b=range(6))).set_index('b')

In [32]: df['a'] = df.a.astype('category')

In [33]: c = Castra(template=df, path='foo.castra', categories=['a'])

In [34]: c.extend(df)

In [35]: c[:].index.name == 'b'
Out[35]: False

Add new columns?

Apparently people often want to add new columns that are results from the old columns.

Do we want to support adding another column onto an existing castra? We would assume that the new column has the exact same partition structure and number of elements per partition.

Partitioning in extend_sequence does not work if len(seq) == 1

Say you had one giant CSV that contained 5 years of data and wanted castra to repartition it weekly. Until issue #3 gets resolved, as far as I understand the only option is extend_sequence([df], freq='M')

This does not work, because partitioner is never called on buf.

There is a workaround (pass [df, pd.DataFrame()]), but ideally extend_sequence should work.

Deprecate?

Any thoughts on adding a deprecation message or generally a louder warning in the README?

Column access

We usually don't need all columns at once. We store them separately but haven't yet provide separate access (though I suspect that this is only a few lines of code)

How do we want to spell these queries? The typical pandas/bcolz solution seems odd.

>>> c[['col1', 'col2', 'col3']]['2014-01': '2014-02']

Thoughts on throwing out getitem syntax for a method?

>>> c.select(start='2014-01', stop='2014-02', columns=['col1', 'col2'])

Or we could merge both getitems into one

>>> c['2014-01': '2014-02', ['col1', 'col2', 'col2'])

consider sorting the index in extend

For many dask operations it very useful to have the index in each partition sorted. In certain cases, particularly time indexed frames, operations like resample break without a sorted index in each partition. I think we should do this either by default or have an option to do this

question: does castra support multi-level index?

dear developers:
the example in README shows a DataFrame with datetimeindex. what if I need to keep daily prices of all stocks? can we add an integer representing each stock, and form a multi-level index like in pandas?
sincerely

__del__ method causes race condition between processes/threads

The __del__ method causes a race condition between processes/threads reading from the same castra if it's initialized multiple times (different Castra objects, same filepath). Other workers can read from the file mid flush, which causes periodic failures (seen both locally, and on travis here).

A few solutions:

  1. Add a readonly mode that doesn't do anything on __del__, and doesn't allow extend/extend_sequence. Could either be a mode='r' kwarg, or a readonly=True kwarg. Default would be writable, so no change in default behavior.
  2. Remove the automatic flush/delete behavior. I'm unsure if removing automatic flush as a default is worth it, but I'm definitely against automatic delete. IMO temporary files/directories should be managed externally to castra. This is a separate issue entirely though.
  3. File based locks to ensure only one writer to each file/no reading while writing to same file.

My preference is option 1.

df.to_castra breaking with pandas v0.18.0rc1

pretty much right out of here: https://github.com/dask/dask-tutorial/blob/master/03b-DataFrame-Storage.ipynb

note I am using python 3.5

In [18]: dask.__version__
Out[18]: '0.8.0'

In [19]: pd.__version__
Out[19]: u'0.18.0rc1'

In [22]: castra.__version__
Out[22]: '0.1.6'
In [6]: from prep import accounts_csvs

In [7]: accounts_csvs(3, 1000000, 500)

In [8]: import dask.dataframe as dd

In [10]: import os

In [11]: filename = os.path.join('data', 'accounts.*.csv')

In [12]: filename
Out[12]: 'data/accounts.*.csv'

In [13]: df = dd.read_csv(filename)

In [14]: df.head()
Out[14]: 
    id   names  amount
0  171   Laura     533
1   69   Alice     112
2  130   Sarah     259
3  313  George     -56
4  202     Ray    2205

In [15]: c = df.to_castra('accounts.castra', categories=['names'])
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-15-4e0e632fd837> in <module>()
----> 1 c = df.to_castra('accounts.castra', categories=['names'])

/Users/jreback/miniconda/lib/python2.7/site-packages/dask/dataframe/core.pyc in to_castra(self, fn, categories, sorted_index_column, compute)
   1440         from .io import to_castra
   1441         return to_castra(self, fn, categories, sorted_index_column,
-> 1442                          compute=compute)
   1443 
   1444     def to_bag(self, index=False):

/Users/jreback/miniconda/lib/python2.7/site-packages/dask/dataframe/io.pyc in to_castra(df, fn, categories, sorted_index_column, compute)
    798     keys = [(name, -1), (name, df.npartitions - 1)]
    799     if compute:
--> 800         c, _ = DataFrame._get(dsk, keys, get=get_sync)
    801         return c
    802     else:

/Users/jreback/miniconda/lib/python2.7/site-packages/dask/base.pyc in _get(cls, dsk, keys, get, **kwargs)
     41         get = get or _globals['get'] or cls._default_get
     42         dsk2 = cls._optimize(dsk, keys, **kwargs)
---> 43         return get(dsk2, keys, **kwargs)
     44 
     45     @classmethod

/Users/jreback/miniconda/lib/python2.7/site-packages/dask/async.pyc in get_sync(dsk, keys, **kwargs)
/Users/jreback/miniconda/lib/python2.7/site-packages/dask/async.pyc in fire_task()
    456         # Submit
    457         apply_async(execute_task, args=[key, dsk[key], data, queue,
--> 458                                         get_id, raise_on_exception])
    459 
    460     # Seed initial tasks into the thread pool

/Users/jreback/miniconda/lib/python2.7/site-packages/dask/async.pyc in apply_sync(func, args, kwds)
    506 def apply_sync(func, args=(), kwds={}):
    507     """ A naive synchronous version of apply_async """
--> 508     return func(*args, **kwds)
    509 
    510 

/Users/jreback/miniconda/lib/python2.7/site-packages/dask/async.pyc in execute_task(key, task, data, queue, get_id, raise_on_exception)
    262     """
    263     try:
--> 264         result = _execute_task(task, data)
    265         id = get_id()
    266         result = key, result, None, id

/Users/jreback/miniconda/lib/python2.7/site-packages/dask/async.pyc in _execute_task(arg, cache, dsk)
    244         func, args = arg[0], arg[1:]
    245         args2 = [_execute_task(a, cache) for a in args]
--> 246         return func(*args2)
    247     elif not ishashable(arg):
    248         return arg

/Users/jreback/miniconda/lib/python2.7/site-packages/castra/core.pyc in __init__(self, path, template, categories, readonly)
    118 
    119             self.partitions = pd.Series([], dtype='O',
--> 120                                         index=template2.index.__class__([]))
    121             self.minimum = None
    122 

/Users/jreback/miniconda/lib/python2.7/site-packages/pandas/indexes/range.py in __new__(cls, start, stop, step, name, dtype, fastpath, copy, **kwargs)
     70             start = 0
     71         else:
---> 72             start = _ensure_int(start, 'start')
     73         if stop is None:
     74             stop = start

/Users/jreback/miniconda/lib/python2.7/site-packages/pandas/indexes/range.py in _ensure_int(value, field)
     56         def _ensure_int(value, field):
     57             try:
---> 58                 new_value = int(value)
     59                 assert(new_value == value)
     60             except (ValueError, AssertionError):

TypeError: int() argument must be a string or a number, not 'list'

how should we handle ordered categoricals?

Similar to R's factors, Pandas has an ordered argument that indicates that the categories have an ordering. I'm not sure what happens when you have an ordered Categorical.

Support for on-disk appends, partitioning

I've been working on a refactor of Castra - before I spend any more time on this, I should probably get some feedback. Here's the plan:

Issues I'm attempting to solve:

  1. Castra provides no validation that partitions split evenly - indices like [[1, 2, 3, 3], [3, 3, 4, 5, 6], ...] were possible (and happened to me)
  2. Castra provides no easy way to say "partition weekly", without manually doing the partitioning elsewhere (issue #3)

The plan:

  1. Add partitionby=None to the init signature. This will live in meta. If None, no repartitioning is done by Castra. Can also be a time period (things you can pass to resample in pandas).
  2. extend checks current partitions for equality overlap (even if partitionby=None). There are 3 cases that can happen here:
    1. Start of new frame is before end of existing partition. This errors
    2. Start of new frame is equal to end of existing partition. The equal parts are split off and appended to existing partition. Remainder is stored as new partition.
    3. Start of new frame is after existing partition. New frame is written to disk (current behavior)
  3. If partitionby != None, then data is partitioned by Castra into blocks. extend should still take large dataframes (calling extend on a row is a bad idea), but will group them into partitions based on the rule passed to partitionby. Using the functionality provided by bloscpack, the on disk partitions can be appended to with little overhead. This makes writing in cases where this happens slightly slower, but has no penalty on reads.
  4. Add extend_sequence function. This takes an iterable of dataframes (can be a generator), and does the partitioning in memory instead of on disk. This will be faster than calling extend in a loop (no on disk appends), but will result in the same disk file format.

This method means that the disk will match what's in memory after calls to extend or extend_sequence complete, will allow castra to do partitioning for the user, and will ensure that the partitions are valid. I have a crude version of this working now, and have found writes to be only slightly penalized when appends happen (no penalty if they don't), and no penalty for reading from disk.

travis support

I'd like to rig up travis support, some questions:

  • Which Python versions? 2.7 and 3.4?
  • What is the recommended way to run tests? pytest?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.