datapackage-pipelines's Issues

Allow running code after spew() iterates, but before it signals we're done

Consider a pipeline that ends with 2 processors:

  • ...
  • dump.to_zip
  • upload_zip

dump.to_zip writes a datapackage as a zip in the local filesystem, which upload_zip loads and uploads to some service. This means that they have a dependency: upload_zip can't run before dump.to_zip is finished.

The design of datapackage-pipelines is to work with streaming data. Every processor runs in parallel, taking the data rows as they come. The current pattern that we use to make sure the previous processors have finished running is to call spew() to pass the data on to the next processors (modified or not), and then run our code. This works fine if there's only one processor that uses this pattern, but not if there's more than one. Consider this simplified code for dump.to_zip and upload_zip.

  • dump.to_zip
zip_file = zipfile.ZipFile(out_file)

spew(
    datapackage,
    resource_iter  # This iterator will write the files to `zip_file`
)

# After spew() has finished, it'll unblock the following processor's spew() calls
# so the code after their spew() calls will start running.

zip_file.close()
  • upload_zip
# Block until the previous processor's spew() calls have finished
spew(datapackage, resource_iter)

# Load zip from path and upload it
upload_zip(params['in_file'])

This doesn't guarantee that dump.to_zip will call zip_file.close() before upload_zip reads the zip file, so it might be reading an invalid zip from disk (because dump.to_zip is still writing the contents). We have a race condition.

This race condition happens because the processors are communicating outside datapackage-pipelines, using the filesystem. As dpp doesn't know about this, it has no way to coordinate their communication. To avoid this, we must move that coordination into dpp.

My proposal is to add a finalizer parameter to spew(), where you can pass finalization code to be executed after spew() has finished processing the incoming data, but before it has signaled to the next processors in the pipeline that we're finished. This way, the code for dump.to_zip, for example, would become:

zip_file = zipfile.ZipFile(out_file)

spew(
    datapackage,
    resource_iter,
    finalizer=lambda: zip_file.close()
)

I have implemented this pattern on openspending/datapackage-pipelines-fiscal@3910481, although as that's outside dpp, I had to do it in a hackish way.
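
For illustration, here is a rough sketch of where such a finalizer would sit inside a spew-like function. This is not the real spew() implementation; write_row and signal_done stand in for dpp's actual internals.

def spew_with_finalizer(datapackage, resource_iterator,
                        write_row, signal_done, finalizer=None):
    # Stream every row on to the next processor first.
    for resource in resource_iterator:
        for row in resource:
            write_row(row)
    # Run user cleanup (e.g. zip_file.close()) before signalling completion.
    if finalizer is not None:
        finalizer()
    # Only now tell the downstream processor that we're finished.
    signal_done()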

print() from processors causes error

Python 3.6.1, macOS

I'm writing a pipeline with a very simple processor. If there's a print() statement in there, running the pipeline causes this crash:

INFO    :Main                            :Skipping redis connection, host:None, port:6379
WARNING :Main                            :No csv processor available for my-first-item
INFO    :Main                            :RUNNING ./my-first-item
INFO    :Main                            :- add_metadata
INFO    :Main                            :- spss.add_spss
INFO    :Main                            :- (sink)
INFO    :Main                            :DONE /Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/specs/../lib/add_metadata.py
INFO    :Main                            :(sink): Traceback (most recent call last):
INFO    :Main                            :(sink):   File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/manager/../lib/internal/sink.py", line 5, in <module>
INFO    :Main                            :(sink):     params, dp, res_iter = ingest()
INFO    :Main                            :(sink):   File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/wrapper/wrapper.py", line 34, in ingest
INFO    :Main                            :(sink):     datapackage, resource_iterator = process_input(sys.stdin, validate, debug)
INFO    :Main                            :(sink):   File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/wrapper/input_processor.py", line 63, in process_input
INFO    :Main                            :(sink):     dp = json.loads(dp_json)
INFO    :Main                            :(sink):   File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/utilities/extended_json.py", line 123, in _loads
INFO    :Main                            :(sink):     return _json.loads(*args, **kwargs)
INFO    :Main                            :(sink):   File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/__init__.py", line 367, in loads
INFO    :Main                            :(sink):     return cls(**kw).decode(s)
INFO    :Main                            :(sink):   File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/decoder.py", line 339, in decode
INFO    :Main                            :(sink):     obj, end = self.raw_decode(s, idx=_w(s, 0).end())
INFO    :Main                            :(sink):   File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/decoder.py", line 357, in raw_decode
INFO    :Main                            :(sink):     raise JSONDecodeError("Expecting value", s, err.value) from None
INFO    :Main                            :(sink): json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
INFO    :Main                            :DONE /Users/brew/virtualenvs/ukds_pilot/src/datapackage-pipelines-spss/datapackage_pipelines_spss/processors/add_spss.py
ERROR   :Main                            :FAILED /Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/manager/../lib/internal/sink.py: 1
WARNING :Main                            :No csv processor available for my-first-item
WARNING :Main                            :No csv processor available for my-first-item
INFO    :Main                            :RESULTS:
INFO    :Main                            :FAILURE: ./my-first-item 
ERROR log from processor (sink):
+--------
| Traceback (most recent call last):
|   File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/manager/../lib/internal/sink.py", line 5, in <module>
|     params, dp, res_iter = ingest()
|   File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/wrapper/wrapper.py", line 34, in ingest
|     datapackage, resource_iterator = process_input(sys.stdin, validate, debug)
|   File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/wrapper/input_processor.py", line 63, in process_input
|     dp = json.loads(dp_json)
|   File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines-1.1.8-py3.6.egg/datapackage_pipelines/utilities/extended_json.py", line 123, in _loads
|     return _json.loads(*args, **kwargs)
|   File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/__init__.py", line 367, in loads
|     return cls(**kw).decode(s)
|   File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/decoder.py", line 339, in decode
|     obj, end = self.raw_decode(s, idx=_w(s, 0).end())
|   File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/json/decoder.py", line 357, in raw_decode
|     raise JSONDecodeError("Expecting value", s, err.value) from None
| json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

Using the logging module to write to stdout works fine. However, I'm importing third-party modules into the processor that use print, so I can't easily avoid it.
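
One possible workaround (not a dpp feature) is to temporarily redirect stdout to stderr around calls into print-happy third-party code, so the data stream that dpp passes between processors over stdout isn't corrupted:

import contextlib
import sys

def call_quietly(func, *args, **kwargs):
    # Anything the wrapped code print()s goes to stderr instead of stdout.
    with contextlib.redirect_stdout(sys.stderr):
        return func(*args, **kwargs)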

travis should run tests against all supported database engines

  • should use the Travis Build Matrix feature to run the tests on all supported database engines
  • I guess it means SQLite and PostgreSQL, but maybe we could add MySQL as well?
  • on each build it should:
    • set up the DB engine (perhaps using Docker)
    • set the OVERRIDE_TEST_DB environment variable accordingly
    • run the tests, passing this environment variable to the tox environment

dump.to_sql should detect schema changes

reproduction steps

  • use dump.to_sql with a field defined as date
  • when dump.to_sql is run it will create the table and define the column as a date type in the SQL engine
  • modify the code, now the field is defined as string
  • re-run the pipeline

expected

  • dump.to_sql should fail and warn that the schema changed and now the descriptor schema doesn't match the sql schema

actual

  • dump.to_sql tries to update the field (assuming it's already the correct type) and fails, showing a hard-to-debug error message

notes

  • the jsontableschema_sql library has a method that returns the descriptor from an SQL table - if it works, it should provide the needed functionality (see the sketch below)
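
A hedged sketch of the proposed check (the function and its inputs are illustrative, not an existing dpp or jsontableschema_sql API): compare the descriptor reflected from the existing table with the resource schema before dumping.

def schemas_match(existing_descriptor, resource_schema):
    # Compare field names and types; any mismatch means the schema changed.
    existing = {f['name']: f['type'] for f in existing_descriptor.get('fields', [])}
    incoming = {f['name']: f['type'] for f in resource_schema.get('fields', [])}
    return existing == incoming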

Running pipeline with unknown processor makes dpp fail silently

Python 3.6 @ Ubuntu 16.04

Consider the pipeline:

test:
  pipeline:
    -
      run: inexistent_task

When running it via dpp run:

$ dpp run ./test
INFO    :Main                            :Skipping redis connection, host:None, port:6379
INFO    :Main                            :RESULTS:

The same result happens if we try running an unknown pipeline, like:

$ dpp run ./this_doesnt_exist
INFO    :Main                            :Skipping redis connection, host:None, port:6379
INFO    :Main                            :RESULTS:

I would expect it to fail, ideally with a helpful error message (like we show when simply running dpp).

Should add support for metrics collection from all processors and pipelines

Scenario 1 - long running processor

  • Run a long-running processor
  • Want to see the progress of the processor as it runs

expected

  • should have a way to quickly, reliably and consistently check the progress
  • Some example questions:
    • how many rows were processed so far?
    • which processor is currently running? for how long?

actual

  • try to track the output data - but it might be committed in batches, and it's not reliable or consistent
  • check the processor log (adding logging.info calls) - also not reliable or consistent

Scenario 2 - BI / reporting

  • Have a complex environment with many pipelines
  • Want to see reports showing which pipelines / processors ran and when / show statistics

expected

  • should have a way to generate reports over time
  • example questions:
    • what's the average processing time per pipeline / per processor / per row?
    • when did a pipeline last run? for how long? how many rows were yielded?

actual

  • only possible by looking at the output data

Scenario 3 - alerting

  • Have a complex environment with many pipelines
  • it's hard to track pipelines manually

expected

  • should have a way to alert in real-time based on pipeline processing
  • example alerts:
    • a pipeline takes too long to run (compared to previous average running time)
    • a pipeline is "stuck" and stopped yielding new rows in a timely manner

actual

  • no way to have real-time alerts

Suggested solution

The datapackage-pipelines-metrics plugin provides most of the required features, but it has 2 major problems:

  • it's not pretty to integrate - you have to rename all pipeline specs to the plugin source spec filename
    • or - manually add it to your own source spec - or to each pipeline
  • it aggregates metrics based on datapackage or resource - there is no way to analyze a specific step or a specific processor

AFAIK the current plugin framework doesn't support this use-case - which might be a good thing (I wouldn't like to see general hooks / events which allow modifying how the system works)

but - I think that for metrics, it is needed

perhaps a generic pluggable metrics architecture is in order?
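
Purely as an illustration of what "pluggable" could mean here (no such interface exists in dpp today), a metrics backend might implement a small set of callbacks:

class MetricsBackend:
    # Hypothetical hook points; names and signatures are illustrative only.
    def pipeline_started(self, pipeline_id):
        pass

    def processor_started(self, pipeline_id, processor_name):
        pass

    def rows_processed(self, pipeline_id, processor_name, count):
        pass

    def processor_finished(self, pipeline_id, processor_name, success):
        pass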

Sample pipeline on Windows 10

Hey Adam!

Is Windows supported for the datapackage-pipelines package?
I got it to work in Bash on Windows, but on native Windows I get the following error:

C:\dpp\wb_test>dpp run all
INFO    :Main                            :Skipping redis connection, host:None, port:6379
DEBUG   :Main                            :Using selector: SelectSelector
INFO    :Main                            :RUNNING .\worldbank-co2-emissions
INFO    :Main                            :- add_metadata
Traceback (most recent call last):
  File "C:\Users\Jasper Heeffer\AppData\Local\Programs\Python\Python36-32\Scripts\dpp-script.py", line 11, in <module>
    load_entry_point('datapackage-pipelines==1.0.14', 'console_scripts', 'dpp')()
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\click\core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\click\core.py", line 697, in main
    rv = self.invoke(ctx)
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\click\core.py", line 1066, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\click\core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\click\core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\datapackage_pipelines\cli.py", line 68, in run
    execute_if_needed(pipeline_id, spec, use_cache)
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\datapackage_pipelines\cli.py", line 40, in execute_if_needed
    use_cache=use_cache)
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\datapackage_pipelines\manager\tasks.py", line 254, in execute_pipeline
    return loop.run_until_complete(pipeline_task)
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\asyncio\base_events.py", line 466, in run_until_complete
    return future.result()
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\datapackage_pipelines\manager\tasks.py", line 187, in async_execute_pipeline
    await construct_process_pipeline(pipeline_steps, pipeline_cwd, errors)
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\datapackage_pipelines\manager\tasks.py", line 135, in construct_process_pipeline
    process = await create_process(args, pipeline_cwd, wfd, rfd)
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\asyncio\subprocess.py", line 225, in create_subprocess_exec
    stderr=stderr, **kwds)
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\asyncio\base_events.py", line 1190, in subprocess_exec
    bufsize, **kwargs)
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\asyncio\coroutines.py", line 210, in coro
    res = func(*args, **kw)
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\asyncio\base_events.py", line 340, in _make_subprocess_transport
    raise NotImplementedError
NotImplementedError
ERROR   :Main                            :Task was destroyed but it is pending!
task: <Task pending coro=<dequeue_errors() running at c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\datapackage_pipelines\manager\tasks.py:29> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x04766A50>()]>>
Exception ignored in: <coroutine object dequeue_errors at 0x04767B40>
Traceback (most recent call last):
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\site-packages\datapackage_pipelines\manager\tasks.py", line 29, in dequeue_errors
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\asyncio\queues.py", line 169, in get
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\asyncio\base_events.py", line 573, in call_soon
  File "c:\users\jasper heeffer\appdata\local\programs\python\python36-32\lib\asyncio\base_events.py", line 357, in _check_closed
RuntimeError: Event loop is closed
  • Python 3.6
  • Windows 10
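
A possible workaround sketch (not an official dpp fix): the NotImplementedError above comes from the default SelectorEventLoop, which cannot spawn subprocesses on Windows; the ProactorEventLoop can, so installing it before dpp creates its loop may help.

import asyncio
import sys

if sys.platform == 'win32':
    # ProactorEventLoop supports subprocess transports on Windows.
    asyncio.set_event_loop(asyncio.ProactorEventLoop())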

Quickstart experience. User-friendly error handling

Here are some thoughts from newbie:

  1. I expected that dpp init would guide me through pipeline creation
  2. dpp --help didn't show how do I specify pipeline
  3. Running dpp serve and going to http://localhost:5000/ resulted in builtins.KeyError KeyError: b'all-pipelines' which is not a user-friendly way to show that I don't have any pipelines.

Suggestions:

  1. step-by-step pipeline creation (dpp init) wizard would be awesome
  2. Elaborate help by showing some examples e.g. dpp run ./pipeline.yaml, dpp serve ./pipeline.yaml
  3. dpp serve should validate whether there are any pipelines available and show nice user-friendly error with suggestions to fix

  • python 3.5
  • alpine 3.4

Example showing processing zipped xls files

I'm trying to construct an example for processing zipped xls.
Downloader seems to only support csv.

A couple of questions:

  1. Can I use /tmp? How can I make sure it's deleted after I'm done? Even if some error occurs. Is there some kind of with statement across the pipeline?
  2. How can I spew parameters to the next processor (i.e. filenames in a zip)? Should I use yield or modify datapackage?
  3. Couldn't find a link to an Excel processor. Is there a list of all available processors somewhere?

Processor for deleting fields

As a dpp user, I want to be able to pre-define unnecessary columns for my data, so that after running dpp run xxx the output data does not have those columns.

Acceptance Criteria

  • able to delete columns with dpp

Tasks

  • write spec
  • write tests
  • add remove_fields processor to common processors
  • update README

Analysis

Spec:

run: delete_fields
parameters:
  resources: resource_name
  columns:
    - column_name1
    - column_name2
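
A minimal sketch of what such a processor could look like (illustrative, not the actual implementation; for brevity it ignores the resources parameter and drops the columns from every resource):

from datapackage_pipelines.wrapper import ingest, spew

parameters, datapackage, resource_iterator = ingest()
to_delete = set(parameters['columns'])

# Drop the fields from each resource's schema.
for resource in datapackage['resources']:
    fields = resource.get('schema', {}).get('fields', [])
    if fields:
        resource['schema']['fields'] = [f for f in fields if f['name'] not in to_delete]

# Drop the corresponding values from every streamed row.
def process_resources(resources):
    for rows in resources:
        yield ({k: v for k, v in row.items() if k not in to_delete} for row in rows)

spew(datapackage, process_resources(resource_iterator))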

Expose more information about the resource when logging info or errors

When processing large datasets with multiple resources, using datapackage-pipelines would be easier if it provided more information about the particular resource in certain types of errors. For instance, when failing to set a type in the set_types processor, it would be helpful if

"No field found matching %r" % field_name

were more like:

"No field found matching %r in resource %s" % (field_name, name)

where name was the resource name.

Likewise, when reading from an SQL database table:

INFO    :Main                            :stream_remote_resources: INFO    :enliten: OPENING mysql://root@localhost/enliten

Would be more helpful if it referenced the table name being pulled.

Examples in README are mixing tabs and spaces

For example, see https://github.com/frictionlessdata/datapackage-pipelines/blob/054db2faa0e768104ac43fd070cf3cc2e17ef69f/README.md#add_resource. The YAML example is:

- run: add_resource
  parameters: 
    url: http://example.com/my-excel-file.xlsx
    sheet: 1
    headers: 2
- run: add_resource
  parameters:
    url: http://example.com/my-csv-file.csv
	encoding: "iso-8859-2"

Notice the red highlight before encoding: that's a TAB. I'm not sure at which level encoding should be, so I couldn't send a PR, but there are a few examples like this in the README.

Provide more detailed error message during data validation

In any case, when using the set_types standard processor, it will validate and transform the input data with the new types.

When setting types, if the data does not conform to the constraints or type set in the schema, it will fail:

INFO    :Main    :dump.to_path: jsontableschema.exceptions.ConstraintError: The field 'Aggregate' must not be more than 10
INFO    :Main    :dump.to_path: jsontableschema.exceptions.InvalidDateType: time data '523' does not match format '%Y-%m-%d'

It would be helpful if the processor provided, say, the row/resource that failed the check. Something equivalent to the Good Tables output. Or, provide the option not to do a data validation on set_types so that I can manually run goodtables afterwards.

Allow plugins to provide custom spec parsers

Currently, plugins can optionally provide one Generator class, and one my_plugin.source-spec.yaml filetype per generator. This means each *.source-spec.yaml filetype requires its own plugin, and generators in separate plugins can't share common processors.

I propose letting plugins provide their own custom spec parsers that extend parsers.base_parser.BaseParser. This would allow plugins to resolve source-specs and generators in their own way, potentially allowing plugins to provide more than one generator type, subsequently allowing more than one *.source-spec.yaml filetype per plugin.

For example, the datapackage_pipelines_measure plugin could have a social-media generator, a website-analytics generator, a code-packaging generator, etc. And each project directory could contain the corresponding social-media.measure.source-spec.yaml, website-analytics.measure.source-spec.yaml, and code-packaging.measure.source-spec.yaml files.

A proposed parser discovery solution:

    • specs.find_specs() looks for more parsers (subclasses of BaseParser) in the parsers directory of the plugin
    • instances of discovered plugin-supplied parsers are prepended to specs.SPEC_PARSERS (so they take precedence over native parsers).
    • specs.find_specs() carries on as normal
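
For illustration, a plugin-supplied parser might look roughly like this (the exact BaseParser interface isn't shown here, so the import path and method names are assumptions):

from datapackage_pipelines.specs.parsers.base_parser import BaseParser

class SocialMediaParser(BaseParser):

    @classmethod
    def check_filename(cls, filename):
        # Claim only this plugin's own source-spec files.
        return filename.endswith('social-media.measure.source-spec.yaml')

    @classmethod
    def to_pipeline(cls, spec, pipeline_id):
        # Translate the source spec into pipeline definitions here.
        raise NotImplementedError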

What do you think, @akariv?

Error creating valid datetime fields in Data Packages

enliten:
  pipeline:
    - run: add_metadata
      parameters:
        name: 'enliten'
        title: 'ENLITEN'
        homepage: 'http://www.cs.bath.ac.uk/enliten/'
        temporal:
          name: 'Data Collected'
          start: '2013-10-01'
          end: '2015-06-30'
    - parameters:
        name: humidity
        table: humidity
        url: mysql://root@localhost/enliten
      run: add_resource
    - run: stream_remote_resources
    - run: set_types
      parameters:
        resources: humidity
        types:
          '_id':
            type: integer
          device:
            type: integer
          sensor:
            type: integer
          humidity:
            type: number
          timestamp:
            type: datetime
    - run: dump.to_path
      parameters:
        out-path: ../_datasets/enliten

produces

(pilot-dm4t) legend:pilot-dm4t dan (convert-to-datapackage-pipelines)$ dpp run ./enliten/enliten
INFO    :Main                            :Skipping redis connection, host:None, port:6379
INFO    :Main                            :RUNNING ./enliten/enliten
INFO    :Main                            :- add_metadata
INFO    :Main                            :- add_resource
INFO    :Main                            :- stream_remote_resources
INFO    :Main                            :- set_types
INFO    :Main                            :- dump.to_path
INFO    :Main                            :- (sink)
INFO    :Main                            :DONE /Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/add_metadata.py
INFO    :Main                            :DONE /Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/add_resource.py
INFO    :Main                            :stream_remote_resources: INFO    :enliten: OPENING mysql://root@localhost/enliten
INFO    :Main                            :stream_remote_resources: ERROR   :Exception during reset or similar
INFO    :Main                            :stream_remote_resources: Traceback (most recent call last):
INFO    :Main                            :stream_remote_resources:   File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/sqlalchemy/pool.py", line 687, in _finalize_fairy
INFO    :Main                            :stream_remote_resources:     fairy._reset(pool)
INFO    :Main                            :stream_remote_resources:   File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/sqlalchemy/pool.py", line 829, in _reset
INFO    :Main                            :stream_remote_resources:     pool._dialect.do_rollback(self)
INFO    :Main                            :stream_remote_resources:   File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/sqlalchemy/dialects/mysql/base.py", line 1598, in do_rollback
INFO    :Main                            :stream_remote_resources:     dbapi_connection.rollback()
INFO    :Main                            :stream_remote_resources: _mysql_exceptions.ProgrammingError: (2014, "Commands out of sync; you can't run this command now")
INFO    :Main                            :set_types: /Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/jsontableschema/model.py:48: UserWarning: Class models.SchemaModel is deprecated [v0.7-v1)
INFO    :Main                            :set_types:   warnings.warn(message, UserWarning)
INFO    :Main                            :set_types: ERROR   :Failed to cast row ['1', '1', '4', '33.0', '2013-03-15T00:29:11']
INFO    :Main                            :set_types: Traceback (most recent call last):
INFO    :Main                            :set_types:   File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/jsontableschema/types/datetime.py", line 43, in cast_default
INFO    :Main                            :set_types:     return datetime.datetime.strptime(value, self.ISO8601)
INFO    :Main                            :set_types:   File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/_strptime.py", line 565, in _strptime_datetime
INFO    :Main                            :set_types:     tt, fraction = _strptime(data_string, format)
INFO    :Main                            :set_types:   File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/_strptime.py", line 362, in _strptime
INFO    :Main                            :set_types:     (data_string, format))
INFO    :Main                            :set_types: ValueError: time data '2013-03-15T00:29:11' does not match format '%Y-%m-%dT%H:%M:%SZ'
INFO    :Main                            :set_types: During handling of the above exception, another exception occurred:
INFO    :Main                            :set_types: Traceback (most recent call last):
INFO    :Main                            :set_types:   File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/set_types.py", line 71, in <module>
INFO    :Main                            :set_types:     spew(datapackage, process_resources(resource_iterator))
INFO    :Main                            :set_types:   File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/wrapper/wrapper.py", line 59, in spew
INFO    :Main                            :set_types:     for rec in res:
INFO    :Main                            :set_types:   File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/set_types.py", line 52, in process_resource
INFO    :Main                            :set_types:     flattened_row = jts.cast_row(flattened_row)
INFO    :Main                            :set_types:   File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/jsontableschema/schema.py", line 81, in cast_row
INFO    :Main                            :set_types:     result.append(field.cast_value(value))
INFO    :Main                            :set_types:   File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/jsontableschema/field.py", line 76, in cast_value
INFO    :Main                            :set_types:     return self.__type.cast(value, skip_constraints=skip_constraints)
INFO    :Main                            :set_types:   File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/jsontableschema/types/base.py", line 120, in cast
INFO    :Main                            :set_types:     cast_value = cast_func(value, self.__format_fmt)
INFO    :Main                            :set_types:   File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/jsontableschema/types/datetime.py", line 45, in cast_default
INFO    :Main                            :set_types:     raise_with_traceback(exceptions.InvalidDateTimeType(e))
INFO    :Main                            :set_types:   File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/future/utils/__init__.py", line 419, in raise_with_traceback
INFO    :Main                            :set_types:     raise exc.with_traceback(traceback)
INFO    :Main                            :set_types:   File "/Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/jsontableschema/types/datetime.py", line 43, in cast_default
INFO    :Main                            :set_types:     return datetime.datetime.strptime(value, self.ISO8601)
INFO    :Main                            :set_types:   File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/_strptime.py", line 565, in _strptime_datetime
INFO    :Main                            :set_types:     tt, fraction = _strptime(data_string, format)
INFO    :Main                            :set_types:   File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/_strptime.py", line 362, in _strptime
INFO    :Main                            :set_types:     (data_string, format))
INFO    :Main                            :set_types: jsontableschema.exceptions.InvalidDateTimeType: time data '2013-03-15T00:29:11' does not match format '%Y-%m-%dT%H:%M:%SZ'
INFO    :Main                            :stream_remote_resources: ERROR   :Output pipe disappeared!
ERROR   :Main                            :FAILED /Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/set_types.py: 1
ERROR   :Main                            :FAILED /Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/manager/../lib/internal/sink.py: -9
ERROR   :Main                            :FAILED /Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py: -9
ERROR   :Main                            :FAILED /Users/dan/open_knowledge/_envs/pilot-dm4t/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/dump/to_path.py: -9
INFO    :Main                            :RESULTS:
INFO    :Main                            :FAILURE: ./enliten/enliten 

Adding the format: option:

            format: 'fmt:%Y-%m-%dT%H:%M:%S'

makes it go. But I end up with an invalid Data Package when trying to read with datapackage-py.

ValueError: time data '2013-03-15 00:29:11' does not match format '%Y-%m-%dT%H:%M:%SZ'


stream_remote_resources TypeError if skip_rows is a list or str

If I add a resource to the pipeline with a skip_rows parameter that's a list (rather than an int), I get a TypeError from the downstream processor stream_remote_resources from this method:

def row_skipper(rows_to_skip):
    def _func(extended_rows):
        for number, headers, row in extended_rows:
            if number > rows_to_skip:
                yield (number - rows_to_skip, headers, row)
    return _func

ERROR log from processor stream_remote_resources:
+--------
| Traceback (most recent call last):
|   File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 223, in <module>
|     spew(datapackage, new_resource_iterator)
|   File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines/wrapper/wrapper.py", line 64, in spew
|     for rec in res:
|   File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 41, in _reader
|     for i, row in enumerate(_reader):
|   File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/tabulator/stream.py", line 230, in iter
|     for row_number, headers, row in iterator:
|   File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 88, in _func
|     for number, headers, row in extended_rows:
|   File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 101, in _func
|     for number, headers, row in extended_rows:
|   File "/Users/brew/virtualenvs/ukds_pilot/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 81, in _func
|     if number > rows_to_skip:
| TypeError: '>' not supported between instances of 'int' and 'list'

As well as lists, tabulator's skip_rows can also accept a character like # which would also fail the same comparison check.
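
A hedged sketch (not the actual fix) of how row_skipper could cope with the other forms tabulator accepts - an int, a list of row numbers, or a comment-prefix string such as #:

def row_skipper(rows_to_skip):
    def _func(extended_rows):
        for number, headers, row in extended_rows:
            if isinstance(rows_to_skip, int):
                # Original behaviour: skip the first N rows.
                if number > rows_to_skip:
                    yield (number - rows_to_skip, headers, row)
            elif isinstance(rows_to_skip, (list, set)):
                # Skip explicitly listed row numbers.
                if number not in rows_to_skip:
                    yield (number, headers, row)
            elif isinstance(rows_to_skip, str):
                # Skip rows whose first cell starts with the given prefix.
                if not (row and str(row[0]).startswith(rows_to_skip)):
                    yield (number, headers, row)
    return _func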

dump.to_sql should detect sql engine problems before starting to download

reproduction steps

  • start scraping some data using dump.to_sql but with an invalid SQL engine setting

expected

  • the run should fail within milliseconds with an error about failing to connect to the SQL server

actual

  • pipeline starts running
  • might download some data, do some stuff
  • only when it reaches dump.to_sql does it fail

notes

  • the more general fix would be to allow processors a pre step - before data is streamed
  • not sure if such an option exists today or not
  • then, dump.to_sql would try to connect to SQL in this pre step (see the sketch below)
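
A hedged sketch of that pre step (illustrative only): try to connect to the configured engine before any rows are streamed, so an invalid engine setting fails immediately.

from sqlalchemy import create_engine

def check_engine(engine_url):
    engine = create_engine(engine_url)
    # connect() raises right away if the engine settings are invalid.
    with engine.connect():
        pass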

add split processor to standard library

As a dpp user, I want to split or shard data from one or more resources into one or more other resources based on certain conditions

Use cases:

  • get a sample of rows for preview from a large datapackage
  • allow parallel processing of resources by sharding the source data
  • provide more user-friendly datapackages for large datasets, split into smaller chunks that users can open in a spreadsheet

see documentation and tests for this suggested processor here

Downloader processor issues

Extracted from #18

  1. Specify a resource URL pointing to a CSV and define 3 fields (the actual CSV contains 9 fields)
  2. Pass these parameters to downloader

Expected:

3 fields are exported

Actual:

Row length (9) doesn't match fields count (3)

Unpivot tabular data

As a dpp user, I want to unpivot or transpose my tabular data so that there's only one record per row

As a dpp user, I have data that has dates as column names and I want to "unpivot" and have nice time series data so that I can easily reuse it for plotting graphs

Example:

# Pivoted data
country      2005  2006  2007
Afghanistan  46.6  46.9  47.2
Albania      76.1  76.3  76.5
...

# Unpivoted data
country      year  average_age
Afghanistan  2005  46.6
Afghanistan  2006  46.9
Afghanistan  2007  47.2
Albania      2005  76.1
Albania      2006  76.3
Albania      2007  76.5
...

Acceptance criteria

  • Able to normalize my pivoted data
  • dpp run un-pivot-processor provides valid datapackage
  • Able to pre-define the names of the columns to insert values into

Tasks

  • Do analysis
  • Prepare Spec
  • Failing Tests for processor
  • Create unpivot processor
  • Passing test
  • update README

Analysis

Example Spec for given data:

# data
country,2015 income,2015 expenses,2016 income,2016 expenses
xxx,20000,15000,30000,30000
...

# spec
- run: unpivot
  parameters:
    resources: (same as always)
    extraKeyFields:
      -
        name: year
        type: integer
      -
        name: direction
        type: string
    extraValueField:
        name: amount
        type: number
    unpivot:
      - 
        name: 2015 incomes
        keys:
          year: 2015
          direction: In
      - 
        name: 2015 expenses
        keys:
          year: 2015
          direction: Out
      - 
        name: 2016 incomes
        keys:
          year: 2016
          direction: In
      - 
        name: 2016 expenses
        keys:
          year: 2016
          direction: Out
      # or regex 
      -
        name: ([0-9]{4}) (\w+)
        keys:
          year: \1
          direction: \2

Running dpp with nonexistent dataset reference exits silently

  • python 3.6.4
  • datapackage-pipelines 1.5.4
  • operating system: linux (arch)

Running

GOBBLE_AUTH_TOKEN=mytoken dpp run ./2017-18/national/estimates-of-national-expenditure-south-africa-2017-18

outputs

INFO    :Main                            :Skipping redis connection, host:None, port:6379
INFO    :Main                            :RESULTS:

and exits with exit code zero, giving no feedback that something went wrong or what went wrong.

dump.to_sql fails in sample pipeline

  • Python version: Docker
  • Operating system: macOS Sierra

From the ./samples/pipeline-spec.yaml

legend:datapackage-pipelines dan (master)$ docker run -v `pwd`:/pipelines:rw        frictionlessdata/datapackage-pipelines run ./samples/worldbank-co2-emissions
INFO    :Main                            :Skipping redis connection, host:None, port:6379
DEBUG   :Main                            :Using selector: EpollSelector
INFO    :Main                            :RUNNING ./samples/worldbank-co2-emissions
INFO    :Main                            :- /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/add_metadata.py
INFO    :Main                            :- /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/add_resource.py
INFO    :Main                            :- /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/stream_remote_resources.py
INFO    :Main                            :- /pipelines/samples/add_constant.py
INFO    :Main                            :- /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/dump/to_zip.py
INFO    :Main                            :- /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/dump/to_path.py
INFO    :Main                            :- /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/dump/to_sql.py
INFO    :Main                            :dump.to_sql: Traceback (most recent call last):
INFO    :Main                            :dump.to_sql:   File "/usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/dump/to_sql.py", line 2, in <module>
INFO    :Main                            :dump.to_sql:     from jsontableschema_sql import Storage
INFO    :Main                            :dump.to_sql: ImportError: No module named 'jsontableschema_sql'
ERROR   :Main                            :FAILED /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/dump/to_sql.py: 1
ERROR   :Main                            :FAILED /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/add_metadata.py: -9
ERROR   :Main                            :FAILED /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/dump/to_path.py: -9
ERROR   :Main                            :FAILED /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/add_resource.py: -9
ERROR   :Main                            :FAILED /pipelines/samples/add_constant.py: -9
ERROR   :Main                            :FAILED /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/dump/to_zip.py: -9
ERROR   :Main                            :FAILED /usr/local/lib/python3.5/site-packages/datapackage_pipelines/manager/../lib/stream_remote_resources.py: -9
INFO    :Main                            :RESULTS:
INFO    :Main                            :FAILURE: ./samples/worldbank-co2-emissions

Encoding problems when trying to dump.to_zip the remote dataset with non-tabular resource

If you try to dump.to_zip a remote dataset with a non-tabular resource using dpp, it will zip successfully, but if you try to open the zipped resource it has encoding problems. To reproduce:

  • Run the following pipeline
remote:
  pipeline:
    -
      run: load_resource
      parameters:
        resource: 'vix-daily_json'
        url: "https://pkgstore-testing.datahub.io/core/finance-vix:vix-daily_json/datapackage.json"
        stream: False
    -
      run: dump.to_zip
      parameters:
          out-file: remote.zip
          force-format: False
          handle-non-tabular: True
  • Unzip
  • open data/vix-daily_json.json

Note: emphasizing remote here, because if you download datapackage.json and data/vix-daily_json.json locally and then zip them, it works fine

See detailed debug here datopian/assembler#53 (comment)

Processor for adding field(s)

As a dpp user, I want to add a new column to my existing data with the result of various operations (like sum, avg, min, max, multiply, constant, join, format) performed on values of pre-defined columns of the same row, so that after running dpp run xxx the output data has a new column with the value calculated for me.

Acceptance Criteria

  • able to add new column to data
  • able to perform operations like sum, avg, min, max, multiply, constant, join, format

Tasks

  • prepare specs
  • write tests
  • write processor add_fields
  • update README

Analysis

Spec:

run: add_fields
parameters:
  resources: resource_name
  fields:
    - operation: sum
      target: new_column_name
      nullas: 0 (default)
      type: any (default)
      source:
        - column1
        - column2
    - operation: sum
      target: another_column_name
      source:
        - column3
        - column1
    - operation: sum
      target: total_sum
      source:
        - new_column_name
        - another_column_name
    - operation: avg/min/max/multiply: same as for sum
    - operation: join
      source:
        - new_column_name
        - another_column_name
      with: ",'"
    - operation: format
      target: new_column_name
      with: '{column1} is better than {column2}'
    - operation: constant
      target: new_column_name
      with: 'my constant'

Available operations:

  • constant - add a constant value
  • sum - summed value for given column(s) in a row. skip None
  • avg - average value from given column(s) in a row. skip None
  • min - min value among given column(s) in a row. skip None
  • max - max value among given column(s) in a row. skip None
  • multiply - the product of given columns(s) in a row skip None
  • join - joins two or more column values in a row (separated by comma)
  • format - allows passing a format string, e.g. 'my name is {first_name}' (a partial sketch of these operations follows)
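
A hedged, partial sketch of one of these operations (sum) applied to a single row; the function name and arguments are illustrative only, not the proposed processor's API:

def apply_sum(row, target, source, nullas=0):
    # Sum the source columns of this row, treating None as `nullas`.
    values = (row.get(column) for column in source)
    row[target] = sum(value if value is not None else nullas for value in values)
    return row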

Rename this library

Context

Data Packages and Frictionless Data specifications are essentially part of the protocol and inner workings of this package, but using the package does not really require knowledge of these specs. Branding it as "Data Packages" is misleading, and would potentially confuse users into thinking that knowledge of data packages is required to use this package.

I think we should rename the package to simply pipelines or pipeline.

What are your thoughts, @akariv?

Any opinion on this @brew @roll @danfowler @vitorbaptista @amercader @rufuspollock ?

Albanian example in README doesn't work

So, I've tried to run the Albanian example pipeline specified in the README.

version: '2'
services:
  pipeline:
    build: .
    command: sh -c "cd /src && pwd && dpp"
    ports:
      - 5000:5000
    volumes:
      - ./:/src
The output is:

pipeline_1  | Available Pipelines:
pipeline_1  | - ./albanian-treasury (E)
pipeline_1  | 	Unresolved processor: Couldn't resolve model at /src

Which processor do I need to install additionally?

dump.to_sql in mode=update shouldn't do inserts

a pipeline with dump.to_sql mode=update

dump.to_sql: INFO    :Writing to DB cooperatives -> cooperatives (mode=update, keys=['id'])

throws an exception about a duplicate key when doing an insert:

dump.to_sql: Traceback (most recent call last):
dump.to_sql:   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1159, in _execute_context
dump.to_sql:     context)
dump.to_sql:   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 467, in do_executemany
dump.to_sql:     cursor.executemany(statement, parameters)
dump.to_sql: psycopg2.IntegrityError: duplicate key value violates unique constraint "cooperatives_pkey"
dump.to_sql: DETAIL:  Key (id)=(570000018) already exists.
dump.to_sql: The above exception was the direct cause of the following exception:
dump.to_sql: Traceback (most recent call last):
dump.to_sql:   File "/usr/local/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/dump/to_sql.py", line 134, in 
dump.to_sql:     SQLDumper()()
dump.to_sql:   File "/usr/local/lib/python3.6/site-packages/datapackage_pipelines/lib/dump/dumper_base.py", line 40, in __call__
dump.to_sql:     self.stats)
dump.to_sql:   File "/usr/local/lib/python3.6/site-packages/datapackage_pipelines/wrapper/wrapper.py", line 60, in spew
dump.to_sql:     for rec in res:
dump.to_sql:   File "/usr/local/lib/python3.6/site-packages/datapackage_pipelines/lib/dump/dumper_base.py", line 101, in row_counter
dump.to_sql:     for row in resource:
dump.to_sql:   File "/usr/local/lib/python3.6/site-packages/jsontableschema_sql/writer.py", line 57, in write
dump.to_sql:     for wr in self.__insert():
dump.to_sql:   File "/usr/local/lib/python3.6/site-packages/jsontableschema_sql/writer.py", line 75, in __insert
dump.to_sql:     statement.execute(self.__buffer)
dump.to_sql:   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/base.py", line 386, in execute
dump.to_sql:     return e._execute_clauseelement(self, multiparams, params)
dump.to_sql:   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
dump.to_sql:     compiled_sql, distilled_params
dump.to_sql:   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
dump.to_sql:     context)
dump.to_sql:   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1402, in _handle_dbapi_exception
dump.to_sql:     exc_info
dump.to_sql:   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
dump.to_sql:     reraise(type(exception), exception, tb=exc_tb, cause=cause)
dump.to_sql:   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
dump.to_sql:     raise value.with_traceback(tb)
dump.to_sql:   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1159, in _execute_context
dump.to_sql:     context)
dump.to_sql:   File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 467, in do_executemany
dump.to_sql:     cursor.executemany(statement, parameters)
dump.to_sql: sqlalchemy.exc.IntegrityError: (psycopg2.IntegrityError) duplicate key value violates unique constraint "cooperatives_pkey"
dump.to_sql: DETAIL:  Key (id)=(570000018) already exists.
dump.to_sql:  [SQL: 'INSERT INTO cooperatives (id, name, registration_date, phone, primary_type_id, primary_type, secondary_type_id, secondary_type, legal_status_id, legal_status, last_status_date, type, municipality_id, municipality, inspector, address) VALUES (%(id)s, %(name)s, %(registration_date)s, %(phone)s, %(primary_type_id)s, %(primary_type)s, %(secondary_type_id)s, %(secondary_type)s, %(legal_status_id)s, %(legal_status)s, %(last_status_date)s, %(type)s, %(municipality_id)s, %(municipality)s, %(inspector)s, %(address)s)'] [parameters: ({'id': '570000018', 'name': 'נחלת ישראל רמה אגודה שתופית בע"מ (במחיקה)', 'registration_date': datetime.datetime(1921, 2, 6, 0, 0), 'phone': '', 'primary_type_id': '43', 'primary_type': 'שיכון', 'secondary_type_id': '61', 'secondary_type': 'שיכון', 'legal_status_id': '23', 'legal_status': 'הודעה שניה על מחיקה', 'last_status_date': datetime.datetime(1970, 1, 29, 12, 0), 'type': 'התאחדות האיכרים', 'municipality_id': None, 'municipality': '', 'inspector': 'צוק חיים', 'address': 'דופקר ++++'}, {'id': '570000026', 'name': 'הלואה וחסכון ירושלים אגודה שיתופית בע"מ', 'registration_date': datetime.datetime(1921, 3, 24, 10, 25), 'phone': '02-6234432', 'primary_type_id': '44', 'primary_type': 'אשראי וחסכון', 'secondary_type_id': '63', 'secondary_type': 'אשראי', 'legal_status_id': '10', 'legal_status': 'אגודה פעילה', 'last_status_date': datetime.datetime(1921, 3, 24, 8, 13, 46), 'type': '', 'municipality_id': '3000', 'municipality': 'ירושלים', 'inspector': 'בן-חמו יוסף', 'address': 'קרן היסוד 41  ירושלים 94188 ת.ד:  2575'}, {'id': '570000034', 'name': 'הלואה וחיסכון זכרון יעקב אגודה הדדית בע"מ', 'registration_date': datetime.datetime(1921, 10, 16, 0, 0), 'phone': '', 'primary_type_id': '44', 'primary_type': 'אשראי וחסכון', 'secondary_type_id': '63', 'secondary_type': 'אשראי', 'legal_status_id': '26', 'legal_status': 'אגודה מוזגה - מבוטלת', 'last_status_date': datetime.datetime(1977, 1, 29, 12, 0), 'type': '', 'municipality_id': '9300', 'municipality': 'זכרון יעקב', 'inspector': 'חליל יעקב', 'address': 'הנציב       זכרון יעקב     מיקוד ת.ד:  8'}, {'id': '570000042', 'name': 'קיבוץ איילת השחר', 'registration_date': datetime.datetime(1921, 12, 19, 12, 52), 'phone': '04-6932111', 'primary_type_id': '40', 'primary_type': 'חקלאות', 'secondary_type_id': '74', 'secondary_type': 'קיבוץ מתחדש', 'legal_status_id': '10', 'legal_status': 'אגודה פעילה', 'last_status_date': datetime.datetime(1921, 12, 19, 13, 27, 17), 'type': 'תנועה קבוצית מאוחדת  תק"מ', 'municipality_id': '77', 'municipality': 'איילת השחר', 'inspector': 'מור טל', 'address': 'ד.נ. 
גליל עליון   איילת השחר 12200'}, {'id': '570000059', 'name': 'אגודה שיתופית לעזרה הדדית ברחובות בע"מ', 'registration_date': datetime.datetime(1921, 12, 19, 0, 0), 'phone': '', 'primary_type_id': '44', 'primary_type': 'אשראי וחסכון', 'secondary_type_id': '63', 'secondary_type': 'אשראי', 'legal_status_id': '25', 'legal_status': 'אגודה בוטלה  לאחר פירוק', 'last_status_date': datetime.datetime(1987, 5, 21, 12, 0), 'type': '', 'municipality_id': '8400', 'municipality': 'רחובות', 'inspector': 'יהודה משה', 'address': 'רחובות   רחובות'}, {'id': '570000067', 'name': 'הכפר העברי - אגודה הדדית בע"מ', 'registration_date': datetime.datetime(1922, 2, 2, 0, 0), 'phone': '02-5700756', 'primary_type_id': '40', 'primary_type': 'חקלאות', 'secondary_type_id': '54', 'secondary_type': 'אגודה חקלאית כללית', 'legal_status_id': '10', 'legal_status': 'אגודה פעילה', 'last_status_date': datetime.datetime(1922, 2, 2, 12, 0), 'type': '', 'municipality_id': '3000', 'municipality': 'ירושלים', 'inspector': 'בן-חמו יוסף', 'address': 'נמצא אצל אבנאור שמואל, אביזהר, אחוזת בית הכ 8 כניסה: 247 ירושלים 96267'}, {'id': '570000075', 'name': 'החקלאית אג"ש לבטוח ולשרותים וטרינריים למקנה בישראל בעמ', 'registration_date': datetime.datetime(1922, 4, 11, 0, 0), 'phone': '04-6279600', 'primary_type_id': '40', 'primary_type': 'חקלאות', 'secondary_type_id': '57', 'secondary_type': 'ביטוח חקלאי', 'legal_status_id': '10', 'legal_status': 'אגודה פעילה', 'last_status_date': datetime.datetime(1922, 4, 11, 12, 0), 'type': '', 'municipality_id': '1167', 'municipality': 'קיסריה', 'inspector': 'שרעבי מזל', 'address': 'הברקת 20  קיסריה 38900 ת.ד:  3039'}, {'id': '570000083', 'name': 'קופת מלוה חקלאית לגמ"ח אגודה שיתופית פתח תקוה בע"מ', 'registration_date': datetime.datetime(1922, 6, 29, 0, 0), 'phone': '', 'primary_type_id': '45', 'primary_type': 'תגמולים פנסיה ועזה"ד', 'secondary_type_id': '63', 'secondary_type': 'אשראי', 'legal_status_id': '25', 'legal_status': 'אגודה בוטלה  לאחר פירוק', 'last_status_date': datetime.datetime(1992, 12, 14, 12, 0), 'type': '', 'municipality_id': '7900', 'municipality': 'פתח תקוה', 'inspector': '', 'address': 'מונטיפיורי   14  פתח תקוה 49364'}  ... displaying 10 of 1001 total bound parameter sets ...  {'id': '570010009', 'name': 'הכורם הצעיר אגודה שיתופית חקלאית להספקת מים ברחובות בע"מ', 'registration_date': datetime.datetime(1950, 10, 23, 0, 0), 'phone': '08-9461817', 'primary_type_id': '40', 'primary_type': 'חקלאות', 'secondary_type_id': '56', 'secondary_type': 'אספקת מים', 'legal_status_id': '10', 'legal_status': 'אגודה פעילה', 'last_status_date': datetime.datetime(1950, 10, 23, 12, 0), 'type': '', 'municipality_id': '8400', 'municipality': 'רחובות', 'inspector': 'בר-נתן יונה', 'address': 'הרצל 143  רחובות 76266'}, {'id': '570010017', 'name': 'מעונות עובדי קופת חולים ב\' בגבעתיים אגודה שיתופית בע"מ', 'registration_date': datetime.datetime(1950, 10, 23, 0, 0), 'phone': '03-6250814', 'primary_type_id': '43', 'primary_type': 'שיכון', 'secondary_type_id': '61', 'secondary_type': 'שיכון', 'legal_status_id': '10', 'legal_status': 'אגודה פעילה', 'last_status_date': datetime.datetime(1950, 10, 23, 12, 0), 'type': '', 'municipality_id': '6300', 'municipality': 'גבעתיים', 'inspector': 'בר-נתן יונה', 'address': 'נמצא אצל בוריס בריק, מצולות ים 14  גבעתיים 53486'})]
(sink): /usr/local/lib/python3.6/site-packages/jsontableschema/model.py:48: UserWarning: Class models.SchemaModel is deprecated [v0.7-v1)
(sink):   warnings.warn(message, UserWarning)

Test helpers for low-level processors

Basic processor testing is provided for in datapackage-pipelines (lib_test_helpers) using fixtures. But because the processor is run in a python subprocess, it's difficult to apply mocking for dynamic data such as dates, or the results of calls to external api services.

It would be useful if datapackage-pipelines provided the boilerplate for setting up a test that mocks ingest and spew, so processor code can be tested and relevant parts mocked out. So when our processor under test calls ingest it receives what we pass it in the test, and when it calls spew it populates a return value from test_processor that we can run asserts against.

import datetime
from unittest import mock

@mock.patch('datetime.datetime')
def test_my_great_processor(mock_date):
    mock_date.side_effect = datetime.datetime.now()
    ingest_tuple = (some_params, a_datapackage, [])

    spew_results = test_processor(great_processor, ingest_tuple)

    # assert against spew_results here

Unable to run a YAML pipeline where the resources are zipped datapackage files

Hi there

When attempting to stream zipped datapackage files as resources, I get an error stating the temp directory does not exist:
ERROR log from processor stream_remote_resources:
+--------
| Traceback (most recent call last):
| File "/anaconda/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 212, in
| rows = stream_reader(resource, url, ignore_missing or url == "")
| File "/anaconda/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 169, in stream_reader
| schema, headers, stream, close = get_opener(url, _resource)()
| File "/anaconda/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 154, in opener
| _stream.open()
| File "/anaconda/lib/python3.6/site-packages/tabulator/stream.py", line 169, in open
| source = tempfile.NamedTemporaryFile(suffix='.' + name)
| File "/anaconda/lib/python3.6/tempfile.py", line 549, in NamedTemporaryFile
| (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
| File "/anaconda/lib/python3.6/tempfile.py", line 260, in _mkstemp_inner
| fd = _os.open(file, flags, 0o600)
| FileNotFoundError: [Errno 2] No such file or directory: '/var/folders/hv/8kj9pgt513q6kx2h29yyrrfr0000gn/T/tmprg7n15qi.data/55e4d040-1491-4ef1-9d3d-66b05fae4280.csv'
+--------

Thanks a lot

  • Python 3.6.1 :: Anaconda 4.4.0 (x86_64)
  • macOS High Sierra 10.13.2

Perform print statement linting prior to running pipelines

In order to submit an issue, please ensure you can check the following. Thanks!

  • [3.6 ] Declare which version of Python you are using (python --version)
  • [ Ubuntu] Declare which operating system you are using

It would be great if dpp raised an exception on import when a module contains print statements, as these interfere with standard output.

It would also be good to provide a logging example for a custom pipeline step and link users to it if they try to print.
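
A minimal sketch of such a logging example, using the existing ingest/spew wrapper (the processor name and message are made up):

# my_custom_step.py: use logging instead of print, so stdout stays free
# for the data stream between processors.
import logging

from datapackage_pipelines.wrapper import ingest, spew

parameters, datapackage, resources = ingest()
logging.info('my_custom_step running with parameters: %r', parameters)
spew(datapackage, resources)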

Fields 'doublequote' and 'skipinitialspace' in dialect not camelCased

The dialect is currently generated like this (see the linked code):

            resource['dialect'] = dict(
                lineTerminator='\r\n',
                delimiter=',',
                doublequote=True,
                quoteChar='"',
                skipinitialspace=False
            )

Dialect:

{
  "delimiter": ";",
  "doubleQuote": false,
  "lineTerminator": "\n",
  "quoteChar": "'",
  "skipInitialSpace": false,
  "header": false
}

This breaks our R library for Data Packages: https://github.com/ropenscilabs/datapkg
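
For reference, the corrected snippet would presumably use the camelCased keys shown above:

            resource['dialect'] = dict(
                lineTerminator='\r\n',
                delimiter=',',
                doubleQuote=True,
                quoteChar='"',
                skipInitialSpace=False
            )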

In order to submit an issue, please ensure you can check the following. Thanks!

Python 3.6/macOS Sierra

Boilerplate functionality for processor modules

Overview

ingest and spew are awesome little functions, but I found there was still a lot of boilerplate to write for each processor. I suggest writing some additional wrapper code to deal with that. I base my suggestions on use-cases I've encountered so far.

Assumptions

I assume that most of the time, you either want to process data row by row and/or mutate the datapackage. Let's talk about row processing first.

Objectives

For row by row processing, the wrapper code would fulfill 2 purposes:

  1. Provide boilerplate functionality

    • Log the processor parameters
    • Force the iteration over a given sample size (useful for debugging)
    • Log a sample output of the processor
    • Handle a chosen set of exceptions
    • Collect stats and process stats
  2. Pass context to the row processor

    • Manage parameters defaults and overrides
    • Pass the parameters to the row processor
    • Pass row and resource indexes to the processor

API

My first attempt at writing code for that resulted in the utility.process function. There's no stats functionality at this stage. The API looks like:

parameters, datapackage, resources = ingest()
new_resources = process(resources, processor_function, parameter_key=value)
spew(datapackage, new_resources)

My second attempt (see code in progress) is a Processor class with an API along the lines of:

parameters, datapackage, resources = ingest()
processor = Processor(function=processor_function,
                      exceptions=(ValueError,),
                      enumerate_rows=True,
                      sample_size=100,
                      datapackage=datapackage,
                      parameter_key1=default_value1,
                      parameter_key2=default_value2)
new_resources = processor.process(resources)
spew(datapackage, new_resources)

What I would really like to achieve is:

@row_processor
def my_awesome_processor(row):
    # do stuff
    return row, stats

And similarly add datapackage mutation like:

@datapackage_mutator
def my_awesome_mutator(datapackage):
    # do stuff
    return datapackage
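
A rough sketch of how the row_processor decorator could sit on top of ingest/spew (the decorator itself is hypothetical, and stats collection is left out):

from datapackage_pipelines.wrapper import ingest, spew


def row_processor(process_row):
    # Applying the decorator in a processor module runs the whole
    # ingest -> transform -> spew cycle, calling process_row once per row.
    parameters, datapackage, resource_iterator = ingest()

    def processed(resource):
        for row in resource:
            yield process_row(row)

    spew(datapackage, (processed(resource) for resource in resource_iterator))
    return process_row


@row_processor
def my_awesome_processor(row):
    row['processed'] = True  # hypothetical transformation
    return row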

@akariv care to comment?

Pass `compression` argument to tabulator.Stream

As a dpp user, I want to be able to run pipelines on compressed (zip, gz) tabular data (csv, xls(x)), so that I do not have to decompress the files first and run the pipelines afterwards.

Acceptance Criteria

  • able to run pipelines with resource URLs that do not end with .zip but are valid compressed tabular data (see Analysis below)

Tasks

Analysis

The above is already possible if the path or URL of my compressed file ends with the correct extension, e.g. .zip, because by default tabulator infers the compression from the file name. But if the URL of my compressed file does not end with .zip, the pipeline will fail.

For example, I have two identical zip files:

If you create two pipeline-spec.yaml files that are identical except for the URLs above and run them (take a look at this gist):

  • one with extension runs fine
  • one without extension fails.

This happens because compression is not in the list of parameters passed on to tabulator: https://github.com/frictionlessdata/datapackage-pipelines/blob/master/datapackage_pipelines/lib/stream_remote_resources.py#L136
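
As a point of reference, tabulator already accepts an explicit compression argument, so the fix would presumably be to whitelist it in stream_remote_resources and forward it from the resource descriptor. A standalone sketch (the URL is made up):

from tabulator import Stream

# A hypothetical download URL with no .zip suffix; compression is stated
# explicitly instead of being inferred from the file name.
with Stream('https://example.com/download?file=mydata',
            format='csv',
            compression='zip',
            headers=1) as stream:
    rows = stream.read()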

Provide a way to pass in the custom parser during add_resource so it can be used in stream_remote_resources

In order to submit an issue, please ensure you can check the following. Thanks!

  • [3.6 ] Declare which version of Python you are using (python --version)
  • [Ubuntu ] Declare which operating system you are using

Currently I have to create a separate task in order to use a custom parser, like this:

https://github.com/strets123/frictionless-pres/blob/master/smdataproject/stream_remote_resources_custom.py

This breaks PEP 8.

Add local resources via add_resource processor

As part of frictionlessdata/pilot-dm4t#21, I'm using datapackage-pipelines to read some large files I have stored locally. However, only remote resources are supported by the add_resource processor:

"You can only add remote resources using this processor"

https://github.com/frictionlessdata/datapackage-pipelines/blob/master/datapackage_pipelines/lib/add_resource.py#L11

To work around this, I am serving them via a local HTTP server. Is there any intention of supporting local resources in the future?
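
For anyone hitting the same limitation, the workaround above can be as simple as Python's built-in HTTP server (the directory path is of course made up):

import http.server
import os
import socketserver

os.chdir('/path/to/local/data')  # hypothetical directory with the large files

with socketserver.TCPServer(('127.0.0.1', 8000),
                            http.server.SimpleHTTPRequestHandler) as httpd:
    # resources can then be added with url: http://127.0.0.1:8000/<file>.csv
    httpd.serve_forever()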

Processor to find and replace patterns in field(s)

As a dpp user, I want to be able to find some strings and replace/remove them, so I don't need to do it manually.

As a dpp user, I want to be able to replace certain patterns from field values so that I'm able to clean my data as I want.

For example, one of the fields in my data may have anchors for footnotes:

year,country,number,
2000,XXX,12345
2001 (2),XXX,12345

Acceptance Criteria

  • I'm able to define one or more patterns to find and replace with
  • I'm able to define regex to find pattern

Tasks

  • write spec
  • new processor find_replace
  • tests
  • README

Analysis

Spec:



- 
  run: find_replace
  parameters:
    resources: as always
    fields:
      - 
        name: my_field
        patterns:
          -
            find: ([0-9]{4}) (\(\w+\))
            replace: \1
      - 
        name: my_second_field
        patterns:
          - 
            find: Q1
            replace: '03-31'
          - 
            find: Q2
            replace: '06-30'
          - 
            find: Q3
            replace: '09-30'
          - 
            find: Q4
            replace: '12-31'
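
A minimal sketch of how such a processor might look, ignoring the resources filter and simplifying type handling for brevity (an illustration, not the final implementation):

import re

from datapackage_pipelines.wrapper import ingest, spew

parameters, datapackage, resource_iterator = ingest()
fields = parameters.get('fields', [])


def process_resource(resource):
    for row in resource:
        for field in fields:
            name = field['name']
            if row.get(name) is not None:
                value = str(row[name])
                for pattern in field.get('patterns', []):
                    value = re.sub(str(pattern['find']),
                                   str(pattern['replace']),
                                   value)
                row[name] = value
        yield row


spew(datapackage, (process_resource(resource) for resource in resource_iterator))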

Samples directory moved

README.md example references spec in current directory when it should reference spec in ./samples/ directory.

In order to submit an issue, please ensure you can check the following. Thanks!

  • Declare which version of Python you are using (python --version): Docker
  • Declare which operating system you are using: macOS Sierra

Small bug in the documentation

Tasks

  • fix docs

Analysis

There is an example of the unpivot feature:

unpivot:
    -
      name: ([0-9]{4}) (\\w+)  # regex for original column
      keys:
        year: \\1  # First member of group from above
        direction: \\2  # Second member of group from above

I'm not sure why there is a double backslash \\1 in the README, but when I use this example in a pipeline processor I get ERROR: Failed to cast row: Field "year" can't cast value "\1" for type "year" with format "default". With \1 everything is OK.

Please, somebody ping me to confirm that there is no special reason to use the double backslash \\, and I'll fix the doc.

Extra-documentation suggestion

Hi, I am new here, and I like the idea of using the pipeline-spec.yaml...

...but I do not understand how to explain to other people the difference between datapackage-pipelines and SQL (the most popular standard for expressing tabular processing).

So, a suggestion is to include a document.md, or a section in README.md, showing skeptics and beginners the basic differences... or perhaps a rationale explaining why not to use SQL database export/import for "declarative processing of tabular data".

Spawn processors in a pipeline for parallelism?

Description

@danfowler has recently been using DPP for some very large source files (8GB CSV). With the default way pipelines here work, processing this data via a single conceptual stream is too slow.

There are various ways to deal with this:

  1. Don't use streaming/DPP for such large files. Copy the data into a DB and use that.
  2. Try to use more efficient file backends for such files, such as HDF5, Feather, or mmapped files.
  3. Allow DPP to spawn multiple processing (sub-)pipelines, which end in a sink processor.
  4. ?

I'd like to explore option 3. @akariv what are your thoughts?

From my very high-level view of the framework, there are no inherent design barriers to this. The only thing I guess is that the descriptor object is mutable global state, and maybe the caching mechanisms too. This might mean that spawned processors should only work on the data sources, and not on metadata.
