
pyingest's Introduction

pyingest

A Python 3 script for loading CSV and JSON files into a Neo4j database. It performs well due to several factors:

  • Records are grouped into configurable-sized chunks before ingest
  • For CSV files, we leverage the optimized CSV parsing capabilities of the Pandas library
  • For JSON files, we use a streaming JSON parser (ijson) to avoid reading the entire document into memory
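The chunking factor listed above can be sketched with the standard library alone. `chunked` is a hypothetical helper written for illustration and is not taken from ingest.py; it only shows the general idea of grouping a stream of records into fixed-size batches without loading the whole stream first.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(records: Iterable[T], chunk_size: int = 1000) -> Iterator[List[T]]:
    """Yield successive lists of at most chunk_size records."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    iterator = iter(records)
    while True:
        # islice pulls only the next chunk_size items from the stream.
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


# Seven records in batches of three: the last batch is smaller.
batches = list(chunked(range(7), chunk_size=3))
```

Each batch could then be sent to the database as a single parameterized statement, which is presumably where most of the speed-up comes from.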

Installation

  • You will need to have Python 3 and a compatible version of pip installed.
  • Then run pip3 install -r requirements.txt to obtain dependencies
  • If you do not have a yaml module installed, you may need to run pip3 install pyyaml

Usage

python3 ingest.py <config>

The config is a YAML file, described below.

How it works

  • The configuration file is read into memory
  • Any pre_ingest cypher statements are run
  • File-based ingests (in the files stanza of config) are run
  • Any post_ingest cypher statements are run
  • NB - All values are read from the source file as strings. If you need a different type in the database, you should do the appropriate type conversion in the cypher ingest statement.
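The order of operations above can be sketched as follows. This is not the code in ingest.py: `run_ingest`, `run_cypher`, and `ingest_file` are hypothetical names, and the callbacks here only record what they were asked to do. The example `cql` also shows one way to convert a string value in the Cypher statement itself, using Cypher's `toInteger` function; the labels and property names are made up.

```python
from typing import Callable, Dict, List, Tuple


def run_ingest(config: Dict,
               run_cypher: Callable[[str], None],
               ingest_file: Callable[[Dict], None]) -> None:
    """Run pre_ingest statements, then file ingests, then post_ingest statements."""
    for statement in config.get("pre_ingest") or []:
        run_cypher(statement)
    for file_config in config.get("files") or []:
        ingest_file(file_config)
    for statement in config.get("post_ingest") or []:
        run_cypher(statement)


# Record the calls instead of talking to a database.
calls: List[Tuple[str, str]] = []

example_config = {
    "pre_ingest": ["MATCH (n:Person) DETACH DELETE n"],
    "files": [{
        "url": "people.csv",
        # Values arrive as strings, so the age is converted in Cypher.
        "cql": ("UNWIND $dict.rows AS row "
                "MERGE (p:Person {id: row.id}) "
                "SET p.age = toInteger(row.age)"),
    }],
    "post_ingest": ["MATCH (n) RETURN count(n)"],
}

run_ingest(
    example_config,
    run_cypher=lambda statement: calls.append(("cypher", statement)),
    ingest_file=lambda file_config: calls.append(("file", file_config["url"])),
)
```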

Configuration

The following parameters may be configured:

  • server_uri: Address of Neo4j driver (required)
  • admin_user: Username of Neo4j user (required)
  • admin_pass: Password for Neo4j user (required)
  • pre_ingest: List of cypher statements to be run before the file ingests
  • post_ingest: List of cypher statements to be run after the file ingests
  • files: Describes ingestion of a file - one stanza for each file. If a file needs to be processed more than once, just use it again in a separate stanza. Parameters for files are discussed below.

File ingest parameters

  • url: Path to the file (required)
  • cql: Cypher statement to be run (required)
  • chunk_size: Number of records to batch together for ingest (Default: 1000)
  • type: Type of file being ingested. (One of csv|json). This parameter is not required. If missing, the script will guess based on the file extension, defaulting to CSV.
  • field_separator: (CSV only) Character separating values in a line of CSV. (Default: ,)
  • compression: Type of compression used on file (One of gzip|zip|none) (Default: none)
  • skip_file: If true, do not process this file. (One of true|false) (Default: false)
  • skip_records: Skip specified number of records while ingesting the file. (Default: 0)
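As a rough illustration of how the defaults listed above might be applied to one file stanza, here is a minimal sketch. `resolve_file_params` and `guess_type` are hypothetical helpers, not functions from ingest.py, and stripping a `.gz` or `.zip` suffix before checking the extension is an assumption made for this example.

```python
from typing import Any, Dict

# Defaults as documented in the parameter list.
DEFAULTS: Dict[str, Any] = {
    "chunk_size": 1000,
    "field_separator": ",",
    "compression": "none",
    "skip_file": False,
    "skip_records": 0,
}


def guess_type(url: str) -> str:
    """Guess the file type from the extension, defaulting to CSV."""
    name = url.lower()
    # Assumption: ignore a compression suffix before checking the extension.
    for suffix in (".gz", ".zip"):
        if name.endswith(suffix):
            name = name[: -len(suffix)]
    return "json" if name.endswith(".json") else "csv"


def resolve_file_params(stanza: Dict[str, Any]) -> Dict[str, Any]:
    """Check required keys and fill in documented defaults."""
    for required in ("url", "cql"):
        if required not in stanza:
            raise ValueError(f"file stanza is missing required key: {required}")
    resolved = {**DEFAULTS, **stanza}
    resolved.setdefault("type", guess_type(resolved["url"]))
    return resolved


params = resolve_file_params({"url": "data/people.json.gz",
                              "cql": "RETURN 1",
                              "compression": "gzip"})
```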

Additional info

The pyingest script is backed by an Integration Test suite written in Java that leverages the Neo4j test harness. Please see the javadoc on the IngestPyIT.java file for details about how this script is tested.

pyingest's People

Contributors

dependabot[bot], mholford-neo

pyingest's Issues

'Resilient' mode - move on from transient errors

Might be nice if pyi could log errors and continue. This would be for errors like:

  • Missing field leads to null key merge
  • Constraint violations
  • Date formatting errors
  • etc.

Thinking there would be a command-line flag (off by default). If enabled, it would log the exception and write the offending chunk of records to a file so they can be inspected and/or rerun.
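One possible shape for this, as a sketch only: `ingest_chunk_resilient` is a hypothetical function, the JSON Lines reject file is an assumed format, and the failing `ingest` callback stands in for whatever actually sends a chunk to Neo4j.

```python
import json
import logging
import os
import tempfile
from typing import Callable, Dict, List

logger = logging.getLogger("pyingest.resilient")


def ingest_chunk_resilient(chunk: List[Dict],
                           ingest: Callable[[List[Dict]], None],
                           reject_path: str) -> bool:
    """Try to ingest a chunk; on failure, log it and save the records."""
    try:
        ingest(chunk)
        return True
    except Exception as exc:
        logger.error("Chunk of %d records failed: %s", len(chunk), exc)
        # Append one JSON document per line so the chunk can be rerun later.
        with open(reject_path, "a", encoding="utf-8") as rejects:
            for record in chunk:
                rejects.write(json.dumps(record) + "\n")
        return False


def failing_ingest(chunk: List[Dict]) -> None:
    # Stand-in for a database call that hits a null merge key.
    raise ValueError("cannot merge node using null property value")


with tempfile.TemporaryDirectory() as tmp:
    reject_file = os.path.join(tmp, "rejected.jsonl")
    ok = ingest_chunk_resilient([{"id": "1"}, {"id": None}],
                                failing_ingest, reject_file)
    with open(reject_file, encoding="utf-8") as handle:
        rejected_lines = handle.read().splitlines()
```

Catching every `Exception` is deliberately broad here; a real implementation would probably want to limit this to the driver's error types.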

Config File and Order of Import

Is it possible to specify the order of the (CSV) files for import? I have them in a specific order within the files section, but they were not imported in the same order. Simply, of the files I specified, the ingestion process started with the 3rd file specified, went to the 4th, etc.

Perhaps there is a config option that I am not leveraging, as I would have expected to be able to set the order of operations just as I could in a script using py2neo. Overall I really like this approach, but I need a way to control the order of operations.

Support abbreviations in config

The most useful application would be to let the user avoid specifying the full path string in each file stanza.
Abbreviation keys would be defined dynamically in the main part of the config and then applied to the file stanzas.
Abbreviation keys are of the form "abbrev.*", where everything in * is the key.
In the file stanzas, the key is enclosed in #{}. Any keys found are replaced with their values before the file stanza is executed.

Example config.yml:

abbrev.path: /home/matt/many/arbitrarily/long/sub/directories/that/we/want/to/avoid/listing/over/and/over/again
server_uri: localhost:7687
admin_user: ....
......
files:
  - url: "#{path}/file_to_ingest.csv"
    cql: ......
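The substitution proposed here could look roughly like the sketch below. `collect_abbreviations` and `expand` are hypothetical names, the config is shown as an already-parsed dictionary, and leaving unknown keys untouched is an assumption rather than part of the proposal.

```python
import re
from typing import Any, Dict

ABBREV_PREFIX = "abbrev."
PLACEHOLDER = re.compile(r"#\{([^}]+)\}")


def collect_abbreviations(config: Dict[str, Any]) -> Dict[str, str]:
    """Gather every top-level 'abbrev.<key>' entry as key -> value."""
    return {
        key[len(ABBREV_PREFIX):]: str(value)
        for key, value in config.items()
        if isinstance(key, str) and key.startswith(ABBREV_PREFIX)
    }


def expand(text: str, abbreviations: Dict[str, str]) -> str:
    """Replace each #{key} with its value; unknown keys are left as-is."""
    return PLACEHOLDER.sub(
        lambda match: abbreviations.get(match.group(1), match.group(0)), text
    )


config = {
    "abbrev.path": "/data/in",  # made-up short path for the example
    "files": [{"url": "#{path}/file_to_ingest.csv"}],
}
abbreviations = collect_abbreviations(config)
expanded_url = expand(config["files"][0]["url"], abbreviations)
```

In a YAML file, a value beginning with `#{` needs quoting, since an unquoted `#` after a space starts a comment.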

Allow user to specify basepath

User can specify basepath to avoid listing out long file paths in each file stanza.
The basepath can be referenced by $BASE in the url part of the file stanza.

Example config.yml:

basepath: /home/matt/very/long/and/arbitrary/span/of/subdirectories
server_uri: ...
.....
files:
  - url: $BASE/things.csv
    cql: ......
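A minimal sketch of the `$BASE` substitution, using `string.Template` from the standard library. `resolve_url` is a hypothetical helper and the short basepath is made up; other `$` placeholders are left untouched because `safe_substitute` does not raise on unknown names.

```python
from string import Template
from typing import Optional


def resolve_url(url: str, basepath: Optional[str] = None) -> str:
    """Replace $BASE in a file url with the configured basepath, if any."""
    if basepath is None:
        return url
    return Template(url).safe_substitute(BASE=basepath)


resolved = resolve_url("$BASE/things.csv", "/home/matt/data")
unchanged = resolve_url("other/things.csv", "/home/matt/data")
```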

Error: query argument missing

I am getting the following error:

Traceback (most recent call last):
  File "pyingest/src/main/ingest.py", line 205, in <module>
    main()
  File "pyingest/src/main/ingest.py", line 196, in main
    server.pre_ingest()
  File "pyingest/src/main/ingest.py", line 150, in pre_ingest
    session.run(statement=statement)
TypeError: run() missing 1 required positional argument: 'query'

using this config.yml file (with the uri and password removed)

server_uri: 
admin_user: neo4j
admin_pass: 

files:
  - url: https://docs.google.com/spreadsheets/d/e/2PACX-1vT9-l5I0BhrBnx0zvG6HMUB_pmU-FudHU_Do-cmYv7TUGbAQOpImf8Xu1gxeTMgPEsHL5beW1691R1K/pub?gid=0&single=true&output=csv
    cql: |
      WITH $dict.rows as rows UNWIND rows as row
          MERGE (u:User {id:row.user})
          MERGE (s:Session {id:row.session})
          MERGE (u)-[:HAD]->(s)
          WITH u, row, s
          CALL apoc.create.node([row.type], {seq: row.seq, value: row.value}) YIELD node 
          CREATE (s)-[:ACTION]->(node)

pre_ingest:
  - "CREATE CONSTRAINT on (n:User) assert n.id is unique"
  - "CREATE CONSTRAINT on (n:Session) assert n.id is unique"

post_ingest:
  - "match (s)-[:ACTION]->(x)
      WITH s, x
      ORDER BY x.seq
      WITH s, collect(x) as actions
      CALL apoc.nodes.link(actions, 'NEXT')
      RETURN count(*)"


Perhaps it is something on my end, but my cypher commands work stand-alone prior to building the config file.
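The error message is consistent with a driver version whose `Session.run` names its first parameter `query` rather than `statement`, so the keyword call in ingest.py no longer matches. The sketch below reproduces that with a stand-in class instead of a real driver session; `StubSession` is hypothetical and only mirrors the parameter name. Passing the statement positionally works regardless of what the parameter is called.

```python
class StubSession:
    """Stand-in whose run() names its first parameter 'query'."""

    def __init__(self):
        self.queries = []

    def run(self, query, parameters=None, **kwargs):
        self.queries.append(query)


session = StubSession()
statement = "CREATE CONSTRAINT on (n:User) assert n.id is unique"

# The keyword call fails because 'statement' lands in **kwargs
# and the required 'query' parameter is never supplied.
try:
    session.run(statement=statement)
    keyword_call_failed = False
    error_message = ""
except TypeError as exc:
    keyword_call_failed = True
    error_message = str(exc)

# The positional call does not depend on the parameter name.
session.run(statement)
```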

Feature Request: pypi package

A simple request to have this be a package available on pypi. Please do feel free to close this issue if this is not the appropriate forum or something you would prefer not to do at this time.

Potential Typo README

I love the foundation of this package, but to get up and running (on Google Colab), I needed to use

pip install -r requirements.txt

Simply, it appears that install is missing from the second bullet under installation.

Config Option to set the DB name

I run Enterprise Edition. A small feature would be the ability to set the name of the database in the config file, with a default of 'neo4j'.

Runs fast, awesome tool :)

Support for multi-database

User may enter a database name under the file stanza.
If present, this can be used by the Driver when creating a Session.
If no database is specified, it should default to whatever the DefaultDatabase is set to be on the Neo4j server.
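One way this could be wired up, as a sketch: `session_kwargs` is a hypothetical helper and `database` is an assumed name for the stanza key. The Neo4j Python driver (4.0 and later) accepts a `database` keyword when opening a session; omitting it leaves the choice to the server's default database.

```python
from typing import Any, Dict


def session_kwargs(file_config: Dict[str, Any]) -> Dict[str, str]:
    """Build keyword arguments for driver.session() from a file stanza."""
    database = file_config.get("database")
    # No key (or an empty value) means: let the server pick its default.
    return {"database": database} if database else {}


with_db = session_kwargs({"url": "a.csv", "cql": "RETURN 1", "database": "patients"})
without_db = session_kwargs({"url": "a.csv", "cql": "RETURN 1"})

# With a real driver this would be used as:
#     with driver.session(**session_kwargs(stanza)) as session:
#         session.run(stanza["cql"])
```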

Invalid input 'ON': expected "FOR" or "IF"

Hi, I'm using pyingest to import Synthea CSV files into a Neo4j database.
I'm using this project:
https://github.com/Neo4jSolutions/patient-journey-model/tree/master/ingest
Trying to figure out why it is giving me an error when creating indexes.
Requirements:
boto3==1.16.26
botocore==1.19.26
ijson==3.1.3
jmespath==0.10.0
neo4j==5.2.0
numpy==1.19.4
pandas==1.1.4
python-dateutil==2.8.1
pytz==2020.4
PyYAML==5.4
s3transfer==0.3.3
six==1.15.0
smart-open==4.0.1
urllib3==1.26.5

Command
python src/main/ingest.py '\patient-journey-model\ingest\config.yml'

Error

Traceback (most recent call last):
  File "\Desktop\DS\Graph\Patient Graph\pyingest\src\main\ingest.py", line 214, in <module>
    main()
  File "\Desktop\DS\Graph\Patient Graph\pyingest\src\main\ingest.py", line 205, in main
    server.pre_ingest()
  File "\Desktop\DS\Graph\Patient Graph\pyingest\src\main\ingest.py", line 157, in pre_ingest
    session.run(statement)
  File "\Anaconda3\envs\PatientGraph\lib\site-packages\neo4j_sync\work\session.py", line 291, in run
    self._auto_result._run(
  File "\Anaconda3\envs\PatientGraph\lib\site-packages\neo4j_sync\work\result.py", line 159, in _run
    self._attach()
  File "\Anaconda3\envs\PatientGraph\lib\site-packages\neo4j_sync\work\result.py", line 267, in _attach
    self._connection.fetch_message()
  File "\Anaconda3\envs\PatientGraph\lib\site-packages\neo4j_sync\io_common.py", line 180, in inner
    func(*args, **kwargs)
  File "\Anaconda3\envs\PatientGraph\lib\site-packages\neo4j_sync\io_bolt.py", line 655, in fetch_message
    res = self._process_message(tag, fields)
  File "\Anaconda3\envs\PatientGraph\lib\site-packages\neo4j_sync\io_bolt5.py", line 316, in _process_message
    response.on_failure(summary_metadata or {})
  File "\Anaconda3\envs\PatientGraph\lib\site-packages\neo4j_sync\io_common.py", line 247, in on_failure
    raise Neo4jError.hydrate(**metadata)
neo4j.exceptions.CypherSyntaxError: {code: Neo.ClientError.Statement.SyntaxError} {message: Invalid input 'ON': expected "FOR" or "IF" (line 1, column 31 (offset: 30))
"CREATE INDEX EncounterIDIndex ON :Encounter(id)"
                               ^}
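The message suggests the server expects the newer index syntax, `CREATE INDEX name FOR (n:Label) ON (n.property)`, rather than the older `ON :Label(property)` form used in the config. As a sketch, a statement of exactly this shape could be rewritten before it is sent; `modernize_index` is a hypothetical helper that handles only single-property, named indexes and returns anything else unchanged.

```python
import re

# Matches: CREATE INDEX <name> ON :<Label>(<property>)
LEGACY_INDEX = re.compile(
    r"^\s*CREATE\s+INDEX\s+(?P<name>\w+)\s+ON\s+:(?P<label>\w+)"
    r"\s*\(\s*(?P<prop>\w+)\s*\)\s*$",
    re.IGNORECASE,
)


def modernize_index(statement: str) -> str:
    """Rewrite a legacy named single-property index statement."""
    match = LEGACY_INDEX.match(statement)
    if match is None:
        return statement
    return "CREATE INDEX {name} FOR (n:{label}) ON (n.{prop})".format(
        **match.groupdict()
    )


rewritten = modernize_index("CREATE INDEX EncounterIDIndex ON :Encounter(id)")
```

Editing the statements in config.yml directly is the simpler fix; the helper only illustrates the mapping between the two forms.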

Alternative way to select which files to run

It might be nice to be able to select which files to run via the command line. Here's a possible way:

  • The file stanzas are internally assigned a number sequentially starting from 1
  • User can also assign an "alias" to the file stanza via config
  • Add (-f) flag for files to run. The argument will be comma-separated list of aliases or numbers. If it's just numbers, ranges can be used. For example: python ingest.py -f people,places,things config.yml or python ingest.py -f 1-3,5,7 config.yml
  • Add (-F) flag for files not to run. Would work same way and run all files except those specified.
  • It would be illegal to have both -f and -F in the invocation
  • The skip_file parameter would continue to work as is. But it would be overridden by whatever is specified on the command line
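The proposed flags could be parsed along these lines. This is a sketch of the proposal, not existing behavior: `parse_selection` and `build_parser` are hypothetical, aliases are assumed to be given as a list in stanza order, and an unknown alias simply raises `ValueError` when it fails to parse as a number.

```python
import argparse
from typing import List, Optional, Set


def parse_selection(spec: str, aliases: List[Optional[str]]) -> Set[int]:
    """Expand 'people,things' or '1-3,5,7' into 1-based stanza numbers."""
    selected: Set[int] = set()
    for token in spec.split(","):
        token = token.strip()
        if not token:
            continue
        if token in aliases:
            # aliases[i] belongs to stanza number i + 1
            selected.add(aliases.index(token) + 1)
        elif "-" in token:
            start, end = (int(part) for part in token.split("-", 1))
            selected.update(range(start, end + 1))
        else:
            selected.add(int(token))
    return selected


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="ingest.py")
    # argparse rejects an invocation that uses both -f and -F.
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-f", dest="include", help="files to run")
    group.add_argument("-F", dest="exclude", help="files not to run")
    parser.add_argument("config")
    return parser


args = build_parser().parse_args(["-f", "1-3,5,7", "config.yml"])
by_number = parse_selection(args.include, aliases=[])
by_alias = parse_selection("people,things", aliases=["people", "places", "things"])
```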
