frictionlessdata / datapackage-pipelines

Framework for processing data packages in pipelines of modular components.
Home Page: https://frictionlessdata.io/
License: MIT License
Consider a pipeline that ends with 2 processors: `dump.to_zip` writes a datapackage as a zip in the local filesystem, which `upload_zip` loads and uploads to some service. This means that they have a dependency: `upload_zip` can't run before `dump.to_zip` is finished.
The design of datapackage-pipelines is to work with streaming data. Every processor runs in parallel, taking the data rows as they come. The current pattern that we use to make sure the previous processors have finished running is to call `spew()` to pass the data on to the next processors (modified or not), and then run our code. This works fine if there's only one processor that uses this pattern, but not if there's more than one. Consider this simplified code for `dump.to_zip` and `upload_zip`.
`dump.to_zip`:
zip_file = zipfile.ZipFile(out_file)
spew(
datapackage,
resource_iter # This iterator will write the files to `zip_file`
)
# After spew() has finished, it'll unblock the following processor's spew() calls
# so the code after their spew() calls will start running.
zip_file.close()
`upload_zip`:
# Block until the previous processor's spew() calls have finished
spew(datapackage, resource_iter)
# Load zip from path and upload it
upload_zip(params['in_file'])
This doesn't guarantee that `dump.to_zip` will call `zip_file.close()` before `upload_zip` reads the zip file, so it might be reading an invalid zip from disk (because `dump.to_zip` is still writing the contents). We have a race condition.
This race condition happens because the processors are communicating outside `datapackage-pipelines`, using the filesystem. As `dpp` doesn't know about this, it has no way to coordinate their communication. To avoid this, we must bring this code into `dpp` itself.
My proposal is to add a `finalizer` parameter to `spew()` where you can send finalization code to be executed after `spew()` has finished processing the incoming data, but before it has signaled to the next processors in the pipeline that we're finished. This way, the code for (for example) `dump.to_zip` would become:
zip_file = zipfile.ZipFile(out_file)
spew(
datapackage,
resource_iter,
finalizer=lambda: zip_file.close()
)
I have implemented this pattern on openspending/datapackage-pipelines-fiscal@3910481, although as that's outside `dpp`, I had to do it in a hackish way.
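For illustration, a minimal sketch of how `spew()` could support this. The internals below are an assumption (the real wire format between processors differs); the point is only where the `finalizer` call would sit relative to the completion signal:

```python
import json
import sys

def spew(datapackage, resource_iterator, stats=None, finalizer=None):
    """Sketch of the proposed API: today's spew(), plus an optional finalizer."""
    out = sys.stdout
    out.write(json.dumps(datapackage) + '\n')
    for resource in resource_iterator:
        for row in resource:
            out.write(json.dumps(row) + '\n')
        out.write('\n')  # resource separator
    if finalizer is not None:
        finalizer()  # e.g. zip_file.close(), while the next processor is still blocked
    out.write(json.dumps(stats or {}) + '\n')  # this is what unblocks the next processor
    out.flush()
```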
Python 3.6.1 MacOS
I'm writing a pipeline with a very simple processor. If there's a `print()` statement in there, running the pipeline causes this crash:
INFO :Main :Skipping redis connection, host:None, port:6379
WARNING :Main :No csv processor available for my-first-item
INFO :Main :RUNNING ./my-first-item
INFO :Main :- add_metadata
INFO :Main :- spss.add_spss
INFO :Main :- (sink)
INFO :Main :DONE /Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/specs/../lib/add_metadata.py
INFO :Main :(sink): Traceback (most recent call last):
INFO :Main :(sink): File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/manager/../lib/internal/sink.py", line 5, in <module>
INFO :Main :(sink): params, dp, res_iter = ingest()
INFO :Main :(sink): File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/wrapper/wrapper.py", line 34, in ingest
INFO :Main :(sink): datapackage, resource_iterator = process_input(sys.stdin, validate, debug)
INFO :Main :(sink): File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/wrapper/input_processor.py", line 63, in process_input
INFO :Main :(sink): dp = json.loads(dp_json)
INFO :Main :(sink): File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/utilities/extended_json.py", line 123, in _loads
INFO :Main :(sink): return _json.loads(*args, **kwargs)
INFO :Main :(sink): File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/__init__.py", line 367, in loads
INFO :Main :(sink): return cls(**kw).decode(s)
INFO :Main :(sink): File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/decoder.py", line 339, in decode
INFO :Main :(sink): obj, end = self.raw_decode(s, idx=_w(s, 0).end())
INFO :Main :(sink): File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/decoder.py", line 357, in raw_decode
INFO :Main :(sink): raise JSONDecodeError("Expecting value", s, err.value) from None
INFO :Main :(sink): json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
INFO :Main :DONE /Users/brew/virtualenvs/ukds_pilot/src/datapackage-pipelines-spss/datapackage_pipelines_spss/processors/add_spss.py
ERROR :Main :FAILED /Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/manager/../lib/internal/sink.py: 1
WARNING :Main :No csv processor available for my-first-item
WARNING :Main :No csv processor available for my-first-item
INFO :Main :RESULTS:
INFO :Main :FAILURE: ./my-first-item
ERROR log from processor (sink):
+--------
| Traceback (most recent call last):
| File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/manager/../lib/internal/sink.py", line 5, in <module>
| params, dp, res_iter = ingest()
| File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/wrapper/wrapper.py", line 34, in ingest
| datapackage, resource_iterator = process_input(sys.stdin, validate, debug)
| File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/wrapper/input_processor.py", line 63, in process_input
| dp = json.loads(dp_json)
| File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/utilities/extended_json.py", line 123, in _loads
| return _json.loads(*args, **kwargs)
| File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/__init__.py", line 367, in loads
| return cls(**kw).decode(s)
| File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/decoder.py", line 339, in decode
| obj, end = self.raw_decode(s, idx=_w(s, 0).end())
| File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/decoder.py", line 357, in raw_decode
| raise JSONDecodeError("Expecting value", s, err.value) from None
| json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
Using the logging module to write to stdout works fine. However, I'm importing third-party modules into the processor that use `print`, so I can't easily avoid it.
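A possible workaround until this is addressed, assuming the offending `print` calls happen inside calls you can wrap: redirect stdout to stderr around the noisy third-party code, so the stdout stream that dpp uses to pass data along stays clean.

```python
import contextlib
import sys

def noisy_third_party_function():
    print('third-party chatter')  # stands in for an imported module that prints

# Any print() issued inside this block goes to stderr instead of
# polluting the stdout data stream between processors.
with contextlib.redirect_stdout(sys.stderr):
    noisy_third_party_function()
```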
date
string
This is the problematic line; it's already removed in master, so we just need to publish a new version:
https://github.com/frictionlessdata/datapackage-pipelines/blob/1.5.0/datapackage_pipelines/web/server.py#L124
Python 3.6 @ Ubuntu 16.04
Consider the pipeline:
test:
  pipeline:
    -
      run: inexistent_task
When running it via `dpp run`:
$ dpp run ./test
INFO :Main :Skipping redis connection, host:None, port:6379
INFO :Main :RESULTS:
The same result happens if we try running an unknown pipeline, like:
$ dpp run ./this_doesnt_exist
INFO :Main :Skipping redis connection, host:None, port:6379
INFO :Main :RESULTS:
I would expect it to fail, ideally with a helpful error message (like the one we show when simply running `dpp`).
The datapackage-pipelines-metrics plugin provides most of the required features, but it has 2 major problems:
AFAIK the current plugin framework doesn't support this use-case, which might be a good thing (I wouldn't like to see general hooks/events that allow modifying how the system works), but I think that for metrics it is needed.
Perhaps a generic pluggable metrics architecture is in order?
Hey Adam!
Is Windows supported for the datapackage-pipelines package?
I got it to work on Bash on Windows, but on native Windows I get the following error:
C:\dpp\wb_test>dpp run all
INFO :Main :Skipping redis connection, host:None, port:6379
DEBUG :Main :Using selector: SelectSelector
INFO :Main :RUNNING .\worldbank-co2-emissions
INFO :Main :- add_metadata
Traceback (most recent call last):
File "C:\Users\Jasper Heeffer\AppData\Local\Programs\Python\Python36-32\Scripts\dpp-script.py", line 11, in <module>
load_entry_point('datapackage-pipelines==1.0.14', 'console_scripts', 'dpp')()
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\click\core.py", line 722, in __call__
return self.main(*args, **kwargs)
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\click\core.py", line 697, in main
rv = self.invoke(ctx)
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\click\core.py", line 1066, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\click\core.py", line 895, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\click\core.py", line 535, in invoke
return callback(*args, **kwargs)
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\datapackage_pipelines\cli.py", line 68, in run
execute_if_needed(pipeline_id, spec, use_cache)
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\datapackage_pipelines\cli.py", line 40, in execute_if_needed
use_cache=use_cache)
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\datapackage_pipelines\manager\tasks.py", line 254, in execute_pipeline
return loop.run_until_complete(pipeline_task)
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\asyncio\base_events.py", line 466, in run_until_complete
return future.result()
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\datapackage_pipelines\manager\tasks.py", line 187, in async_execute_pipeline
await construct_process_pipeline(pipeline_steps, pipeline_cwd, errors)
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\datapackage_pipelines\manager\tasks.py", line 135, in construct_process_pipeline
process = await create_process(args, pipeline_cwd, wfd, rfd)
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\asyncio\subprocess.py", line 225, in create_subprocess_exec
stderr=stderr, **kwds)
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\asyncio\base_events.py", line 1190, in subprocess_exec
bufsize, **kwargs)
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\asyncio\coroutines.py", line 210, in coro
res = func(*args, **kw)
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\asyncio\base_events.py", line 340, in _make_subprocess_transport
raise NotImplementedError
NotImplementedError
ERROR :Main :Task was destroyed but it is pending!
task: <Task pending coro=<dequeue_errors() running at c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\datapackage_pipelines\manager\tasks.py:29> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x04766A50>()]>>
Exception ignored in: <coroutine object dequeue_errors at 0x04767B40>
Traceback (most recent call last):
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\datapackage_pipelines\manager\tasks.py", line 29, in dequeue_errors
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\asyncio\queues.py", line 169, in get
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\asyncio\base_events.py", line 573, in call_soon
File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\asyncio\base_events.py", line 357, in _check_closed
RuntimeError: Event loop is closed
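For context: the `Using selector: SelectSelector` debug line shows the default asyncio `SelectorEventLoop` is in use, and on Windows that loop type doesn't implement subprocess transports (hence the `NotImplementedError` from `_make_subprocess_transport`). A sketch of the kind of change that would be needed, assuming dpp controls its own event loop setup:

```python
import asyncio
import sys

if sys.platform == 'win32':
    # On Python 3.6 the ProactorEventLoop implements subprocess_exec
    # on Windows; the default SelectorEventLoop does not.
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
```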
Here are some thoughts from a newbie:

- a `dpp init` would guide me through pipeline creation
- `dpp --help` didn't show how I specify a pipeline
- `dpp serve` and going to http://localhost:5000/ resulted in `builtins.KeyError KeyError: b'all-pipelines'`, which is not a user-friendly way to show that I don't have any pipelines.

Suggestions:

- an init (`dpp init`) wizard would be awesome
- support `dpp run ./pipeline.yaml`, `dpp serve ./pipeline.yaml`
- `dpp serve` should validate whether there are any pipelines available and show a nice user-friendly error with suggestions to fix

python 3.5
alpine 3.4
I'm trying to construct an example for processing zipped xls.
Downloader seems to only support csv.
A couple of questions:

- Where does the downloaded data go? `/tmp`? How can I make sure it's deleted after I'm done, even if some error occurs?
- Is there some kind of `with` statement across the pipeline?

As a dpp user, I want to be able to pre-define unnecessary columns for my data, so that after running `dpp run xxx` the output data does not have those columns.
- add a `remove_fields` processor to `dpp`'s common processors

Spec:
run: delete_fields
parameters:
  resources: resource_name
  columns:
    - column_name1
    - column_name2
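A minimal sketch of what such a processor could look like using the `ingest`/`spew` wrapper API. It assumes the resource iterator yields row dicts in the same order as `datapackage['resources']`; every name beyond `ingest`/`spew` is either from the spec above or invented for illustration:

```python
from datapackage_pipelines.wrapper import ingest, spew

parameters, datapackage, resource_iterator = ingest()
target = parameters['resources']
columns = set(parameters['columns'])

# Drop the deleted fields from the schema of the matching resource
for descriptor in datapackage['resources']:
    if descriptor['name'] == target:
        schema = descriptor.get('schema', {})
        schema['fields'] = [f for f in schema.get('fields', [])
                            if f['name'] not in columns]
        descriptor['schema'] = schema

def process(resources):
    for descriptor, rows in zip(datapackage['resources'], resources):
        if descriptor['name'] == target:
            yield ({k: v for k, v in row.items() if k not in columns}
                   for row in rows)
        else:
            yield rows

spew(datapackage, process(resource_iterator))
```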
When processing large datasets with multiple resources, using `datapackage-pipelines` would be easier if it provided more information about the particular resource in certain types of errors. For instance, when failing to set a type in the `set_types` processor, it would be helpful if
"No field found matching %r" % field_name
were more like:
"No field found matching %r in resource %s" % (field_name, name)
where `name` is the resource name.
Likewise, when reading from an SQL database table:
INFO :Main :stream_remote_resources: INFO :enliten: OPENING mysql://root@localhost/enliten
it would be more helpful if this line referenced the table name being pulled.
For example, see https://github.com/frictionlessdata/datapackage-pipelines/blob/054db2faa0e768104ac43fd070cf3cc2e17ef69f/README.md#add_resource. The YAML example is:
- run: add_resource
  parameters:
    url: http://example.com/my-excel-file.xlsx
    sheet: 1
    headers: 2
- run: add_resource
  parameters:
    url: http://example.com/my-csv-file.csv
    encoding: "iso-8859-2"
Notice the red highlight before `encoding`. That's a TAB. I'm not sure at which level `encoding` should be, so I couldn't send a PR, but there are a few examples like this in the README.
In any case, when using the `set_types` standard processor, it will validate and transform the input data with the new types. When setting types, if the data does not conform to the `constraints` or `type` set in the schema, it will fail:
INFO :Main :dump.to_path: jsontableschema.exceptions.ConstraintError: The field 'Aggregate' must not be more than 10
INFO :Main :dump.to_path: jsontableschema.exceptions.InvalidDateType: time data '523' does not match format '%Y-%m-%d'
It would be helpful if the processor reported, say, the row/resource that failed the check. Something equivalent to the Good Tables output. Or, provide the option not to do data validation on `set_types`, so that I can manually run `goodtables` afterwards.
If the datapackage descriptor takes more than 64K in JSON form, the pipeline will fail with the message 'Stats object too large!'
I need this feature for the ESIF pipeline. I thought it might be a good idea to implement it directly in the framework.
- [ ] Add optional data validation and reporting support to the `ingest` function
- [ ] Add a toggle for data validation and reporting on pipeline processors
@akariv Can you give me feedback on this please?
Currently, plugins can optionally provide one Generator class, and one `my_plugin.source-spec.yaml` filetype per generator. This means each `*.source-spec.yaml` filetype requires its own plugin, and generators in separate plugins can't share common processors.
I propose letting plugins provide their own custom spec parsers that extend `parsers.base_parser.BaseParser`. This would allow plugins to resolve source-specs and generators in their own way, potentially allowing plugins to provide more than one generator type, and subsequently more than one `*.source-spec.yaml` filetype per plugin.
For example, the `datapackage_pipelines_measure` plugin could have a `social-media` generator, a `website-analytics` generator, a `code-packaging` generator, etc. And each project directory could contain the corresponding `social-media.measure.source-spec.yaml`, `website-analytics.measure.source-spec.yaml`, and `code-packaging.measure.source-spec.yaml` files.
A proposed parser discovery solution:

1. `specs.find_specs()` looks for more parsers (subclasses of `BaseParser`) in the `parsers` directory of the plugin
2. Discovered parsers are added to `specs.SPEC_PARSERS` (so they take precedence over native parsers)
3. `specs.find_specs()` carries on as normal

What do you think, @akariv?
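To make the idea concrete, a rough sketch of a plugin-provided parser. The two methods mirror what a spec parser needs to do (recognize a filename, turn a spec into pipeline steps), but their exact names and signatures here are assumptions:

```python
from datapackage_pipelines.specs.parsers.base_parser import BaseParser

class MeasureParser(BaseParser):
    SPEC_SUFFIX = '.measure.source-spec.yaml'

    @classmethod
    def check_filename(cls, filename):
        # claim any spec file this plugin knows how to handle
        return filename.endswith(cls.SPEC_SUFFIX)

    @classmethod
    def to_pipeline(cls, spec, fullpath):
        # resolve the right generator for this spec and yield
        # (pipeline_id, pipeline_details) pairs
        raise NotImplementedError
```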
enliten:
  pipeline:
    - run: add_metadata
      parameters:
        name: 'enliten'
        title: 'ENLITEN'
        homepage: 'http://www.cs.bath.ac.uk/enliten/'
        temporal:
          name: 'Data Collected'
          start: '2013-10-01'
          end: '2015-06-30'
    - parameters:
        name: humidity
        table: humidity
        url: mysql://root@localhost/enliten
      run: add_resource
    - run: stream_remote_resources
    - run: set_types
      parameters:
        resources: humidity
        types:
          '_id':
            type: integer
          device:
            type: integer
          sensor:
            type: integer
          humidity:
            type: number
          timestamp:
            type: datetime
    - run: dump.to_path
      parameters:
        out-path: ../_datasets/enliten
produces
(pilot-dm4t) legend:pilot-dm4t dan (convert-to-datapackage-pipelines)$ dpp run ./enliten/enliten
INFO :Main :Skipping redis connection, host:None, port:6379
INFO :Main :RUNNING ./enliten/enliten
INFO :Main :- add_metadata
INFO :Main :- add_resource
INFO :Main :- stream_remote_resources
INFO :Main :- set_types
INFO :Main :- dump.to_path
INFO :Main :- (sink)
INFO :Main :DONE /Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/add_metadata.py
INFO :Main :DONE /Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/add_resource.py
INFO :Main :stream_remote_resources: INFO :enliten: OPENING mysql://root@localhost/enliten
INFO :Main :stream_remote_resources: ERROR :Exception during reset or similar
INFO :Main :stream_remote_resources: Traceback (most recent call last):
INFO :Main :stream_remote_resources: File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/sqlalchemy/pool.py", line 687, in _finalize_fairy
INFO :Main :stream_remote_resources: fairy._reset(pool)
INFO :Main :stream_remote_resources: File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/sqlalchemy/pool.py", line 829, in _reset
INFO :Main :stream_remote_resources: pool._dialect.do_rollback(self)
INFO :Main :stream_remote_resources: File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/sqlalchemy/dialects/mysql/base.py", line 1598, in do_rollback
INFO :Main :stream_remote_resources: dbapi_connection.rollback()
INFO :Main :stream_remote_resources: _mysql_exceptions.ProgrammingError: (2014, "Commands out of sync; you can't run this command now")
INFO :Main :set_types: /Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/jsontableschema/model.py:48: UserWarning: Class models.SchemaModel is deprecated [v0.7-v1)
INFO :Main :set_types: warnings.warn(message, UserWarning)
INFO :Main :set_types: ERROR :Failed to cast row ['1', '1', '4', '33.0', '2013-03-15T00:29:11']
INFO :Main :set_types: Traceback (most recent call last):
INFO :Main :set_types: File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/jsontableschema/types/datetime.py", line 43, in cast_default
INFO :Main :set_types: return datetime.datetime.strptime(value, self.ISO8601)
INFO :Main :set_types: File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/_strptime.py", line 565, in _strptime_datetime
INFO :Main :set_types: tt, fraction = _strptime(data_string, format)
INFO :Main :set_types: File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/_strptime.py", line 362, in _strptime
INFO :Main :set_types: (data_string, format))
INFO :Main :set_types: ValueError: time data '2013-03-15T00:29:11' does not match format '%Y-%m-%dT%H:%M:%SZ'
INFO :Main :set_types: During handling of the above exception, another exception occurred:
INFO :Main :set_types: Traceback (most recent call last):
INFO :Main :set_types: File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/set_types.py", line 71, in <module>
INFO :Main :set_types: spew(datapackage, process_resources(resource_iterator))
INFO :Main :set_types: File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/wrapper/wrapper.py", line 59, in spew
INFO :Main :set_types: for rec in res:
INFO :Main :set_types: File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/set_types.py", line 52, in process_resource
INFO :Main :set_types: flattened_row = jts.cast_row(flattened_row)
INFO :Main :set_types: File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/jsontableschema/schema.py", line 81, in cast_row
INFO :Main :set_types: result.append(field.cast_value(value))
INFO :Main :set_types: File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/jsontableschema/field.py", line 76, in cast_value
INFO :Main :set_types: return self.__type.cast(value, skip_constraints=skip_constraints)
INFO :Main :set_types: File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/jsontableschema/types/base.py", line 120, in cast
INFO :Main :set_types: cast_value = cast_func(value, self.__format_fmt)
INFO :Main :set_types: File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/jsontableschema/types/datetime.py", line 45, in cast_default
INFO :Main :set_types: raise_with_traceback(exceptions.InvalidDateTimeType(e))
INFO :Main :set_types: File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/future/utils/__init__.py", line 419, in raise_with_traceback
INFO :Main :set_types: raise exc.with_traceback(traceback)
INFO :Main :set_types: File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/jsontableschema/types/datetime.py", line 43, in cast_default
INFO :Main :set_types: return datetime.datetime.strptime(value, self.ISO8601)
INFO :Main :set_types: File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/_strptime.py", line 565, in _strptime_datetime
INFO :Main :set_types: tt, fraction = _strptime(data_string, format)
INFO :Main :set_types: File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/_strptime.py", line 362, in _strptime
INFO :Main :set_types: (data_string, format))
INFO :Main :set_types: jsontableschema.exceptions.InvalidDateTimeType: time data '2013-03-15T00:29:11' does not match format '%Y-%m-%dT%H:%M:%SZ'
INFO :Main :stream_remote_resources: ERROR :Output pipe disappeared!
ERROR :Main :FAILED /Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/set_types.py: 1
ERROR :Main :FAILED /Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/manager/../lib/internal/sink.py: -9
ERROR :Main :FAILED /Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py: -9
ERROR :Main :FAILED /Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/dump/to_path.py: -9
INFO :Main :RESULTS:
INFO :Main :FAILURE: ./enliten/enliten
Adding the `format:` option:
format: 'fmt:%Y-%m-%dT%H:%M:%S'
makes it go. But I end up with an invalid Data Package when trying to read it with `datapackage-py`:
ValueError: time data '2013-03-15 00:29:11' does not match format '%Y-%m-%dT%H:%M:%SZ'
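The failure is reproducible with `strptime` alone: the value has no trailing `Z`, while the default ISO8601 format string from the traceback above expects one.

```python
import datetime

value = '2013-03-15T00:29:11'
fmt = '%Y-%m-%dT%H:%M:%SZ'  # the default datetime format from the traceback

# Raises ValueError: time data '2013-03-15T00:29:11' does not match
# format '%Y-%m-%dT%H:%M:%SZ'
datetime.datetime.strptime(value, fmt)
```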
If I add a resource to the pipeline with a `skip_rows` parameter that's a list (rather than an int), I get a `TypeError` from the downstream processor `stream_remote_resources`, from this method:
ERROR log from processor stream_remote_resources:
+--------
| Traceback (most recent call last):
| File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 223, in <module>
| spew(datapackage, new_resource_iterator)
| File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines/wrapper/wrapper.py", line 64, in spew
| for rec in res:
| File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 41, in _reader
| for i, row in enumerate(_reader):
| File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/tabulator/stream.py", line 230, in iter
| for row_number, headers, row in iterator:
| File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 88, in _func
| for number, headers, row in extended_rows:
| File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 101, in _func
| for number, headers, row in extended_rows:
| File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 81, in _func
| if number > rows_to_skip:
| TypeError: '>' not supported between instances of 'int' and 'list'
As well as lists, tabulator's `skip_rows` can also accept a character like `#`, which would also fail the same comparison check.
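A sketch of how the comparison in `stream_remote_resources` could be generalized to the three shapes tabulator accepts (int, list of row numbers, comment character); the generator structure loosely follows the traceback above, and the details are assumptions:

```python
def make_skipper(rows_to_skip):
    def _func(extended_rows):
        for number, headers, row in extended_rows:
            if isinstance(rows_to_skip, int):
                if number <= rows_to_skip:        # skip the first N rows
                    continue
            elif isinstance(rows_to_skip, list):
                if number in rows_to_skip:        # skip listed row numbers
                    continue
            elif isinstance(rows_to_skip, str):
                # skip comment rows starting with the given character
                if row and str(row[0]).startswith(rows_to_skip):
                    continue
            yield number, headers, row
    return _func
```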
Proposed: a `pre` step, run before the data is streamed.
Even though we're already on version 1.0.23, there are no tags for it (see https://github.com/frictionlessdata/datapackage-pipelines/releases). We should tag the release commits as part of our release process, so it's easier for others to check out the code at a specific version.
As a dpp user, I want to split or shard data from one or more resources into one or more other resources, based on certain conditions.
Use cases:
see documentation and tests for this suggested processor here
Extracted from #18
Expected:
3 fields are exported
Actual:
Row length (9) doesn't match fields count (3)
As a dpp user, I want to unpivot or transpose my tabular data so that there's only one record per row
As a dpp user, I have data that has dates as column names and I want to "unpivot" and have nice time series data so that I can easily reuse it for plotting graphs
Example:
# Pivoted data
country 2005 2006 2007
Afghanistan 46.6 46.9 47.2
Albania 76.1 76.3 76.5
...
# Unpivoted data
country year average_age
Afghanistan 2005 46.6
Afghanistan 2006 46.9
Afghanistan 2007 47.2
Albania 2005 76.1
Albania 2006 76.3
Albania 2007 76.5
...
- `dpp run un-pivot-processor` provides a valid datapackage

Example spec for given data:
# data
country,2015 income,2015 expenses,2016 income,2016 expenses
xxx,20000,15000,30000,30000
...
# spec
- run: unpivot
  parameters:
    resources: (same as always)
    extraKeyFields:
      - name: year
        type: integer
      - name: direction
        type: string
    extraValueField:
      name: amount
      type: number
    unpivot:
      - name: 2015 incomes
        keys:
          year: 2015
          direction: In
      - name: 2015 expenses
        keys:
          year: 2015
          direction: Out
      - name: 2016 incomes
        keys:
          year: 2016
          direction: In
      - name: 2016 expenses
        keys:
          year: 2016
          direction: Out
      # or regex
      - name: ([0-9]{4}) (\w+)
        keys:
          year: \1
          direction: \2
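The core transform is small. Here is a sketch for the non-regex case, where `unpivot_specs` and `value_field` correspond to the `unpivot` and `extraValueField` parameters above (the function shape and names are illustrative assumptions):

```python
def unpivot_row(row, id_fields, unpivot_specs, value_field='amount'):
    """Yield one long-format row per unpivoted source column."""
    base = {field: row[field] for field in id_fields}   # e.g. ['country']
    for spec in unpivot_specs:
        if spec['name'] in row:
            out = dict(base)
            out.update(spec['keys'])   # e.g. {'year': 2015, 'direction': 'In'}
            out[value_field] = row[spec['name']]
            yield out
```

For the regex variant, `spec['name']` would instead be matched against every column name with `re.fullmatch`, expanding `\1`/`\2` in the keys from the match groups.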
When using sqlite, JSONB fields would be stored as strings (this probably needs a change in `jsontableschema-sql-py`, but logging it here for now).
Like the `engine` attribute of the `dump.to_sql` processor, the `url` attribute of the `add_resource` processor might contain sensitive information, like an SQL connection string when loading a table. When publishing a pipeline, it would be nice not to have to include this information directly in the YAML.
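One possible shape for this, sketched as an assumption rather than existing `add_resource` behavior: an `env://` indirection so the secret lives in an environment variable rather than in the published YAML.

```yaml
- run: add_resource
  parameters:
    name: humidity
    table: humidity
    url: env://PIPELINE_DB_URL   # hypothetical: resolved from the environment at run time
```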
In order to submit an issue, please ensure you can check the following. Thanks!
* [ ] Declare which version of Python you are using (`python --version`)

Running
GOBBLE_AUTH_TOKEN=mytoken dpp run ./2017-18/national/estimates-of-national-expenditure-south-africa-2017-18
outputs
INFO :Main :Skipping redis connection, host:None, port:6379
INFO :Main :RESULTS:
and exits with exit code zero, giving no feedback that something went wrong or what went wrong.
In order to submit an issue, please ensure you can check the following. Thanks!
* [ ] Declare which version of Python you are using (`python --version`): Docker
From the `./samples/pipeline-spec.yaml`:
legend:datapackage-pipelines dan (master)$ docker run -v `pwd`:/pipelines:rw frictionlessdata/datapackage-pipelines run ./samples/worldbank-co2-emissions
INFO :Main :Skipping redis connection, host:None, port:6379
DEBUG :Main :Using selector: EpollSelector
INFO :Main :RUNNING ./samples/worldbank-co2-emissions
INFO :Main :- /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/add_metadata.py
INFO :Main :- /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/add_resource.py
INFO :Main :- /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/stream_remote_resources.py
INFO :Main :- /pipelines/samples/add_constant.py
INFO :Main :- /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/dump/to_zip.py
INFO :Main :- /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/dump/to_path.py
INFO :Main :- /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/dump/to_sql.py
INFO :Main :dump.to_sql: Traceback (most recent call last):
INFO :Main :dump.to_sql: File "/usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/dump/to_sql.py", line 2, in <module>
INFO :Main :dump.to_sql: from jsontableschema_sql import Storage
INFO :Main :dump.to_sql: ImportError: No module named 'jsontableschema_sql'
ERROR :Main :FAILED /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/dump/to_sql.py: 1
ERROR :Main :FAILED /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/add_metadata.py: -9
ERROR :Main :FAILED /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/dump/to_path.py: -9
ERROR :Main :FAILED /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/add_resource.py: -9
ERROR :Main :FAILED /pipelines/samples/add_constant.py: -9
ERROR :Main :FAILED /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/dump/to_zip.py: -9
ERROR :Main :FAILED /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/stream_remote_resources.py: -9
INFO :Main :RESULTS:
INFO :Main :FAILURE: ./samples/worldbank-co2-emissions
If you try to `dump.to_zip` a remote dataset with a non-tabular resource with dpp, it will zip successfully, but if you try to open the zipped resource it has encoding problems. Reproduce:
remote:
  pipeline:
    -
      run: load_resource
      parameters:
        resource: 'vix-daily_json'
        url: "https://pkgstore-testing.datahub.io/core/finance-vix:vix-daily_json/datapackage.json"
        stream: False
    -
      run: dump.to_zip
      parameters:
        out-file: remote.zip
        force-format: False
        handle-non-tabular: True

Then open `data/vix-daily_json.json` inside the resulting zip.
Note: emphasizing remote here, as if you try to download datapackage.json and data/vix-daily_json.json locally and zip them, it works fine.
See the detailed debug here: datopian/assembler#53 (comment)
As a dpp user, I want to add a new column to my existing data with the result of various operations (like sum, avg, min, max, multiply, constant, join, format) performed on values of pre-defined columns of the same row, so that after running `dpp run xxx` the output data has a new column with the value calculated for me.
Proposed processor: `add_fields`

Spec:
run: add_fields
parameters:
  resources: resource_name
  fields:
    - operation: sum
      target: new_column_name
      nullas: 0 (default)
      type: any (default)
      source:
        - column1
        - column2
    - operation: sum
      target: another_column_name
      source:
        - column3
        - column1
    - operation: sum
      target: total_sum
      source:
        - new_column_name
        - another_column_name
    - operation: avg/min/max/multiply (same as for sum)
    - operation: join
      source:
        - new_column_name
        - another_column_name
      with: ",'"
    - operation: format
      target: new_column_name
      with: '{column1} is better than {column2}'
    - operation: constant
      target: new_column_name
      with: 'my constant'
Available operations:
Data Packages, and Frictionless Data specifications, are essentially part of the protocol and inner workings of this package, but the package itself does not really require knowledge of these specs. Branding it as "Data Packages" is misleading, and potentially would confuse users into thinking that a knowledge of data packages is required to use this package.
I think we should rename the package to simply `pipelines` or `pipeline`.
What are your thoughts @akariv?
Any opinion on this @brew @roll @danfowler @vitorbaptista @amercader @rufuspollock ?
So, I've tried to run the Albanian example pipeline specified in the README.
version: '2'
services:
  pipeline:
    build: .
    command: sh -c "cd /src && pwd && dpp"
    ports:
      - 5000:5000
    volumes:
      - ./:/src
pipeline_1 | Available Pipelines:
pipeline_1 | - ./albanian-treasury (E)
pipeline_1 | Unresolved processor: Couldn't resolve model at /src
Which processor do I need to install additionally?
In order to submit an issue, please ensure you can check the following. Thanks!
* [ ] Declare which version of Python you are using (`python --version`)

A pipeline with `dump.to_sql` mode=update
dump.to_sql: INFO :Writing to DB cooperatives -> cooperatives (mode=update, keys=['id'])
throws an exception about a duplicate key when doing the insert:
dump.to_sql: Traceback (most recent call last):
dump.to_sql: File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1159, in _execute_context
dump.to_sql: context)
dump.to_sql: File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 467, in do_executemany
dump.to_sql: cursor.executemany(statement, parameters)
dump.to_sql: psycopg2.IntegrityError: duplicate key value violates unique constraint "cooperatives_pkey"
dump.to_sql: DETAIL: Key (id)=(570000018) already exists.
dump.to_sql: The above exception was the direct cause of the following exception:
dump.to_sql: Traceback (most recent call last):
dump.to_sql: File "/usr/local/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/dump/to_sql.py", line 134, in
dump.to_sql: SQLDumper()()
dump.to_sql: File "/usr/local/lib/python3.6/site-packages/datapackage_pipelines/lib/dump/dumper_base.py", line 40, in __call__
dump.to_sql: self.stats)
dump.to_sql: File "/usr/local/lib/python3.6/site-packages/datapackage_pipelines/wrapper/wrapper.py", line 60, in spew
dump.to_sql: for rec in res:
dump.to_sql: File "/usr/local/lib/python3.6/site-packages/datapackage_pipelines/lib/dump/dumper_base.py", line 101, in row_counter
dump.to_sql: for row in resource:
dump.to_sql: File "/usr/local/lib/python3.6/site-packages/jsontableschema_sql/writer.py", line 57, in write
dump.to_sql: for wr in self.__insert():
dump.to_sql: File "/usr/local/lib/python3.6/site-packages/jsontableschema_sql/writer.py", line 75, in __insert
dump.to_sql: statement.execute(self.__buffer)
dump.to_sql: File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/base.py", line 386, in execute
dump.to_sql: return e._execute_clauseelement(self, multiparams, params)
dump.to_sql: File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
dump.to_sql: compiled_sql, distilled_params
dump.to_sql: File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
dump.to_sql: context)
dump.to_sql: File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1402, in _handle_dbapi_exception
dump.to_sql: exc_info
dump.to_sql: File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
dump.to_sql: reraise(type(exception), exception, tb=exc_tb, cause=cause)
dump.to_sql: File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
dump.to_sql: raise value.with_traceback(tb)
dump.to_sql: File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1159, in _execute_context
dump.to_sql: context)
dump.to_sql: File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 467, in do_executemany
dump.to_sql: cursor.executemany(statement, parameters)
dump.to_sql: sqlalchemy.exc.IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "cooperatives_pkey"
dump.to_sql: DETAIL: Key (id)=(570000018) already exists.
dump.to_sql: [SQL: 'INSERT INTO cooperatives (id, name, registration_date, phone, primary_type_id, primary_type, secondary_type_id, secondary_type, legal_status_id, legal_status, last_status_date, type, municipality_id, municipality, inspector, address) VALUES (%(id)s, %(name)s, %(registration_date)s, %(phone)s, %(primary_type_id)s, %(primary_type)s, %(secondary_type_id)s, %(secondary_type)s, %(legal_status_id)s, %(legal_status)s, %(last_status_date)s, %(type)s, %(municipality_id)s, %(municipality)s, %(inspector)s, %(address)s)'] [parameters: ({'id': '570000018', 'name': 'נחלת ישראל רמה אגודה שתופית בע"מ (במחיקה)', 'registration_date': datetime.datetime(1921, 2, 6, 0, 0), 'phone': '', 'primary_type_id': '43', 'primary_type': 'שיכון', 'secondary_type_id': '61', 'secondary_type': 'שיכון', 'legal_status_id': '23', 'legal_status': 'הודעה שניה על מחיקה', 'last_status_date': datetime.datetime(1970, 1, 29, 12, 0), 'type': 'התאחדות האיכרים', 'municipality_id': None, 'municipality': '', 'inspector': 'צוק חיים', 'address': 'דופקר ++++'}, {'id': '570000026', 'name': 'הלואה וחסכון ירושלים אגודה שיתופית בע"מ', 'registration_date': datetime.datetime(1921, 3, 24, 10, 25), 'phone': '02-6234432', 'primary_type_id': '44', 'primary_type': 'אשראי וחסכון', 'secondary_type_id': '63', 'secondary_type': 'אשראי', 'legal_status_id': '10', 'legal_status': 'אגודה פעילה', 'last_status_date': datetime.datetime(1921, 3, 24, 8, 13, 46), 'type': '', 'municipality_id': '3000', 'municipality': 'ירושלים', 'inspector': 'בן-חמו יוסף', 'address': 'קרן היסוד 41 ירושלים 94188 ת.ד: 2575'}, {'id': '570000034', 'name': 'הלואה וחיסכון זכרון יעקב אגודה הדדית בע"מ', 'registration_date': datetime.datetime(1921, 10, 16, 0, 0), 'phone': '', 'primary_type_id': '44', 'primary_type': 'אשראי וחסכון', 'secondary_type_id': '63', 'secondary_type': 'אשראי', 'legal_status_id': '26', 'legal_status': 'אגודה מוזגה - מבוטלת', 'last_status_date': datetime.datetime(1977, 1, 29, 12, 0), 'type': '', 'municipality_id': '9300', 'municipality': 'זכרון יעקב', 'inspector': 'חליל יעקב', 'address': 'הנציב זכרון יעקב מיקוד ת.ד: 8'}, {'id': '570000042', 'name': 'קיבוץ איילת השחר', 'registration_date': datetime.datetime(1921, 12, 19, 12, 52), 'phone': '04-6932111', 'primary_type_id': '40', 'primary_type': 'חקלאות', 'secondary_type_id': '74', 'secondary_type': 'קיבוץ מתחדש', 'legal_status_id': '10', 'legal_status': 'אגודה פעילה', 'last_status_date': datetime.datetime(1921, 12, 19, 13, 27, 17), 'type': 'תנועה קבוצית מאוחדת תק"מ', 'municipality_id': '77', 'municipality': 'איילת השחר', 'inspector': 'מור טל', 'address': 'ד.נ. 
גליל עליון איילת השחר 12200'}, {'id': '570000059', 'name': 'אגודה שיתופית לעזרה הדדית ברחובות בע"מ', 'registration_date': datetime.datetime(1921, 12, 19, 0, 0), 'phone': '', 'primary_type_id': '44', 'primary_type': 'אשראי וחסכון', 'secondary_type_id': '63', 'secondary_type': 'אשראי', 'legal_status_id': '25', 'legal_status': 'אגודה בוטלה לאחר פירוק', 'last_status_date': datetime.datetime(1987, 5, 21, 12, 0), 'type': '', 'municipality_id': '8400', 'municipality': 'רחובות', 'inspector': 'יהודה משה', 'address': 'רחובות רחובות'}, {'id': '570000067', 'name': 'הכפר העברי - אגודה הדדית בע"מ', 'registration_date': datetime.datetime(1922, 2, 2, 0, 0), 'phone': '02-5700756', 'primary_type_id': '40', 'primary_type': 'חקלאות', 'secondary_type_id': '54', 'secondary_type': 'אגודה חקלאית כללית', 'legal_status_id': '10', 'legal_status': 'אגודה פעילה', 'last_status_date': datetime.datetime(1922, 2, 2, 12, 0), 'type': '', 'municipality_id': '3000', 'municipality': 'ירושלים', 'inspector': 'בן-חמו יוסף', 'address': 'נמצא אצל אבנאור שמואל, אביזהר, אחוזת בית הכ 8 כניסה: 247 ירושלים 96267'}, {'id': '570000075', 'name': 'החקלאית אג"ש לבטוח ולשרותים וטרינריים למקנה בישראל בעמ', 'registration_date': datetime.datetime(1922, 4, 11, 0, 0), 'phone': '04-6279600', 'primary_type_id': '40', 'primary_type': 'חקלאות', 'secondary_type_id': '57', 'secondary_type': 'ביטוח חקלאי', 'legal_status_id': '10', 'legal_status': 'אגודה פעילה', 'last_status_date': datetime.datetime(1922, 4, 11, 12, 0), 'type': '', 'municipality_id': '1167', 'municipality': 'קיסריה', 'inspector': 'שרעבי מזל', 'address': 'הברקת 20 קיסריה 38900 ת.ד: 3039'}, {'id': '570000083', 'name': 'קופת מלוה חקלאית לגמ"ח אגודה שיתופית פתח תקוה בע"מ', 'registration_date': datetime.datetime(1922, 6, 29, 0, 0), 'phone': '', 'primary_type_id': '45', 'primary_type': 'תגמולים פנסיה ועזה"ד', 'secondary_type_id': '63', 'secondary_type': 'אשראי', 'legal_status_id': '25', 'legal_status': 'אגודה בוטלה לאחר פירוק', 'last_status_date': datetime.datetime(1992, 12, 14, 12, 0), 'type': '', 'municipality_id': '7900', 'municipality': 'פתח תקוה', 'inspector': '', 'address': 'מונטיפיורי 14 פתח תקוה 49364'} ... displaying 10 of 1001 total bound parameter sets ... {'id': '570010009', 'name': 'הכורם הצעיר אגודה שיתופית חקלאית להספקת מים ברחובות בע"מ', 'registration_date': datetime.datetime(1950, 10, 23, 0, 0), 'phone': '08-9461817', 'primary_type_id': '40', 'primary_type': 'חקלאות', 'secondary_type_id': '56', 'secondary_type': 'אספקת מים', 'legal_status_id': '10', 'legal_status': 'אגודה פעילה', 'last_status_date': datetime.datetime(1950, 10, 23, 12, 0), 'type': '', 'municipality_id': '8400', 'municipality': 'רחובות', 'inspector': 'בר-נתן יונה', 'address': 'הרצל 143 רחובות 76266'}, {'id': '570010017', 'name': 'מעונות עובדי קופת חולים ב\' בגבעתיים אגודה שיתופית בע"מ', 'registration_date': datetime.datetime(1950, 10, 23, 0, 0), 'phone': '03-6250814', 'primary_type_id': '43', 'primary_type': 'שיכון', 'secondary_type_id': '61', 'secondary_type': 'שיכון', 'legal_status_id': '10', 'legal_status': 'אגודה פעילה', 'last_status_date': datetime.datetime(1950, 10, 23, 12, 0), 'type': '', 'municipality_id': '6300', 'municipality': 'גבעתיים', 'inspector': 'בר-נתן יונה', 'address': 'נמצא אצל בוריס בריק, מצולות ים 14 גבעתיים 53486'})]
(sink): /usr/local/lib/python3.6/site-packages/jsontableschema/model.py:48: UserWarning: Class models.SchemaModel is deprecated [v0.7-v1)
(sink): warnings.warn(message, UserWarning)
Basic processor testing is provided for in datapackage-pipelines (`lib_test_helpers`) using fixtures. But because the processor is run in a Python subprocess, it's difficult to apply mocking for dynamic data such as dates, or the results of calls to external API services.
It would be useful if datapackage-pipelines provided the boilerplate for setting up a test that mocks `ingest` and `spew`, so processor code can be tested and the relevant parts mocked out. So when our processor under test calls `ingest` it receives what we pass it in the test, and when it calls `spew` it populates a return value from `test_processor` that we can run asserts against.
@mock.patch('datetime.datetime')
def test_my_great_processor(mock_date):
    mock_date.now.return_value = datetime.datetime(2017, 1, 1)
    ingest_tuple = (some_params, a_datapackage, [])
    spew_results = test_processor(great_processor, ingest_tuple)
    # assert against spew_results here
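For illustration, `test_processor` itself could be a small helper along these lines; everything here (the patch targets, the reload trick, the result shape) is an assumption about how such a helper might be built, not existing dpp API:

```python
import importlib
from unittest import mock

def test_processor(processor_module, ingest_tuple):
    """Run a processor module in-process with ingest/spew replaced."""
    results = {}

    def fake_ingest(debug=False):
        return ingest_tuple

    def fake_spew(datapackage, resources, stats=None):
        results['datapackage'] = datapackage
        results['resources'] = [list(res) for res in resources]
        results['stats'] = stats

    with mock.patch('datapackage_pipelines.wrapper.ingest', fake_ingest), \
         mock.patch('datapackage_pipelines.wrapper.spew', fake_spew):
        # re-execute the module body so it picks up the fakes
        importlib.reload(processor_module)
    return results
```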
Hi there
When attempting to stream zipped datapackage files as resources, I get an error stating the temp directory does not exist:
ERROR log from processor stream_remote_resources:
+--------
| Traceback (most recent call last):
| File "/anaconda/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 212, in
| rows = stream_reader(resource, url, ignore_missing or url == "")
| File "/anaconda/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 169, in stream_reader
| schema, headers, stream, close = get_opener(url, _resource)()
| File "/anaconda/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 154, in opener
| _stream.open()
| File "/anaconda/lib/python3.6/site-packages/tabulator/stream.py", line 169, in open
| source = tempfile.NamedTemporaryFile(suffix='.' + name)
| File "/anaconda/lib/python3.6/tempfile.py", line 549, in NamedTemporaryFile
| (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
| File "/anaconda/lib/python3.6/tempfile.py", line 260, in _mkstemp_inner
| fd = _os.open(file, flags, 0o600)
| FileNotFoundError: [Errno 2] No such file or directory: '/var/folders/hv/8kj9pgt513q6kx2h29yyrrfr0000gn/T/tmprg7n15qi.data/55e4d040-1491-4ef1-9d3d-66b05fae4280.csv'
+--------
Thanks a lot
In order to submit an issue, please ensure you can check the following. Thanks!
* [ ] Declare which version of Python you are using (`python --version`)

It would be great if dpp showed an exception on import if a module contained `print` statements, as this interferes with standard out.
It would also be good to provide a logging example for a custom pipeline step, and link the user to it if they try to print.
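Such an example could be as small as this; `logging.basicConfig` writes to stderr by default, which keeps the stdout data stream between processors clean:

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

# use log.info(...) instead of print(...) inside a processor
log.info('processed %d rows', 42)
```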
resource['dialect'] = dict(
lineTerminator='\r\n',
delimiter=',',
doublequote=True,
quoteChar='"',
skipinitialspace=False
)
Dialect:
{
"delimiter": ";",
"doubleQuote": false,
"lineTerminator": "\n",
"quoteChar": "'",
"skipInitialSpace": false,
"header": false
}
This breaks our R library for Data Packages: https://github.com/ropenscilabs/datapkg (note the lowercase `doublequote`/`skipinitialspace` keys written above, versus the camel-cased `doubleQuote`/`skipInitialSpace` in the expected dialect).
In order to submit an issue, please ensure you can check the following. Thanks!
Python 3.6/macOS Sierra
`ingest` and `spew` are awesome little functions, but I found there was still a lot of boilerplate to write for each processor. I suggest writing some additional wrapper code to deal with that. I base my suggestions on use-cases I've encountered so far.
I assume that most of the time, you either want to process data row by row and/or mutate the datapackage. Let's talk about row processing first.
For row by row processing, the wrapper code would fulfill 2 purposes:

1. Provide boilerplate functionality
2. Pass context to the row processor
My first attempt at writing code for that resulted in the `utility.process` function. There's no stats functionality at this stage. The API looks like:
parameters, datapackage, resources = ingest()
new_resources = process(resources, processor_function, parameter_key=value)
spew(datapackage, new_resources)
My second attempt (see code in progress) is a `Processor` class with an API along the lines of:
parameters, datapackage, resources = ingest()
processor = Processor(function=processor_function,
                      exceptions=(ValueError,),
                      enumerate_rows=True,
                      sample_size=100,
                      datapackage=datapackage,
                      parameter_key1=default_value1,
                      parameter_key2=default_value2)
new_resources = processor.process(resources)
spew(datapackage, new_resources)
What I would really like to achieve is:
@row_processor
def my_awesome_processor(row):
# do stuff
return row, stats
And similarly add datapackage mutation like:
@datapackage_mutator
def my_awesome_mutator(datapackage):
# do stuff
return datapackage
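A minimal sketch of what `@row_processor` could expand to, ignoring the stats return value for brevity (the wiring below is an assumption based on the `ingest`/`spew` API):

```python
from datapackage_pipelines.wrapper import ingest, spew

def row_processor(func):
    def main():
        parameters, datapackage, resources = ingest()

        def transform(resource):
            for row in resource:
                yield func(row)

        spew(datapackage, (transform(res) for res in resources))
    return main
```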
@akariv care to comment?
As a dpp user, I want to be able to run pipelines on compressed (zip, gz) tabular data (csv, xls(x)), so that I do not have to decompress it first and run the pipelines after.

- Support URLs that do not end with `.zip` but are valid compressed tabular data (see analysis)
- Add `compression` to the list of params: https://github.com/frictionlessdata/datapackage-pipelines/blob/master/datapackage_pipelines/lib/stream_remote_resources.py#L136

The above is completely possible if the path or URL to my compressed file ends with the correct extension, e.g. `.zip`, as by default the compression will be inferred from the file name by tabulator. But if the URL for my compressed file does not end with `.zip`, pipelines will fail.
For example, I have two identical zip files:
If you create two identical `pipeline-spec.yaml` files with a difference of only the URLs from above and run them (take a look at this gist):
The above is happening because `compression` is not on the list of parameters passed to tabulator: https://github.com/frictionlessdata/datapackage-pipelines/blob/master/datapackage_pipelines/lib/stream_remote_resources.py#L136
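For reference, tabulator can take an explicit compression hint when it can't be inferred from the file name, so forwarding the parameter should be enough. A standalone sketch (the exact `Stream` keyword is an assumption about the tabulator version in use):

```python
from tabulator import Stream

# The URL has no .zip suffix, so compression can't be inferred from the name
with Stream('https://example.com/download?id=12345',
            format='csv', compression='zip') as stream:
    for row in stream.iter():
        print(row)
```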
In order to submit an issue, please ensure you can check the following. Thanks!
* [ ] Declare which version of Python you are using (`python --version`)

Currently I have to create a separate task in order to use a custom parser like this:
This breaks pep8
As part of frictionlessdata/pilot-dm4t#21, I'm using datapackage-pipelines to read some large files I have stored locally. However, only remote resources are supported by the `add_resource` processor:
"You can only add remote resources using this processor"
To work around this, I am serving them via a local HTTP server. Is there any intention of supporting local resources in the future?
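For anyone hitting the same wall, the workaround is essentially Python's built-in static server, run from the data directory (equivalently `python -m http.server 8000` on the command line), with the resource `url` then pointed at `http://localhost:8000/<filename>`:

```python
import http.server
import socketserver

# Minimal static server so add_resource can fetch "local" files over HTTP
with socketserver.TCPServer(('', 8000), http.server.SimpleHTTPRequestHandler) as httpd:
    httpd.serve_forever()
```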
As a dpp user, I want to be able to find some strings and replace/remove them, so I don't need to do it manually.
As a dpp user, I want to be able to replace certain patterns from field values so that I'm able to clean my data as I want.
For example, one of the fields in my data may have anchors for footnotes:
year,country,number,
2000,XXX,12345
2001 (2),XXX,12345
Proposed processor: `find_replace`

Spec:
- run: find_replace
  parameters:
    resources: as always
    fields:
      - name: my_field
        patterns:
          - find: ([0-9]{4}) (\(\w+\))
            replace: \1
      - name: my_second_field
        patterns:
          - find: Q1
            replace: '03-31'
          - find: Q2
            replace: '06-31'
          - find: Q3
            replace: '09-30'
          - find: Q4
            replace: '12-31'
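The heart of such a processor would be a small per-row transform; this sketch mirrors the `fields`/`patterns` structure of the spec above (the function itself is illustrative):

```python
import re

def apply_find_replace(row, field_specs):
    # field_specs mirrors the 'fields' parameter of the spec above
    for spec in field_specs:
        value = row.get(spec['name'])
        if value is None:
            continue
        for pattern in spec['patterns']:
            value = re.sub(pattern['find'], pattern['replace'], str(value))
        row[spec['name']] = value
    return row
```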
The README.md example references the spec in the current directory when it should reference the spec in the ./samples/ directory.
In order to submit an issue, please ensure you can check the following. Thanks!
* [ ] Declare which version of Python you are using (`python --version`): Docker
There is an example about the unpivot feature:
unpivot:
  -
    name: ([0-9]{4}) (\\w+)  # regex for original column
    keys:
      year: \\1       # First member of group from above
      direction: \\2  # Second member of group from above
I'm not sure why the double backslash `\\1` is in the README, but when I use `dpp` in a pipeline processor with this example I get `ERROR :Failed to cast row: Field "year" can't cast value "\1" for type "year" with format "default"`. With `\1` everything is OK.
Please, somebody ping me to confirm that there is no special reason to use the double backslash `\\`, and I'll fix the doc.
Hi, I am new here, and I like the idea of using the pipeline-spec.yaml...
...but I don't understand how to explain to other people the difference between datapackage-pipelines and SQL (the most popular standard for expressing tabular processing).
So, a suggestion is to include a document.md, or a section in the README.md, showing skeptics and dummies the basic differences... or perhaps a rationale explaining why not to use SQL database export/import for the "declarative processing of tabular data".
@danfowler has recently been using DPP for some very large source files (8GB CSV). With the default way pipelines work here, processing this data via a single conceptual stream is too slow.
There are various ways to deal with this:
I'd like to explore option 3. @akariv what are your thoughts?
From my very high-level view of the framework, there are no inherent design barriers to this. The only thing I guess is that the descriptor object is mutable global state, and maybe the caching mechanisms too. This might mean that spawned processors should only work on the data sources, and not on metadata.