blaze / castra
Partitioned storage system based on blosc. **No longer actively maintained.**
License: BSD 3-Clause "New" or "Revised" License
Pickle incompatibility between Python versions means that a castra made in Python 3 cannot be read in Python 2 and vice versa, possibly due to string encoding.
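A minimal sketch of the usual cross-version handling, assuming the failing file is one of castra's pickled metadata files (the file name meta.pkl below is hypothetical): write with protocol 2 so both interpreters can read it, and fall back to a permissive byte-string decoding when Python 3 reads a Python-2 pickle.

```python
import pickle

# Protocol 2 is readable by both Python 2 and Python 3.
data = {'columns': ['a', 'b'], 'dtypes': ['int64', 'object']}
with open('meta.pkl', 'wb') as f:
    pickle.dump(data, f, protocol=2)

# Under Python 3, a Python-2 pickle containing byte strings needs an explicit
# encoding instead of the default ASCII/UTF-8 decoding.
with open('meta.pkl', 'rb') as f:
    try:
        meta = pickle.load(f)
    except UnicodeDecodeError:
        f.seek(0)
        meta = pickle.load(f, encoding='latin1')
```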
In [26]: df = pd.read_csv('small.csv', header=None, names=names, parse_dates=['pickup_datetime', 'dropoff_datetime'])
In [27]: c = Castra(template=df, path='small.castra', categories=[c for c, t in df.dtypes.iteritems() if t == 'object'])
In [28]: c
Out[28]: <castra.core.Castra at 0x112648810>
In [29]: c.dtypes
Out[29]:
medallion object
hack_license object
vendor_id object
rate_code int64
store_and_fwd_flag object
pickup_datetime datetime64[ns]
dropoff_datetime datetime64[ns]
passenger_count int64
trip_time_in_secs int64
trip_distance float64
pickup_longitude float64
pickup_latitude float64
dropoff_longitude float64
dropoff_latitude float64
tolls_amount float64
tip_amount float64
total_amount float64
mta_tax float64
fare_amount float64
payment_type object
surcharge float64
dtype: object
I read your post "Efficient Tabular Storage" and decided to give Castra a whirl. As my luck would have it, I got an error on the very first data set I tried.
The problem is that the dataframe has a column labeled "." (period, without quotes) and, of course, on *nix this is an invalid name for regular files. I don't know if/how you would want to fix this. I've written a test case nonetheless.
def test_column_with_period():
    df = pd.DataFrame({'x': [10, 20],
                       '.': [10., 20.]},
                      columns=['x', '.'],
                      index=[10, 20])
    with Castra(template=df) as c:
        c.extend(df)
Which yields:
> with open(filename, 'wb') as fp:
E IOError: [Errno 21] Is a directory: '/tmp/castra-uUz3Gr/10--20/.'
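One possible fix (a sketch only, not something castra implements): percent-encode column names before using them as file names, so "." and path separators cannot collide with directory entries.

```python
import re

def safe_filename(col):
    """Percent-encode characters that are unsafe in a file name."""
    return re.sub(r'[^A-Za-z0-9_\-]',
                  lambda m: '%{:02X}'.format(ord(m.group())),
                  str(col))

assert safe_filename('.') == '%2E'      # no longer collides with the '.' directory entry
assert safe_filename('a/b') == 'a%2Fb'  # path separators are neutralized too
```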
There seems to be a 0.1.8 release (at least there is a tag for it), but no upload to PyPI (see https://pypi.python.org/pypi/castra). Is there a reason for this, or was it just forgotten?
dd = df.from_pandas(d, npartitions=2)
dd.dtypes
Out[49]:
a int64
b datetime64[ns]
c object
dtype: object
c = dd.to_castra('delme0.castra')
c = None
c = dd.to_castra('delme1.castra', categories=True)
c
Out[54]: <castra.core.Castra at 0x10983eb00>
ee = df.from_castra('delme1.castra')
ee
Out[56]: dd.DataFrame<from-castra-7c5f3b6d9b74449a9e27408736e8859a, divisions=(0, 4, 9)>
ee.dtypes
Out[57]:
a int64
b datetime64[ns]
c category
dtype: object
c = ee.to_castra('delme2.castra')
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-58-62c09f024c21> in <module>()
----> 1 c = ee.to_castra('delme2.castra')
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/dataframe/core.py in to_castra(self, fn, categories, sorted_index_column, compute)
1409 from .io import to_castra
1410 return to_castra(self, fn, categories, sorted_index_column,
-> 1411 compute=compute)
1412
1413 def to_bag(self, index=False):
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/dataframe/io.py in to_castra(df, fn, categories, sorted_index_column, compute)
769 keys = [(name, -1), (name, df.npartitions - 1)]
770 if compute:
--> 771 c, _ = DataFrame._get(dsk, keys, get=get_sync)
772 return c
773 else:
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/base.py in _get(cls, dsk, keys, get, **kwargs)
41 get = get or _globals['get'] or cls._default_get
42 dsk2 = cls._optimize(dsk, keys, **kwargs)
---> 43 return get(dsk2, keys, **kwargs)
44
45 @classmethod
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in get_sync(dsk, keys, **kwargs)
514 queue = Queue()
515 return get_async(apply_sync, 1, dsk, keys, queue=queue,
--> 516 raise_on_exception=True, **kwargs)
517
518
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
485 f(key, res, dsk, state, worker_id)
486 while state['ready'] and len(state['running']) < num_workers:
--> 487 fire_task()
488
489 # Final reporting
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in fire_task()
456 # Submit
457 apply_async(execute_task, args=[key, dsk[key], data, queue,
--> 458 get_id, raise_on_exception])
459
460 # Seed initial tasks into the thread pool
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in apply_sync(func, args, kwds)
506 def apply_sync(func, args=(), kwds={}):
507 """ A naive synchronous version of apply_async """
--> 508 return func(*args, **kwds)
509
510
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in execute_task(key, task, data, queue, get_id, raise_on_exception)
262 """
263 try:
--> 264 result = _execute_task(task, data)
265 id = get_id()
266 result = key, result, None, id
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in _execute_task(arg, cache, dsk)
243 elif istask(arg):
244 func, args = arg[0], arg[1:]
--> 245 args2 = [_execute_task(a, cache) for a in args]
246 return func(*args2)
247 elif not ishashable(arg):
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in <listcomp>(.0)
243 elif istask(arg):
244 func, args = arg[0], arg[1:]
--> 245 args2 = [_execute_task(a, cache) for a in args]
246 return func(*args2)
247 elif not ishashable(arg):
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in _execute_task(arg, cache, dsk)
244 func, args = arg[0], arg[1:]
245 args2 = [_execute_task(a, cache) for a in args]
--> 246 return func(*args2)
247 elif not ishashable(arg):
248 return arg
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/castra/core.py in extend(self, df)
217 # Store columns
218 for col in df.columns:
--> 219 pack_file(df[col].values, self.dirname(partition_name, col))
220
221 # Store index
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/castra/core.py in pack_file(x, fn, encoding)
390 if x.dtype != 'O':
391 bloscpack.pack_ndarray_file(x, fn, bloscpack_args=bp_args,
--> 392 blosc_args=blosc_args(x.dtype))
393 else:
394 bytes = blosc.compress(msgpack.packb(x.tolist(), encoding=encoding), 1)
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/castra/core.py in blosc_args(dt)
28
29 def blosc_args(dt):
---> 30 if np.issubdtype(dt, int):
31 return bloscpack.BloscArgs(dt.itemsize, clevel=3, shuffle=True)
32 if np.issubdtype(dt, np.datetime64):
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/numpy/core/numerictypes.py in issubdtype(arg1, arg2)
759 else:
760 val = mro[0]
--> 761 return issubclass(dtype(arg1).type, val)
762
763
TypeError: data type not understood
This breaks resample in dask
There is the use case of "please partition my data by hour, regardless of how much data I give to you". Is this something that we want to support on top of castra?
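A sketch of doing that split on the user side today, assuming a DatetimeIndex, before any support lands in castra itself:

```python
import numpy as np
import pandas as pd

idx = pd.date_range('2015-01-01', periods=1000, freq='T')
df = pd.DataFrame({'x': np.random.randn(1000)}, index=idx)

# One chunk per hour, regardless of how much data arrives at once.
hourly = [chunk for _, chunk in df.groupby(pd.Grouper(freq='H')) if len(chunk)]

# with Castra(template=df, path='hourly.castra') as c:
#     for chunk in hourly:
#         c.extend(chunk)
```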
Hi, apologies if this isn't the right place for this!
I'm trying to create a Castra with:
import dask.dataframe as dd
df = dd.read_csv('/home/jacob/av_files/*.csv', names = ['property_id', 'date', 'available', 'minimum_stay', 'price', 'destination', 'primary_geo_unit', 'capacity', 'tp_rev_ct', 'snapshot_date'])
df.set_index('snapshot_date', compute=False).to_castra('av_test.castra', categories = T)
But I am seeing this error; my data has plenty of non-English characters.
UnicodeDecodeError: 'utf8' codec can't decode byte 0x8e in position 424: invalid start byte
Thanks Jacob
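This looks like the CSVs are not UTF-8 (0x8e is a common Windows-1252/latin-1 byte). A sketch of a likely workaround, assuming the files really are in such an encoding: tell the reader the actual encoding so castra receives proper text. The encoding keyword is forwarded to pandas.read_csv.

```python
import dask.dataframe as dd

names = ['property_id', 'date', 'available', 'minimum_stay', 'price',
         'destination', 'primary_geo_unit', 'capacity', 'tp_rev_ct',
         'snapshot_date']

# 'cp1252' and 'latin-1' are common guesses for byte 0x8e; use whatever the
# files were actually written in.
df = dd.read_csv('/home/jacob/av_files/*.csv', names=names, encoding='cp1252')
```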
castra.to_dask('col').compute() is slower than castra[:, 'col'], and uses significantly more RAM. For my sample dataset, loading ~400 MB of text used a peak of 2 GB when loaded straight from castra, and a peak of 12 GB when loaded through dask. This was not seen when using `.compute(get=get_sync)`.
I was unable to determine if this was specific to object dtype, as the numeric data loaded in ms (compared to several seconds for strings). An intermediate solution (as discussed with @mrocklin) might be to put a lock around the object branch in unpack_file, thus serializing these requests.
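A rough sketch of that intermediate solution (not castra's actual code): serialize the object-dtype branch with a process-wide lock so only one thread at a time holds the decompressed text in memory.

```python
import threading

_object_lock = threading.Lock()

def unpack_object(decompress, filename):
    """Stand-in for the object-dtype branch of unpack_file."""
    with _object_lock:            # only one thread decompresses text columns at a time
        return decompress(filename)
```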
Currently there is no mechanism to extract only the index and not any columns. This ends up being important for dask.dataframe if you want to operate with the index:
dd.from_castra(...).index.compute()
Dask.dataframe also lacks a suitable optimization to make this fast. See conversation at https://gitter.im/blaze/dask?at=55f6fafd18e0111d7e4ec65a
In [28]: import castra
In [29]: castra.__version__
Out[29]: '0.1.1'
In [30]: from castra import Castra
In [31]: df = pd.DataFrame(dict(a=list('aabbcc'), b=range(6))).set_index('b')
In [32]: df['a'] = df.a.astype('category')
In [33]: c = Castra(template=df, path='foo.castra', categories=['a'])
In [34]: c.extend(df)
In [35]: c[:].index.name == 'b'
Out[35]: False
Apparently people often want to add new columns that are derived from the old columns.
Do we want to support adding another column onto an existing castra? We would assume that the new column has the exact same partition structure and number of elements per partition.
Say you had one giant CSV that contained 5 years of data and wanted castra to repartition it weekly. Until issue #3 gets resolved, as far as I understand the only option is extend_sequence([df], freq='M').
This does not work, because partitioner is never called on buf. There is a workaround (pass [df, pd.DataFrame()]), but ideally extend_sequence should work.
Any thoughts on adding a deprecation message or generally a louder warning in the README?
We usually don't need all columns at once. We store them separately but haven't yet provided separate access (though I suspect that this is only a few lines of code).
How do we want to spell these queries? The typical pandas/bcolz solution seems odd.
>>> c[['col1', 'col2', 'col3']]['2014-01': '2014-02']
Thoughts on throwing out getitem syntax for a method?
>>> c.select(start='2014-01', stop='2014-02', columns=['col1', 'col2'])
Or we could merge both getitems into one
>>> c['2014-01': '2014-02', ['col1', 'col2', 'col3']]
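For the merged spelling, Python already delivers the key as a (slice, columns) tuple, so a sketch of the dispatch could look like this (illustrative only, not castra's current behavior):

```python
class Table:
    def __getitem__(self, key):
        if isinstance(key, tuple):
            index_slice, columns = key        # c['2014-01':'2014-02', ['col1', 'col2']]
        else:
            index_slice, columns = key, None  # c['2014-01':'2014-02']
        return index_slice, columns           # real code would dispatch to the loader

t = Table()
print(t['2014-01':'2014-02', ['col1', 'col2']])
# (slice('2014-01', '2014-02', None), ['col1', 'col2'])
```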
For many dask operations it is very useful to have the index in each partition sorted. In certain cases, particularly time-indexed frames, operations like resample break without a sorted index in each partition. I think we should either do this by default or provide an option for it.
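Until that lands, a minimal user-side sketch is to sort each chunk before extending, so every partition's index is monotonic:

```python
import numpy as np
import pandas as pd

idx = pd.to_datetime(['2015-01-02', '2015-01-01', '2015-01-03'])
chunk = pd.DataFrame({'x': np.arange(3)}, index=idx)

chunk = chunk.sort_index()      # make the partition's index monotonic
assert chunk.index.is_monotonic_increasing
# c.extend(chunk)
```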
Dear developers:
The example in the README shows a DataFrame with a DatetimeIndex. What if I need to keep daily prices of all stocks? Can we add an integer representing each stock, and form a multi-level index like in pandas?
Sincerely
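Castra partitions along a single dataframe index, so a common layout for this (a sketch, not a built-in feature) is to keep the date as the index and store the stock identifier as a categorical column, filtering by stock after the date-based read:

```python
import pandas as pd

prices = pd.DataFrame(
    {'stock': ['AAPL', 'MSFT', 'AAPL', 'MSFT'],
     'close': [101.0, 40.1, 102.5, 40.8]},
    index=pd.to_datetime(['2015-01-01', '2015-01-01', '2015-01-02', '2015-01-02']))
prices['stock'] = prices['stock'].astype('category')

aapl = prices[prices.stock == 'AAPL']   # per-stock view after reading a date range
```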
The __del__ method causes a race condition between processes/threads reading from the same castra if it's initialized multiple times (different Castra objects, same filepath). Other workers can read from the file mid-flush, which causes periodic failures (seen both locally, and on travis here).
A few solutions:
1. A readonly mode that doesn't do anything on __del__, and doesn't allow extend/extend_sequence. Could either be a mode='r' kwarg, or a readonly=True kwarg. Default would be writable, so no change in default behavior. (A rough sketch follows this list.)
2. Change what __del__ does by default. I'm not sure flush as a default is worth it, but I'm definitely against automatic delete. IMO temporary files/directories should be managed externally to castra. This is a separate issue entirely though.
My preference is option 1.
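A hypothetical sketch of option 1 (the class name and kwarg are illustrative, not castra's API): handles opened with readonly=True refuse writes and do nothing on __del__, so they can never race a writer's flush.

```python
class ReadonlyAwareCastra:
    def __init__(self, path, readonly=False):
        self.path = path
        self.readonly = readonly

    def extend(self, df):
        if self.readonly:
            raise IOError("opened readonly; extend() is not allowed")
        # ... normal write path ...

    def flush(self):
        if not self.readonly:
            pass  # persist partition metadata to disk

    def __del__(self):
        if not self.readonly:   # readers never flush, so they cannot clobber a file mid-read
            self.flush()
```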
Pretty much right out of here: https://github.com/dask/dask-tutorial/blob/master/03b-DataFrame-Storage.ipynb
Note: I am using python 3.5
In [18]: dask.__version__
Out[18]: '0.8.0'
In [19]: pd.__version__
Out[19]: u'0.18.0rc1'
In [22]: castra.__version__
Out[22]: '0.1.6'
In [6]: from prep import accounts_csvs
In [7]: accounts_csvs(3, 1000000, 500)
In [8]: import dask.dataframe as dd
In [10]: import os
In [11]: filename = os.path.join('data', 'accounts.*.csv')
In [12]: filename
Out[12]: 'data/accounts.*.csv'
In [13]: df = dd.read_csv(filename)
In [14]: df.head()
Out[14]:
id names amount
0 171 Laura 533
1 69 Alice 112
2 130 Sarah 259
3 313 George -56
4 202 Ray 2205
In [15]: c = df.to_castra('accounts.castra', categories=['names'])
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-15-4e0e632fd837> in <module>()
----> 1 c = df.to_castra('accounts.castra', categories=['names'])
/Users/jreback/miniconda/lib/python2.7/site-packages/dask/dataframe/core.pyc in to_castra(self, fn, categories, sorted_index_column, compute)
1440 from .io import to_castra
1441 return to_castra(self, fn, categories, sorted_index_column,
-> 1442 compute=compute)
1443
1444 def to_bag(self, index=False):
/Users/jreback/miniconda/lib/python2.7/site-packages/dask/dataframe/io.pyc in to_castra(df, fn, categories, sorted_index_column, compute)
798 keys = [(name, -1), (name, df.npartitions - 1)]
799 if compute:
--> 800 c, _ = DataFrame._get(dsk, keys, get=get_sync)
801 return c
802 else:
/Users/jreback/miniconda/lib/python2.7/site-packages/dask/base.pyc in _get(cls, dsk, keys, get, **kwargs)
41 get = get or _globals['get'] or cls._default_get
42 dsk2 = cls._optimize(dsk, keys, **kwargs)
---> 43 return get(dsk2, keys, **kwargs)
44
45 @classmethod
/Users/jreback/miniconda/lib/python2.7/site-packages/dask/async.pyc in get_sync(dsk, keys, **kwargs)
/Users/jreback/miniconda/lib/python2.7/site-packages/dask/async.pyc in fire_task()
456 # Submit
457 apply_async(execute_task, args=[key, dsk[key], data, queue,
--> 458 get_id, raise_on_exception])
459
460 # Seed initial tasks into the thread pool
/Users/jreback/miniconda/lib/python2.7/site-packages/dask/async.pyc in apply_sync(func, args, kwds)
506 def apply_sync(func, args=(), kwds={}):
507 """ A naive synchronous version of apply_async """
--> 508 return func(*args, **kwds)
509
510
/Users/jreback/miniconda/lib/python2.7/site-packages/dask/async.pyc in execute_task(key, task, data, queue, get_id, raise_on_exception)
262 """
263 try:
--> 264 result = _execute_task(task, data)
265 id = get_id()
266 result = key, result, None, id
/Users/jreback/miniconda/lib/python2.7/site-packages/dask/async.pyc in _execute_task(arg, cache, dsk)
244 func, args = arg[0], arg[1:]
245 args2 = [_execute_task(a, cache) for a in args]
--> 246 return func(*args2)
247 elif not ishashable(arg):
248 return arg
/Users/jreback/miniconda/lib/python2.7/site-packages/castra/core.pyc in __init__(self, path, template, categories, readonly)
118
119 self.partitions = pd.Series([], dtype='O',
--> 120 index=template2.index.__class__([]))
121 self.minimum = None
122
/Users/jreback/miniconda/lib/python2.7/site-packages/pandas/indexes/range.py in __new__(cls, start, stop, step, name, dtype, fastpath, copy, **kwargs)
70 start = 0
71 else:
---> 72 start = _ensure_int(start, 'start')
73 if stop is None:
74 stop = start
/Users/jreback/miniconda/lib/python2.7/site-packages/pandas/indexes/range.py in _ensure_int(value, field)
56 def _ensure_int(value, field):
57 try:
---> 58 new_value = int(value)
59 assert(new_value == value)
60 except (ValueError, AssertionError):
TypeError: int() argument must be a string or a number, not 'list'
Similar to R's factors, Pandas has an ordered argument that indicates that the categories have an ordering. I'm not sure what happens when you have an ordered Categorical.
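For reference, this is the pandas feature in question; what castra preserves on a round-trip is the open part:

```python
import pandas as pd

s = pd.Series(pd.Categorical(['low', 'high', 'medium'],
                             categories=['low', 'medium', 'high'],
                             ordered=True))
print(s.cat.ordered)       # True
print(s.min(), s.max())    # low high -- comparisons are only defined when ordered=True
```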
When calling extend, we only add the length of the incoming chunk to the first partition, which means the pandas chunk reader doesn't just work with trivial indices.
I wonder if this should be an option in core pandas.
cc @jreback
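A sketch of the constraint in the meantime (an assumption about how extend expects its input, not castra's own code): chunks handed to extend() need non-overlapping, increasing index values, so a trivial integer index has to be offset by the running row count.

```python
import pandas as pd

chunks = [pd.DataFrame({'x': range(3)}),   # index 0, 1, 2
          pd.DataFrame({'x': range(3)})]   # index 0, 1, 2 again -- would overlap

offset = 0
for chunk in chunks:
    chunk.index = range(offset, offset + len(chunk))  # continue the trivial index
    offset += len(chunk)
    # c.extend(chunk)
```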
I've been working on a refactor of Castra - before I spend any more time on this, I should probably get some feedback. Here's the plan:
Issues I'm attempting to solve:
- Partitions like [[1, 2, 3, 3], [3, 3, 4, 5, 6], ...] were possible (and happened to me).
The plan:
- Add partitionby=None to the init signature. This will live in meta. If None, no repartitioning is done by Castra. Can also be a time period (things you can pass to resample in pandas). See the sketch after this list.
- extend checks current partitions for equality overlap (even if partitionby=None). There are 3 cases that can happen here:
- If partitionby != None, then data is partitioned by Castra into blocks. extend should still take large dataframes (calling extend on a row is a bad idea), but will group them into partitions based on the rule passed to partitionby. Using the functionality provided by bloscpack, the on-disk partitions can be appended to with little overhead. This makes writing in cases where this happens slightly slower, but has no penalty on reads.
- Add an extend_sequence function. This takes an iterable of dataframes (can be a generator), and does the partitioning in memory instead of on disk. This will be faster than calling extend in a loop (no on-disk appends), but will result in the same disk file format.
This method means that the disk will match what's in memory after calls to extend or extend_sequence complete, will allow castra to do partitioning for the user, and will ensure that the partitions are valid. I have a crude version of this working now, and have found writes to be only slightly penalized when appends happen (no penalty if they don't), and no penalty for reading from disk.
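A sketch of the partitioning rule described for partitionby, assuming it accepts a pandas offset alias (the helper below is illustrative, not part of the refactor's code):

```python
import numpy as np
import pandas as pd

def split_by(df, partitionby=None):
    """Group an incoming frame on its index; each group maps to one partition."""
    if partitionby is None:
        return [df]
    return [g for _, g in df.groupby(pd.Grouper(freq=partitionby)) if len(g)]

idx = pd.date_range('2015-01-01', periods=10, freq='D')
df = pd.DataFrame({'x': np.arange(10)}, index=idx)
weekly = split_by(df, 'W')    # e.g. one partition per week
```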
I'd like to rig up Travis support; some questions:
I am trying to replicate the example from this page: http://blaze.pydata.org/blog/2015/09/08/reddit-comments/ about castra, dask and reddit comments, and I get an error when I run dd.from_castra(data, columns). My castra file took some hours to create, but it is clean and exactly as the tutorial describes.
Can you please check?