pipelinewise's Introduction

Notice

To better serve Wise business and customer needs, the PipelineWise codebase needs to shrink. We have made the difficult decision that, going forward, many components of PipelineWise will be removed or incorporated into the main repo. The last version before this decision is v0.64.1.

We thank everyone in the open-source community who, over the past 6 years, has helped make PipelineWise a robust product for heterogeneous replication of many, many terabytes of data daily.

PipelineWise

PipelineWise is a Data Pipeline Framework using the Singer.io specification to ingest and replicate data from various sources to various destinations. Documentation is available at https://transferwise.github.io/pipelinewise/

Features

  • Built with ELT in mind: PipelineWise fits into the ELT landscape and is not a traditional ETL tool. PipelineWise aims to reproduce source data in an Analytics-Data-Store in as close to its original format as possible. Some minor load-time transformations are supported, but complex mapping and joins have to be done in the Analytics-Data-Store to extract meaning.

  • Managed Schema Changes: When source data changes, PipelineWise detects the change and alters the schema in your Analytics-Data-Store automatically

  • Load time transformations: Ideal place to obfuscate, mask or filter sensitive data that should never be replicated in the Data Warehouse

  • YAML based configuration: Data pipelines are defined as YAML files, ensuring that the entire configuration is kept under version control (see the example after this list)

  • Lightweight: No daemons or database setup are required

  • Extensible: PipelineWise uses Singer.io-compatible tap and target connectors. New connectors can be added to PipelineWise with relatively little effort
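
For illustration, here is a minimal, hypothetical tap definition. The names, connection details and key set below are placeholders and abbreviations (the full schema is described in the documentation), but it shows how table selection, replication methods and load-time transformations are declared together in one version-controlled YAML file:

```sh
$ cat tap_mysql_sample.yml
---
id: "mysql_sample"                  # unique ID of this tap
name: "Sample MySQL source"
type: "tap-mysql"

db_conn:
  host: "mysql.example.com"         # placeholder connection details;
  port: 3306                        # secrets can be vault-encrypted
  user: "replication"
  password: "<PASSWORD>"
  dbname: "my_db"

target: "snowflake"                 # ID of a target defined in its own YAML file
batch_size_rows: 20000

schemas:
  - source_schema: "my_db"
    target_schema: "repl_my_db"
    tables:
      - table_name: "customers"
        replication_method: "INCREMENTAL"
        replication_key: "updated_at"
        transformations:            # load-time masking of sensitive columns
          - column: "email"
            type: "SET-NULL"
      - table_name: "orders"
        replication_method: "LOG_BASED"
```

Importing a directory of such files (see the commands further below) turns them into runnable pipelines.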

Official docker images

PipelineWise images are published to Docker Hub.

Pull image with:

docker pull transferwiseworkspace/pipelinewise:{tag}

Connectors

A tap extracts data from a source and writes it to a standard stream in a JSON-based format; a target consumes data from taps and does something with it, such as loading it into a file, an API or a database.
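
Conceptually, any Singer-compatible tap can be piped into any Singer-compatible target. The sketch below only illustrates the mechanism (paths and config file names are placeholders, and each connector lives in its own virtual environment); in practice PipelineWise wires the connectors together for you and adds fastsync and transformations where applicable:

```sh
$ ~/.virtualenvs/tap-mysql/bin/tap-mysql --config tap_config.json --properties properties.json \
    | ~/.virtualenvs/target-snowflake/bin/target-snowflake --config target_config.json
```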

| Type | Name | Extra | Description |
|------|------|-------|-------------|
| Tap | Postgres | | Extracts data from PostgreSQL databases. Supports Log-Based, Key-Based Incremental and Full Table replication |
| Tap | MySQL | | Extracts data from MySQL databases. Supports Log-Based, Key-Based Incremental and Full Table replication |
| Tap | Kafka | | Extracts data from Kafka topics |
| Tap | S3 CSV | | Extracts data from S3 CSV files (currently a fork of tap-s3-csv because we wanted to use our own auth method) |
| Tap | Zendesk | | Extracts data from Zendesk using OAuth and Key-Based Incremental replication |
| Tap | Snowflake | | Extracts data from Snowflake databases. Supports Key-Based Incremental and Full Table replication |
| Tap | Salesforce | | Extracts data from Salesforce using the BULK and REST extraction APIs with Key-Based Incremental replication |
| Tap | Jira | | Extracts data from Atlassian Jira using Basic auth or OAuth credentials |
| Tap | MongoDB | | Extracts data from MongoDB databases. Supports Log-Based and Full Table replication |
| Tap | Google Analytics | Extra | Extracts data from Google Analytics |
| Tap | Oracle | Extra | Extracts data from Oracle databases. Supports Log-Based, Key-Based Incremental and Full Table replication |
| Tap | Zuora | Extra | Extracts data from Zuora using the AQuA and REST extraction APIs with Key-Based Incremental replication |
| Tap | GitHub | | Extracts data from the GitHub API using a Personal Access Token and Key-Based Incremental replication |
| Tap | Shopify | Extra | Extracts data from the Shopify API using a Personal App API Password and date-based incremental replication |
| Tap | Slack | | Extracts data from the Slack API using a Bot User Token and Key-Based Incremental replication |
| Tap | Mixpanel | | Extracts data from the Mixpanel API |
| Tap | Twilio | | Extracts data from the Twilio API using OAuth and Key-Based Incremental replication |
| Target | Postgres | | Loads data from any tap into a PostgreSQL database |
| Target | Redshift | | Loads data from any tap into an Amazon Redshift Data Warehouse |
| Target | Snowflake | | Loads data from any tap into a Snowflake Data Warehouse |
| Target | S3 CSV | | Uploads data from any tap to S3 in CSV format |
| Transform | Field | | Transforms fields from any tap and sends the results to any target. Recommended for data masking/obfuscation |

Note: Extra connectors are experimental connectors written by community contributors. These connectors are not maintained regularly and are not installed by default. To install the extra packages, use the --connectors=all option when installing PipelineWise.
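
For example, to include one of the Extra connectors in your build (tap-shopify here is just an illustration), add it to the connector list using the mechanisms described in the sections below:

```sh
# When building the Docker image (see "Build your own docker image"):
$ docker build --build-arg connectors=tap-mysql,tap-shopify,target-snowflake -t pipelinewise:latest .

# When installing from source (see "Building from source"):
$ make connectors -e pw_connector=tap-shopify
```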

Running from docker

If you have Docker installed, using Docker is the recommended and easiest way to start using PipelineWise.

Use official image

PipelineWise images are built on each release and are available on Docker Hub:

```sh
$ docker pull transferwiseworkspace/pipelinewise
```

Build your own docker image

  1. Build an executable docker image that has every required dependency and is isolated from your host system.

By default, the image builds with all connectors. To keep the image size small, we strongly recommend limiting it to just the connectors you need by supplying the --build-arg option:

```sh
$ docker build --build-arg connectors=tap-mysql,target-snowflake -t pipelinewise:latest .
```
  2. Once the image is ready, create an alias to the docker wrapper script:

    $ alias pipelinewise="$(pwd)/bin/pipelinewise-docker"
  3. Check if the installation was successful by running the pipelinewise status command:

    $ pipelinewise status
    
    Tap ID    Tap Type      Target ID     Target Type      Enabled    Status    Last Sync    Last Sync Result
    --------  ------------  ------------  ---------------  ---------  --------  -----------  ------------------
    0 pipeline(s)

You can run any pipelinewise command at this point. Tutorials on creating and running pipelines are in the documentation under creating pipelines.
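
For example, a typical first session looks roughly like this (the directory name and the tap/target IDs are placeholders that must match your own YAML files):

```sh
# Import and validate the YAML project files
$ pipelinewise import --dir pipelinewise_samples

# List the imported pipelines and their state
$ pipelinewise status

# Run a single pipeline
$ pipelinewise run_tap --tap mysql_sample --target snowflake
```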

Building from source

  1. Make sure that all dependencies are installed on your system:

    • Python 3.x
    • python3-dev
    • python3-venv
    • mongo-tools
    • mbuffer
  2. Run the Makefile that installs the PipelineWise CLI and all supported singer connectors into separate virtual environments:

    $ make pipelinewise all_connectors

    Press Y to accept the license agreement of the required singer components. To automate the installation and accept every license agreement run:

    $ make pipelinewise all_connectors -e pw_acceptlicenses=y

    And to install only a specific list of singer connectors:

    $ make connectors -e pw_connector=<connector_1>,<connector_2>

    Run make or make -h to see the help for Makefile and all options.

  3. To start the CLI you need to activate the CLI virtual environment and set the PIPELINEWISE_HOME environment variable:

    $ source {ACTUAL_ABSOLUTE_PATH}/.virtualenvs/pipelinewise/bin/activate
    $ export PIPELINEWISE_HOME={ACTUAL_ABSOLUTE_PATH}

    (The ACTUAL_ABSOLUTE_PATH differs on every system; running make -h prints the correct commands for the CLI.)

  4. Check if the installation was successful by running the pipelinewise status command:

    $ pipelinewise status
    
    Tap ID    Tap Type      Target ID     Target Type      Enabled    Status    Last Sync    Last Sync Result
    --------  ------------  ------------  ---------------  ---------  --------  -----------  ------------------
    0 pipeline(s)

You can run any pipelinewise command at this point. Tutorials to create and run pipelines can be found here: creating pipelines.

Running tests

To run unit tests:

$ pytest --ignore tests/end_to_end

To run unit tests and generate code coverage:

$ coverage run -m pytest --ignore tests/end_to_end && coverage report

To generate a code coverage HTML report:

$ coverage run -m pytest --ignore tests/end_to_end && coverage html -d coverage_html

Note: The HTML report will be generated in coverage_html/index.html

To run integration and end-to-end tests:

To run integration and end-to-end tests you need to use the Docker Development Environment. This spins up a pre-configured PipelineWise project, with pre-configured source and target databases, in several Docker containers, which is required for the end-to-end test cases.

Developing with Docker

If you have Docker and Docker Compose installed, you can create a local development environment that includes not only the PipelineWise executables but also a pre-configured development project with source and target databases. This makes day-to-day development more convenient and lets you run the integration and end-to-end tests.

For further instructions on setting up the local development environment, see Test Project for Docker Development Environment.

Contribution

To add new taps and targets, follow the instructions in the Contribution section of the documentation.

License

Apache License Version 2.0

See LICENSE for the full text.

Important Note:

PipelineWise as standalone software is licensed under Apache License Version 2.0, but bundled components can use different licenses and may override the terms and conditions detailed in Apache License Version 2.0. You can customise which connectors you want to include in the final PipelineWise build, and the final license of your build depends on the included connectors. For further details please check the Licenses section in the documentation.

pipelinewise's Issues

Error importing with encrypted password

Following the docs to encrypt a password, I can encrypt fine, but when I try to import, it doesn't seem to be able to find the secret file.

$ echo "password" > vault-password.txt

$ pipelinewise encrypt_string --secret vault-password.txt --string "rre^GGD$difvd43Hds"
!vault |
        $ANSIBLE_VAULT;1.1;AES256.....

$ pipelinewise import --dir pipelinewise_live --secret vault-password.txt
time=2020-04-24 14:02:52 logger_name=pipelinewise.cli.config log_level=INFO message=Searching YAML config files in /app/wrk
time=2020-04-24 14:02:52 logger_name=pipelinewise.cli.config log_level=INFO message=LOADING TARGET: target_postgres.yml
Traceback (most recent call last):
  File "/app/.virtualenvs/pipelinewise/bin/pipelinewise", line 11, in <module>
    load_entry_point('pipelinewise', 'console_scripts', 'pipelinewise')()
  File "/app/pipelinewise/cli/__init__.py", line 160, in main
    getattr(ppw_instance, args.command)()
  File "/app/pipelinewise/cli/pipelinewise.py", line 1161, in import_project
    config = Config.from_yamls(self.config_dir, self.args.dir, self.args.secret)
  File "/app/pipelinewise/cli/config.py", line 50, in from_yamls
    target_data = utils.load_yaml(os.path.join(yaml_dir, yaml_file), vault_secret)
  File "/app/pipelinewise/cli/utils.py", line 183, in load_yaml
    secret_file = get_file_vault_secret(filename=vault_secret, loader=DataLoader())
  File "/app/.virtualenvs/pipelinewise/lib/python3.7/site-packages/ansible/parsing/vault/__init__.py", line 379, in get_file_vault_secret
    raise AnsibleError("The vault password file %s was not found" % this_path)
ansible.errors.AnsibleError: The vault password file /app/wrk/vault-password.txt was not found

Support resume of FULL_TABLE replication method when it fails or is interrupted

FULL_TABLE replication is a problem for very large tables since it must successfully complete every batch MERGE.

After a failure, the target has a partially populated table, but restarting the replication will start from the beginning and merge records that are already present. This can be a waste of time and money due to repeated attempts to load data that is already present (assuming, of course, the source data hasn't changed between runs).

Since FULL_TABLE is used for the initial population of INCREMENTAL and LOG_BASED replication, this problem potentially affects the initial population of all large tables.

It would be better if FULL_TABLE could resume from failure or interruption and continue where it left off.

LOG_BASED replication doing FAST_SYNC again after FULL_TABLE load.

I loaded the data using the FULL_TABLE replication method. Afterwards I changed the replication method to LOG_BASED, but it started again with fast_sync. Is there any possibility to force LOG_BASED replication by specifying the starting position and name of the binary log?

Make data type mappings for fast-sync and singer configurable

Thanks for the great tool, you're saving people a king's ransom vs fivetran pricing.

While I'm unfortunately not in a position to contribute, I would like to just put this feature suggestion out there. It would be useful to have a yaml input of data type mappings which is flexible and consistent across fast-sync and singer.

In my use case (MySQL to Snowflake), for example, tinyint(1) wasn't always boolean, so it needed to be patched in both fast-sync and Singer. Mediumint wasn't present in the list either.

batch_size_rows: 50,000 fails to copy last batch when performing log_initial

My initial tap-run of Oracle -> Snowflake is skipping the last batch of the initial load sync method (log_initial). I built a docker instance from the master branch (Oct 12, 2019 commit cdc92fc). The symptom is that the target row counts are a multiple of batch_size_rows: 50000 (which I increased from the default 20,000) and the last partial batch is missing. For example:

TABLE    ORACLE ROW COUNT    SNOWFLAKE ROW COUNT
A        3,180,921           3,150,000
B        59,343              50,000

NOTE: This is a pre-release version, so this might be a known defect.

When I reverted batch_size_rows to the default of 20,000 the problem went away.

Adding new connections

Hi,

This looks like a really interesting project. How do I go about adding new sources/taps? Do I write a standard Singer source/tap?

MySQL version 8 incompatibility

While importing the tap configuration for a MySQL 8.0.16 DB endpoint, it fails with the following error. We tried to update the pymysql version but it didn't help. Please give your suggestions.

  File "/app/.virtualenvs/tap-mysql/bin/tap-mysql", line 8, in <module>
    sys.exit(main())
  File "/app/.virtualenvs/tap-mysql/lib/python3.7/site-packages/tap_mysql/__init__.py", line 712, in main
    raise exc
  File "/app/.virtualenvs/tap-mysql/lib/python3.7/site-packages/tap_mysql/__init__.py", line 709, in main
    main_impl()
  File "/app/.virtualenvs/tap-mysql/lib/python3.7/site-packages/tap_mysql/__init__.py", line 692, in main_impl
    log_server_params(mysql_conn)
  File "/app/.virtualenvs/tap-mysql/lib/python3.7/site-packages/tap_mysql/__init__.py", line 658, in log_server_params
    with connect_with_backoff(mysql_conn) as open_conn:
  File "/app/.virtualenvs/tap-mysql/lib/python3.7/site-packages/backoff/_sync.py", line 94, in retry
    ret = target(*args, **kwargs)
  File "/app/.virtualenvs/tap-mysql/lib/python3.7/site-packages/tap_mysql/connection.py", line 25, in connect_with_backoff
    connection.connect()
  File "/app/.virtualenvs/tap-mysql/lib/python3.7/site-packages/pymysql/connections.py", line 931, in connect
    self._get_server_information()
  File "/app/.virtualenvs/tap-mysql/lib/python3.7/site-packages/pymysql/connections.py", line 1269, in _get_server_information
    self.server_charset = charset_by_id(lang).name
  File "/app/.virtualenvs/tap-mysql/lib/python3.7/site-packages/pymysql/charset.py", line 38, in by_id
    return self._by_id[id]
KeyError: 255

psycopg2-binary instead of psycopg2

Hello,

I'm having this problem because on our system it's painful to install psycopg2. Is there any major reason not to use psycopg2-binary in the tap and target?

Of course, I volunteer to do the change; I just want to understand whether there is any other reason to use psycopg2 instead of psycopg2-binary.

Thanks,

Target-snowflake does not create schema and tables with key words

In target snowflake, we cannot create tables or schemas with keyword names
for example:
Schema name AS failed (created schemas AS AS)
Table name ORDER failed (Create table ORDER - ORDER is reserved key word in Snowflake)

There is a workaround for this and we have got it working, but it needs to be tested fully for any other issues.

Properties.json corruption during parallel import

I recently found that pipelinewise couldn't import some taps due to either an empty or partial properties.json present from a previous import. Removing these properties.json and re-importing works.

I'm not sure of the root cause of why this happened, but it may have been due to a connection to a source failing on one thread during import which then causes all the threads to crash even while writing a partial properties.json.

Adding target BigQuery

First of all thanks for this amazing project. It really makes replication so much easier to do and to version control the configuration.

Are there any plans to add the BigQuery target?

I would be happy to add it myself but I'm having some issues understanding where I should make changes. I have tried just adding the target from Singer and the requirements.txt as instructed in the Contributing part of the docs, but can't get it to work. I have problems with the data types, and also the identifiers can't contain dashes.

Are there any plans to add it? If anyone could help me with this, I'm happy to do it myself. Thanks!

Improve reporting of row count exported

exported_rows += export_batch_rows

The way row exports are reported is a little odd.

For tables where the row count is less than the batch size, it's reported as:

2019-10-26 21:00:59 INFO: 2019-10-26 21:00:36 - Exported 20000 rows from siemenstestdb.departments...

Even if departments only has 10 rows.

For large tables, the log ticks out like:

2019-10-26 21:03:39 INFO: 2019-10-26 21:02:12 - Exported 100000 rows from siemenstestdb.employees...
2019-10-26 21:03:39 INFO: 2019-10-26 21:02:21 - Exported 120000 rows from siemenstestdb.employees...
2019-10-26 21:03:39 INFO: 2019-10-26 21:02:30 - Exported 140000 rows from siemenstestdb.employees...
2019-10-26 21:03:39 INFO: 2019-10-26 21:02:39 - Exported 160000 rows from siemenstestdb.employees...
2019-10-26 21:03:39 INFO: 2019-10-26 21:02:49 - Exported 180000 rows from siemenstestdb.employees...
2019-10-26 21:03:39 INFO: 2019-10-26 21:02:58 - Exported 200000 rows from siemenstestdb.employees...

But the final row (in this case 200000) is actually the top of the batch range - meaning that the actual number of rows is somewhere between 180000 and 200000.

I came across this while debugging some target tables that were empty and wanted to see if pipelinewise was actually pulling any rows (and how many).

Looking at the code, I see two options here:

#1 Change:
exported_rows += export_batch_rows
to
exported_rows += len(rows)

I've tested this and it works as expected, for small tables:

2019-10-26 21:48:24 INFO: 2019-10-26 21:48:20 -Exported  9 rows from siemenstestdb.departments...

and for large tables

2019-10-26 21:52:36 INFO: 2019-10-26 21:52:21 - Exported 20000 rows from siemenstestdb.dept_emp_latest_date...
2019-10-26 21:52:36 INFO: 2019-10-26 21:52:22 - Exported 40000 rows from siemenstestdb.dept_emp_latest_date...
2019-10-26 21:52:36 INFO: 2019-10-26 21:52:22 - Exported 60000 rows from siemenstestdb.dept_emp_latest_date...
2019-10-26 21:52:36 INFO: 2019-10-26 21:52:23 - Exported 80000 rows from siemenstestdb.dept_emp_latest_date...
2019-10-26 21:52:36 INFO: 2019-10-26 21:52:24 - Exported 100000 rows from siemenstestdb.dept_emp_latest_date...
2019-10-26 21:52:36 INFO: 2019-10-26 21:52:25 - Exported 120000 rows from siemenstestdb.dept_emp_latest_date...
2019-10-26 21:52:36 INFO: 2019-10-26 21:52:25 - Exported 140000 rows from siemenstestdb.dept_emp_latest_date...
2019-10-26 21:52:36 INFO: 2019-10-26 21:52:26 - Exported 160000 rows from siemenstestdb.dept_emp_latest_date...
2019-10-26 21:52:36 INFO: 2019-10-26 21:52:27 - Exported 180000 rows from siemenstestdb.dept_emp_latest_date...
2019-10-26 21:52:36 INFO: 2019-10-26 21:52:28 - Exported 200000 rows from siemenstestdb.dept_emp_latest_date...
2019-10-26 21:52:36 INFO: 2019-10-26 21:52:28 - Exported 220000 rows from siemenstestdb.dept_emp_latest_date...
2019-10-26 21:52:36 INFO: 2019-10-26 21:52:29 - Exported 240000 rows from siemenstestdb.dept_emp_latest_date...
2019-10-26 21:52:36 INFO: 2019-10-26 21:52:30 - Exported 260000 rows from siemenstestdb.dept_emp_latest_date...
2019-10-26 21:52:36 INFO: 2019-10-26 21:52:31 - Exported 280000 rows from siemenstestdb.dept_emp_latest_date...
2019-10-26 21:52:36 INFO: 2019-10-26 21:52:32 - Exported 300000 rows from siemenstestdb.dept_emp_latest_date...
2019-10-26 21:52:36 INFO: 2019-10-26 21:52:32 - Exported 300024 rows from siemenstestdb.dept_emp_latest_date...

Alternatively, to make it easier to parse the log output, change the wording for "interim" outputs and the "final" output e.g.

For small tables:

2019-10-26 21:58:06 INFO:                         ordinal_position
2019-10-26 21:58:06 INFO:
2019-10-26 21:58:06 INFO: 2019-10-26 21:58:02 - Exported total of 9 rows from siemenstestdb.departments...
2019-10-26 21:58:06 INFO: 2019-10-26 21:58:02 - MYSQL - Running query:

For large tables:

2019-10-26 22:02:50 INFO:
2019-10-26 22:02:50 INFO: 2019-10-26 22:02:33 - Exporting batch from 0 to 20000 rows from siemenstestdb.current_dept_emp...
2019-10-26 22:02:50 INFO: 2019-10-26 22:02:34 - Exporting batch from 20000 to 40000 rows from siemenstestdb.current_dept_emp...
2019-10-26 22:02:50 INFO: 2019-10-26 22:02:34 - Exporting batch from 40000 to 60000 rows from siemenstestdb.current_dept_emp...
2019-10-26 22:02:50 INFO: 2019-10-26 22:02:35 - Exporting batch from 60000 to 80000 rows from siemenstestdb.current_dept_emp...
2019-10-26 22:02:50 INFO: 2019-10-26 22:02:36 - Exporting batch from 80000 to 100000 rows from siemenstestdb.current_dept_emp...
2019-10-26 22:02:50 INFO: 2019-10-26 22:02:37 - Exporting batch from 100000 to 120000 rows from siemenstestdb.current_dept_emp...
2019-10-26 22:02:50 INFO: 2019-10-26 22:02:38 - Exporting batch from 120000 to 140000 rows from siemenstestdb.current_dept_emp...
2019-10-26 22:02:50 INFO: 2019-10-26 22:02:39 - Exporting batch from 140000 to 160000 rows from siemenstestdb.current_dept_emp...
2019-10-26 22:02:50 INFO: 2019-10-26 22:02:39 - Exporting batch from 160000 to 180000 rows from siemenstestdb.current_dept_emp...
2019-10-26 22:02:50 INFO: 2019-10-26 22:02:40 - Exporting batch from 180000 to 200000 rows from siemenstestdb.current_dept_emp...
2019-10-26 22:02:50 INFO: 2019-10-26 22:02:41 - Exporting batch from 200000 to 220000 rows from siemenstestdb.current_dept_emp...
2019-10-26 22:02:50 INFO: 2019-10-26 22:02:42 - Exporting batch from 220000 to 240000 rows from siemenstestdb.current_dept_emp...
2019-10-26 22:02:50 INFO: 2019-10-26 22:02:43 - Exporting batch from 240000 to 260000 rows from siemenstestdb.current_dept_emp...
2019-10-26 22:02:50 INFO: 2019-10-26 22:02:43 - Exporting batch from 260000 to 280000 rows from siemenstestdb.current_dept_emp...
2019-10-26 22:02:50 INFO: 2019-10-26 22:02:44 - Exporting batch from 280000 to 300000 rows from siemenstestdb.current_dept_emp...
2019-10-26 22:02:50 INFO: 2019-10-26 22:02:44 - Exported total of 300024 rows from siemenstestdb.current_dept_emp...
2019-10-26 22:02:50 INFO: 2019-10-26 22:02:44 - MYSQL - Running query:

I can make a pull request for either of these - which do you prefer? My preference is for the latter one, since if a user only cares about the final row counts it's easier to grep for "Exported total".

Singer MySQL -> Redshift unable to handle NOT NULL fields with empty string value

When doing a Singer sync from MySQL to Redshift, the job was failing because the MySQL column had a NOT NULL constraint with an empty string (''), which is obviously not null. Redshift has the same constraint, but empty string is outputted in CSV as an empty field (essentially indistinguishable from NULL) and Redshift fails the load.

Suggest solving this by dropping NOT NULL constraints on the target end or somehow distinguishing NULL from empty strings.

I had trouble working around this, as it seems the Redshift staging table that pipelinewise was loading into still had the NOT NULL constraint even if I manually changed the target table. I finally gave up and replaced the empty string in the source data with something else.

Why "compatible" taps and targets?

Hi,

I'm familiar with Singer, and I'm starting to explore PipelineWise.

Couldn't find any information about this in the documentation, so I thought I might ask: what does a tap or target need to support in order to make it "compatible" with PipelineWise?

Thanks!

MySQL taps white space in either table or column names breaks

Both of these are allowed in ANSI SQL.

To fix this, all MySQL queries need to have the table and column names correctly quoted. Similarly, the writes into the target need to be quoted. This should also fix #228, since the same quoting is required to allow columns whose names are reserved words (like Group).

I plan to issue a pull request for the fix for this.

ERROR: Command failed. Return code: 1

Getting an error when running the command:
pipelinewise run_tap --tap postgres_sample --target mypostgres

Error
2019-10-23 11:36:39 INFO: Running postgres_sample tap in mypostgres target
2019-10-23 11:36:39 INFO: No table available that needs to be sync by fastsync
2019-10-23 11:36:39 INFO: Table(s) selected to sync by singer: ['public-student']
2019-10-23 11:36:39 INFO: Writing output into /root/.pipelinewise/mypostgres/postgres_sample/log/mypostgres-postgres_sample-20191023_113639.singer.log
2019-10-23 11:36:40 ERROR: Command failed. Return code: 1
2019-10-23 11:36:40 INFO:

TAP RUN SUMMARY

Status  : FAILED
Runtime : 0:00:01.108270

pipelinewise status:

Tap ID           Tap Type      Target ID   Target Type      Enabled  Status  Last Sync            Last Sync Result
---------------  ------------  ----------  ---------------  -------  ------  -------------------  ----------------
postgres_sample  tap-postgres  mypostgres  target-postgres  True     ready   2019-10-23T11:36:39  failed
1 pipeline(s)

fast sync MySQL runs into deadlocks

I am trying to sync MySQL to Redshift and finding that fast sync fails due to deadlocks. We are pointing pipelinewise at a MySQL read replica of a fairly active production DB. Unfortunately, the pipelinewise job fails pretty consistently while trying to fast sync one of our larger tables.

Is there some way to disable fast sync and use traditional singer sync only? I couldn't find any way in the documentation, and I'm now crawling through the code looking for a way to comment it out as a test.

target NoneType issue with discover_tap

discover_tap crashes with the specified tap and target. It seems that run_post_import_tap_checks has a NoneType target.

command : discover_tap --tap test_mysql_mysql_mariadb --target my_test_snowflake
My tap and target exist and have the correct id specified in the yaml config.

Traceback (most recent call last):
  File "/app/.virtualenvs/pipelinewise/bin/pipelinewise", line 11, in <module>
    load_entry_point('pipelinewise', 'console_scripts', 'pipelinewise')()
  File "/app/pipelinewise/cli/__init__.py", line 109, in main
    getattr(pipelinewise, args.command)()
  File "/app/pipelinewise/cli/pipelinewise.py", line 630, in discover_tap
    post_import_errors = self._run_post_import_tap_checks(schema_with_diff, target)
  File "/app/pipelinewise/cli/pipelinewise.py", line 1229, in _run_post_import_tap_checks
    primary_key_required = target.get("primary_key_required", False)
AttributeError: 'NoneType' object has no attribute 'get'

Allow for custom Singer catalogs

I guess in the context of Pipelinewise a Singer 'stream' is a 'table'. It would be nice to be able to edit the Pipelinewise 'tables' to not pull all columns. Is this something which could be supported?

Certain invalid characters break MySQL -> Redshift load due to Redshift error

I'm getting this error in stl_load_errors after a failed MyQL to Redshift fast sync.
Missing newline: Unexpected character 0x64 found at location 233

I'm already using these copy options (added ACCEPTINVCHARS), but it didn't help.
copy_options: "EMPTYASNULL BLANKSASNULL TRIMBLANKS TRUNCATECOLUMNS ACCEPTINVCHARS TIMEFORMAT 'auto'"

Looking at the character in question, it looks like a hacker was trying to do some code injection on our site, leading to this invalid character. I've obfuscated some data in my log output here, but position 233 is actually after the end of the raw_line returned by Redshift. I would have to go back to the raw S3 file to see what is actually there.

2737626,1434,2019-07-03,2,38,http://xxxxxxxxx.xxxxxxx.com/xxxxx/xxxx.php?id=1434&group=38&subid=/etc/passwd%00http://xxxxx.xxxxx.com/xxxxx/xxxxxxx.php?id=1434&group=38&subid=%2fetc%2fpasswd%00,"/etc/passwd

I'm not sure of the best path here, but it seems like either having PipelineWise filter out these characters during export from MySQL, or passing different options to Redshift so it can proceed more gracefully.

S3 Target - support profile based authentication

Dear Team,

Is it possible to implement AWS profile or assume-role based authentication for the S3 target?
Right now, it seems that it picks up hard-coded credentials.

aws_access_key_id: "<ACCESS_KEY>" # S3 - Plain string or vault encrypted
aws_secret_access_key: "<SECRET_ACCESS_KEY>" # S3 - Plain string or vault encrypted

What happens to history when re-importing configs ?

I would like some clarification on what happens when making changes to tap and target configs and re-importing them.

When re-importing config, when do any history, bookmarks, etc. get erased? The reason I ask is that when I execute a subsequent tap-run, it seems to spend a lot of time checking out the existing tables.

Redshift generates text fields that are too small

When syncing from MySQL to Redshift, a table with varchar >256 leads to a varchar(256) in Redshift, leading to errors in Redshift like:

Value too long for type character varying(256)

For example, my MySQL table has a varchar(500) but pipelinewise (really the underlying Singer target) only generates a varchar(256).

I was able to work around this by manually changing the size to varchar(500).

Implement REST API

Hi all,

I was wondering whether implementing an API server in Pipelinewise has been considered? It would make hosting and interacting with the software in distributed systems (private cloud) much easier.

My use case would involve putting Pipelinewise in Kubernetes. This would allow me to have a Multi-Server deployment (using a StatefulSet) without the need for NFS as the pods would be able to share a PersistentVolume.

I could then schedule Pipelinewise from Workflow Management tools such as Airflow or Argo Workflows.

This would be an AMAZING feature for many I'm sure!

fast sync MySQL -> Redshift outputs invalid dates

Great project! Hoping to get it working in production.

I'm having some issues with fast sync appearing to export the MySQL data with invalid dates. In this case, the original data is a timestamp - 2018-12-05 but Redshift is seeing 1985-00-05 in the input file.

Separately, I noticed the raw_line data from Redshift includes ~800 blank spaces at the end of the row following the last comma. You can't see it in my paste below, but I see them in a text editor. Not sure if that's expected behavior. I've seen that with other failed imports as well.

My investigation below:

Pipelinewise error:
Exceptions during table sync : ["mybiz.coregdata: Load into table 'coregdata_temp' failed. Check 'stl_load_errors' system table for details.\n", "mybiz.membersPregnancy: Load into table 'memberspregnancy_temp' failed. Check 'stl_load_errors' system table for details.\n", "mybiz.member_child: Load into table 'member_child_temp' failed. Check 'stl_load_errors' system table for details.\n", "mybiz.photos_flagged: Load into table 'photos_flagged_temp' failed. Check 'stl_load_errors' system table for details.\n"]

In Redshift stl_load_errors, I get:
colname: consumerbirthdate
type: timestamp
col_length:0
position: 112
raw_line:
172399,xxxxxxx,563752,,xxxxxx,xxxx'xxxxx,[email protected],000 xxxxxx xx.,,xxxxxxxxxx,xxx,--,,; ,2007-06-05,M,1985-00-05,2007-12-13 11:06:38,76.68.96.64,,,,xxxxxxxxxx,0.60,,,2020-05-11 05:50:25,2020-05-11 05:50:25,
raw_field_value: 1985-00-05
err_reason: Invalid data

In MySQL:
Query: select consumerbirthdate from coregdata where id = 172399
Result: 1984-12-05

Query: describe coregdata
Result:
Field: consumerbirthdate
Type: date

Separately, I have another error for the photos_flagged load, but I think it's just related to this error happening and failing the whole job? Not really sure - I have seen pairs of errors when one thing fails seeming to cause something unrelated to fail.
stl_load_errors
raw_line: 56121,8088123,0,".. - not shown here: 1005 trailing spaces
err_reason: Missing newline: Unexpected character 0xffffffbf found at location 18

Misleading tap-run log message : INFO Schema '<target-schema>' does not exist. Creating...

This is minor, but a bit annoying.

I get a misleading log message for a tap-run targeting Snowflake :

INFO Schema 'DEV_APPLYOL' does not exist. Creating... CREATE SCHEMA IF NOT EXISTS DEV_APPLYOL.

This always shows in each run-tap log.

The schema DOES exist and although the CREATE SCHEMA IF NOT EXISTS is benign, I would prefer if the assertion in the log was accurate.

Support for VIEW (instead of TABLE) as a tap source table for INCREMENTAL Replication Method

I have a source database table that does not have a primary key, so it cannot be used with the INCREMENTAL replication method.

However, I can create a VIEW in my database and JOIN to another table that does have a Primary key, making the VIEW suitable for INCREMENTAL Replication Method.

However when I try to use the view I get this error

INFO Selected streams: ['MYSCHEMA-TBLINSTITUTION', 'MYSCHEMA-VIEWINSTITUTION'] 
INFO Use of log_miner requires fetching current scn...
INFO dsn: (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=X.X.X.X)(PORT=1521))(CONNECT_DATA=(SID=XXXX)))
INFO Getting catalog objects from information_schema cache table...
INFO End SCN: 924906321 
INFO No currently_syncing found
INFO Beginning sync of stream(MYSCHEMA-VIEWINSTITUTION) with sync method(incremental)
INFO Stream MYSCHEMA-VIEWINSTITUTION is using incremental replication with replication key CHANGEDDATE
INFO dsn: (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=X.X.X.X)(PORT=1521))(CONNECT_DATA=(SID=XXXXX)))
INFO select SELECT  "ABBREVIATION" , "ADDRESS1" , "ADDRESS2" , "ADDRESS3" , "ADDRSTATE" ,to_char( "CHANGEDDATE" ), "CHANGEDUID" , "COUNTRYCODE" , "COURSESURL" ,to_char( "CREATEDDATE" ), "CREATEDUID"..." 
                                FROM MYSCHEMA.VIEWINSTITUTION
                               ORDER BY CHANGEDDATE ASC
                               
Traceback (most recent call last):
  File "/app/.virtualenvs/target-snowflake/bin/target-snowflake", line 11, in <module>
    load_entry_point('pipelinewise-target-snowflake==1.0.8', 'console_scripts', 'target-snowflake')()
  File "/app/.virtualenvs/target-snowflake/lib/python3.7/site-packages/target_snowflake/__init__.py", line 277, in main
    state = persist_lines(config, input, information_schema_cache)
  File "/app/.virtualenvs/target-snowflake/lib/python3.7/site-packages/target_snowflake/__init__.py", line 185, in persist_lines
    if config.get('primary_key_required', True) and len(o['key_properties']) == 0:
TypeError: object of type 'NoneType' has no len()
INFO METRIC: {"type": "counter", "metric": "record_count", "value": 19, "tags": {}}
CRITICAL [Errno 32] Broken pipe
Traceback (most recent call last):
  File "/app/.virtualenvs/tap-oracle/bin/tap-oracle", line 11, in <module>
    load_entry_point('pipelinewise-tap-oracle==1.0.0', 'console_scripts', 'tap-oracle')()
  File "/app/.virtualenvs/tap-oracle/lib/python3.7/site-packages/tap_oracle/__init__.py", line 565, in main
    raise exc
  File "/app/.virtualenvs/tap-oracle/lib/python3.7/site-packages/tap_oracle/__init__.py", line 562, in main
    main_impl()
  File "/app/.virtualenvs/tap-oracle/lib/python3.7/site-packages/tap_oracle/__init__.py", line 556, in main_impl
    do_sync(conn_config, args.catalog, args.config.get('default_replication_method'), state)
  File "/app/.virtualenvs/tap-oracle/lib/python3.7/site-packages/tap_oracle/__init__.py", line 533, in do_sync
    state = sync_traditional_stream(conn_config, stream, state, sync_method_lookup[stream.tap_stream_id], end_scn)
  File "/app/.virtualenvs/tap-oracle/lib/python3.7/site-packages/tap_oracle/__init__.py", line 488, in sync_traditional_stream
    state = do_sync_incremental(conn_config, stream, state, desired_columns)
  File "/app/.virtualenvs/tap-oracle/lib/python3.7/site-packages/tap_oracle/__init__.py", line 374, in do_sync_incremental
    state = incremental.sync_table(conn_config, stream, state, desired_columns)
  File "/app/.virtualenvs/tap-oracle/lib/python3.7/site-packages/tap_oracle/sync_strategies/incremental.py", line 91, in sync_table
    singer.write_message(record_message)
  File "/app/.virtualenvs/tap-oracle/lib/python3.7/site-packages/singer/messages.py", line 218, in write_message
    sys.stdout.flush()
BrokenPipeError: [Errno 32] Broken pipe
Exception ignored in: <_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>
BrokenPipeError: [Errno 32] Broken pipe

Failing to install from source on MacOS

I am having trouble installing the tool from source following the guidelines.

Expected Behavior

The tool should complete the steps and successfully install

Actual Behavior

Executing ./install.sh --acceptlicenses results in:

...
Collecting PyMySQL==0.7.11 (from pipelinewise==0.10.1)
  Using cached https://files.pythonhosted.org/packages/c6/42/c54c280d8418039bd2f61284f99cb6d9e0eae80383fc72ceb6eac67855fe/PyMySQL-0.7.11-py2.py3-none-any.whl
Collecting psycopg2==2.8.2 (from pipelinewise==0.10.1)
  Using cached https://files.pythonhosted.org/packages/23/7e/93c325482c328619870b6cd09370f6dbe1148283daca65115cd63642e60f/psycopg2-2.8.2.tar.gz
    ERROR: Command errored out with exit status 1:
     command: /Users/rbassetto/babbel/repos/pipelinewise/.virtualenvs/pipelinewise/bin/python3 -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/private/var/folders/sh/gf_0zlrs6pl42pd9jg3yj8n80000gn/T/pip-install-vt2pyf42/psycopg2/setup.py'"'"'; __file__='"'"'/private/var/folders/sh/gf_0zlrs6pl42pd9jg3yj8n80000gn/T/pip-install-vt2pyf42/psycopg2/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base pip-egg-info
         cwd: /private/var/folders/sh/gf_0zlrs6pl42pd9jg3yj8n80000gn/T/pip-install-vt2pyf42/psycopg2/
    Complete output (23 lines):
    running egg_info
    creating pip-egg-info/psycopg2.egg-info
    writing pip-egg-info/psycopg2.egg-info/PKG-INFO
    writing dependency_links to pip-egg-info/psycopg2.egg-info/dependency_links.txt
    writing top-level names to pip-egg-info/psycopg2.egg-info/top_level.txt
    writing manifest file 'pip-egg-info/psycopg2.egg-info/SOURCES.txt'

    Error: pg_config executable not found.

    pg_config is required to build psycopg2 from source.  Please add the directory
    containing pg_config to the $PATH or specify the full executable path with the
    option:

        python setup.py build_ext --pg-config /path/to/pg_config build ...

    or with the pg_config option in 'setup.cfg'.

    If you prefer to avoid building psycopg2 from source, please install the PyPI
    'psycopg2-binary' package instead.

    For further information please check the 'doc/src/install.rst' file (also at
    <http://initd.org/psycopg/docs/install.html>).

    ----------------------------------------
ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.

Steps to Reproduce the Problem

  1. clone the repository
  2. cd into the just cloned directory
  3. execute ./install.sh --acceptlicenses

Specifications

  • OS: MacOS Mojave Version 10.14.6

Let me know if you need any other context information.

mysql to snowflake fastsync doesn't work with IAM role

The mysql_to_snowflake connector doesn't work with IAM roles. It fails with the error below.

logger_name=pipelinewise.cli.pipelinewise log_level=INFO process_name=MainProcess file_name=pipelinewise.py line_no=(868) message=Exception: Config is missing required keys: ['aws_access_key_id', 'aws_secret_access_key']

Ungraceful Ctrl-C termination of tap-run keeps in running state.

Ungraceful termination: doing Ctrl-C on run_tap does terminate it, but when I attempt to run the tap again it fails, saying it is in a running state. I tracked the cause down to the last log file keeping its ".running" suffix (which I assume is used as a kind of lock file to prevent concurrent execution). I suggest graceful clean-up of the log file to ".cancelled".

Pre-flight checks for tap-run performed during config import

Import should pre-flight check for config that will fail when run, e.g. replication methods that require a primary key, or tables set to use the log-based replication method that do not have logging enabled in the source DB. I spent a lot of time working out that the cause of my failure was the need for a primary key in my source Oracle database.

Deleting rows with FULL_TABLE replication

I assumed that when using replication_method: "FULL_TABLE" (from MySQL > Postgres), any rows deleted in the source table would then be deleted in the target table. However, it seems this is not the case. Is this the expected behaviour or a bug?

I have the column _sdc_deleted_at but it's always null.

Looking at this old report there is mention of hard_delete/soft_delete, but these relate to log-based replication.

Custom value for batch_size_rows is ignored

Describe the bug

Modifying the value of batch_size_rows does not actually modify the size of each batch on export.

Steps To Reproduce

  1. Modify the batch_size_rows
  2. Run tap

Expected behavior

Modifying the value of batch_size_rows should modify the size of each batch on export

Screenshots and log output

batch_size_rows: 20000 # Batch size for the stream to optimise load performance

self.connection_config['export_batch_rows'] = connection_config.get('export_batch_rows', 20000)

pipelinewise init creates directory with root user permission

Hi,

I'm using pipelinewise on a remote server. My code is running under a non-root user.
My issue is that when pipelinewise init creates a new directory, it does so with root access. Therefore my code can't edit or create YAML config files in this directory.

Any solutions?

Thank you,

Dusty

Inheritable tap config

Another feature suggestion, although it possibly doesn't need to be part of pipelinewise itself: in a situation where there are, say, 50 of the same MySQL DBs, the majority of the config (the list of tables in particular) could be inherited.

yq v3 has the ability to merge the YAML files without mangling the rest of the file (such as the encrypted_password), e.g. like the below.

yq m -x ${PARTIAL_TAP_FILE} ${INHERITED_CONFIG} > /tmp/$FULL_TAP_FILE;

Dockerfile build broken

Some dependencies got messed up very recently (build worked fine a few days ago):

--------------------------------------------------------------------------
Installing target-snowflake connector...
--------------------------------------------------------------------------
....
pipelinewise-singer-python 1.1.1 has requirement pytz==2019.3, but you have pytz 2020.1.

Removing the snowflake connector solved the issue:
In setup.py under install_requires:

#'snowflake-connector-python==2.0.3',

And install.sh under DEFAULT_CONNECTORS:

#    target-snowflake

Support configuring a subset of table columns in tap schema

I have a need to only include a subset of columns in a database (Oracle) table.

For me, not including some columns is the best PII solution (i.e. don't load the PII data).

I want to be able to select only a subset of table columns (ignore others) in the tap>schema>table config.

In the meantime I am using TRANSFORMATIONS to SET-NULL the column data, but I would prefer not to include them in the first place.

wildcard for table selection

I'm trying to use PipelineWise to transfer all tables in a specific schema in Postgres to Snowflake. I've got about 300 tables and I'm not sure how to set that up in the yml config besides listing each one individually.

It would be nice to somehow "select all" tables under a schema. - table_name: "*" sounds convenient, but I'm sure it would get complicated quickly with needing exclude-lists and/or listing a few tables above the wildcard that need FULL TABLE replication. This would also let me automatically include new tables if they're added to the schema.
