
Introduction

irodsqueue

Instructions below are for testing the development code. Subject to change.

Requirements

  • *nix
  • Python 3.4+
  • Redis 4.0+. To download and install Redis, see https://redis.io
  • virtualenv

Installation

It is recommended to install this package in a virtual environment, e.g.:

Make a virtual environment:

virtualenv -p python3 testenv3

Activate the virtual environment:

source testenv3/bin/activate

Install the latest PRC code from github:

pip install git+https://github.com/irods/python-irodsclient

Install this repository's current development branch:

pip install git+https://github.com/irods-contrib/irods_tools_ingest@dev

Usage

Start redis:

cd REDIS_DIR/src
./redis-server

From the virtual environment, call the main app, e.g.:

irodsqueue --help
irodsqueue ingest --help

Enqueue ingest jobs; for example, to ingest a directory into iRODS:

irodsqueue ingest -f --timer /PATH/TO/LOCAL/DIR

Once we have jobs in the queue, we can launch workers to process them. Each worker is its own process, and we use a custom worker class that opens and maintains iRODS sessions. Open a separate terminal and activate the same virtual environment as above.
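The "one long-lived session per worker" idea can be sketched generically. This is an illustration of the pattern only, not the actual IrodsWorker class (which subclasses rq's Worker and holds a python-irodsclient iRODSSession); a dummy connection stands in so the sketch runs anywhere:

```python
class DummySession:
    """Stands in for an iRODS session; counts how often one is opened."""
    opens = 0

    def __init__(self):
        DummySession.opens += 1

    def put(self, path):
        return f"uploaded {path}"


class SessionWorker:
    """Opens one session at startup and reuses it for every job,
    instead of reconnecting per file (the expensive naive approach)."""

    def __init__(self):
        self.session = DummySession()  # opened once, at worker birth

    def perform_job(self, path):
        return self.session.put(path)  # same session for each job


worker = SessionWorker()
results = [worker.perform_job(p) for p in ("a.dat", "b.dat", "c.dat")]
print(DummySession.opens)  # 1 session despite three jobs
```

This is why a custom worker class pays off: connection setup (authentication, handshake) happens once per worker process rather than once per file.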

Test your iRODS connection:

ils

Launch 16 worker processes (e.g. in bash):

for i in {1..16}; do sleep .1; rq worker -v --burst -w irodsqueue.irodsworker.IrodsWorker & done

Options

You can specify a metadata extraction function to invoke as part of the ingest process by using the --extract-metadata option when enqueuing jobs. The value should be the path to a Python file containing a function named extract_metadata. See the examples.
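A minimal extraction file might look like the following. This is a sketch: the exact signature and expected return shape are assumptions (consult the repository's examples for the real contract); here the function returns (attribute, value, unit) triples derived from file stats:

```python
import os
import tempfile

def extract_metadata(path):
    """Hypothetical extractor: return (attribute, value, unit) triples
    describing the local file at `path`. Signature is an assumption."""
    st = os.stat(path)
    return [
        ("filesize", str(st.st_size), "bytes"),
        ("extension", os.path.splitext(path)[1].lstrip("."), ""),
    ]

# Demonstration on a throwaway 5-byte file:
with tempfile.NamedTemporaryFile(suffix=".csv", delete=False) as f:
    f.write(b"hello")
    sample = f.name
triples = extract_metadata(sample)
os.unlink(sample)
```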

Enqueue jobs with metadata extraction:

irodsqueue ingest -f --extract-metadata ~/tests/metadata/test.py --timer /PATH/TO/LOCAL/DIR

Enqueue jobs with file checksums:

irodsqueue ingest -Kf --timer /PATH/TO/LOCAL/DIR
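Checksumming means reading each file in full, so it is worth doing in chunks rather than loading whole files into memory. A generic sketch using SHA-256 and the same 8 MiB chunk size that appears in the job parameters (which digest algorithm iRODS actually records depends on server configuration):

```python
import hashlib
import os
import tempfile

CHUNK_SIZE = 8 * 1024 * 1024  # matches the 8388608-byte chunk_size in the job parameters

def file_checksum(path, chunk_size=CHUNK_SIZE):
    """SHA-256 hex digest computed chunk by chunk, so arbitrarily
    large files never have to fit in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

# Demonstration on a throwaway file:
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"abc")
    sample = f.name
checksum = file_checksum(sample)
os.unlink(sample)
print(checksum)
```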

Contributors

adetorcy, trel

Issues

new option for registration in place

need the equivalent of ireg to register existing data at rest.

possible flag...

  • -r for register only (preferred... but possibly confusing due to 'recursive')
  • -c for catalog only

also...

  • need to make sure the iRODS server is the same as the client machine.
  • need to ignore/complain about the -K flag (or seamlessly downgrade to only -k)

Tweaks required to run

Using python-irodsclient built from irods/python-irodsclient@f48f0fe

I required the following changes to get ingest.py to run:

Removed the hard-coded password and replaced it with the password decoded from the iRODS authentication file:

from irods.password_obfuscation import decode

...

with open(env['irods_authentication_file'], 'r') as authfile:
    pw = decode(authfile.read().rstrip('\n'))

The value of IRODS_ENVIRONMENT_FILE is ignored and a hard-coded environment file path is used instead (meaning that I ran this on our production system by accident).

I also uncommented the raise statement that was trapping all exceptions, in order to get a stack trace.

python ingest, registration and sync tool

We have a requirement to ingest and / or register a considerable amount of data at rest in large, sometimes parallel, file systems. A design consideration is a new iRODS user with hundreds of millions of files, and also possibly an existing user who wishes to periodically sync a large volume with their existing iRODS catalog.

Given a target local directory, an initial list of features would include:

  • operates in parallel for maximum speed - recursively descend the file system and push fully qualified paths into a worker queue for ingest threads
  • option to wait N seconds to ensure file is at rest - landing zone style behavior
  • option to ingest files or just register in-place
  • option to checksum
  • option to provide regular expressions to skip
  • externalized metadata extraction using a DSL, inherited interface, or other mechanism for defining the rules to generate or extract metadata from the at-rest data
  • option to set iRODS ACLs after data is ingested
  • option for target collection, or collections given a mapping function
  • idempotent - ability to skip unchanged, properly ingested data and metadata

Other possible features:

  • proxy as other iRODS users for ingest
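The core producer/consumer design in the first bullet (walk the tree, push fully qualified paths onto a queue, let workers drain it) can be sketched with the standard library alone; the real tool uses Redis/RQ for the queue and separate worker processes rather than threads:

```python
import os
import queue
import tempfile
import threading

def produce(root, q):
    """Recursively descend `root`, pushing fully qualified file paths."""
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            q.put(os.path.join(dirpath, name))

def consume(q, out):
    """Drain paths from the queue; a stand-in for an ingest worker."""
    while True:
        try:
            path = q.get_nowait()
        except queue.Empty:
            return
        out.append(path)

# Demonstration on a throwaway tree with three files.
root = tempfile.mkdtemp()
for i in range(3):
    with open(os.path.join(root, f"file{i}.dat"), "w") as f:
        f.write("x")

q = queue.Queue()
produce(root, q)   # here the producer fills the queue first;
ingested = []      # in the real tool it runs concurrently with workers
workers = [threading.Thread(target=consume, args=(q, ingested)) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(len(ingested))  # 3
```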

TypeError: open() takes 3 positional arguments but 4 were given

I followed the instructions and installed all the necessary components.
I ran the ingest command:

$ ./irodsqueue ingest -f --timer /home/sharifi/irods-test/test.4

Then I checked the queue to confirm the job was there. Then I started the worker and it failed. Any idea what is going wrong here?

$ for i in {1..2}; do sleep .1; ./rq worker -v --burst -w irodsqueue.irodsworker.IrodsWorker & done

$ 09:44:47 Registering birth of worker irods2.29790
09:44:47 RQ worker 'rq:worker:irods2.29790' started, version 0.9.2
09:44:47 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:47 Cleaning registries for queue: default
09:44:47 
09:44:47 *** Listening on default...
09:44:47 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:47 default: irodsqueue.utils.record_start_time(timestamp_id=1) (66ef5ff0-bff1-49ce-9d77-64880e45054a)
09:44:47 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:47 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:47 default: Job OK (66ef5ff0-bff1-49ce-9d77-64880e45054a)
09:44:47 Result is kept for 500 seconds
09:44:47 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:47 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:47 
09:44:47 *** Listening on default...
09:44:47 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:47 default: irodsqueue.utils.process_dir('/home/sharifi/irods-test/test.4', namespace(acl=False, chunk_size=8388608, destination='', dry_run=False, exclude='', extract_metadata=None, force=True, prefix='/home/sharifi/irods-test', 
register_checksum=False, source='/home/sharifi/irods-test/test.4', timer=True, verify_checksum=False)) (/home/sharifi/irods-test/test.4)
09:44:47 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:47 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:47 process 29790 creating /SURFsaraTest01/home/perf/test.4
09:44:47 Registering birth of worker irods2.29792
09:44:47 RQ worker 'rq:worker:irods2.29792' started, version 0.9.2
09:44:47 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:47 Cleaning registries for queue: default
09:44:47 
09:44:47 *** Listening on default...
09:44:47 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:47 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:47 RQ worker 'rq:worker:irods2.29792' done, quitting
09:44:47 Registering death
09:44:48 default: Job OK (/home/sharifi/irods-test/test.4)
09:44:48 Result is kept for 500 seconds
09:44:48 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:48 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:48 
09:44:48 *** Listening on default...
09:44:48 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:48 default: irodsqueue.utils.process_file('/home/sharifi/irods-test/test.4/file.test.4.3', namespace(acl=False, chunk_size=8388608, destination='', dry_run=False, exclude='', extract_metadata=None, force=True, prefix='/home/sharif
i/irods-test', register_checksum=False, source='/home/sharifi/irods-test/test.4', timer=True, verify_checksum=False)) (/home/sharifi/irods-test/test.4/file.test.4.3)
09:44:48 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:48 Sent heartbeat to prevent worker timeout. Next one should arrive within 660 seconds.
09:44:48 process 29790 uploading /SURFsaraTest01/home/perf/test.4/file.test.4.3
09:44:48 TypeError: open() takes 3 positional arguments but 4 were given
Traceback (most recent call last):
  File "/usr/local/myVE/testenv3/lib/python3.6/site-packages/rq/worker.py", line 771, in perform_job
    rv = job.perform()
  File "/usr/local/myVE/testenv3/lib/python3.6/site-packages/rq/job.py", line 558, in perform
    self._result = self._execute()
  File "/usr/local/myVE/testenv3/lib/python3.6/site-packages/rq/job.py", line 564, in _execute
    return self.func(*self.args, **self.kwargs)
  File "/usr/local/myVE/testenv3/lib/python3.6/site-packages/irodsqueue/utils.py", line 275, in process_file
    send_file(session, path, target, params)
  File "/usr/local/myVE/testenv3/lib/python3.6/site-packages/irodsqueue/utils.py", line 156, in send_file
    send_chunks(session, file_path, obj_path, params, options)
  File "/usr/local/myVE/testenv3/lib/python3.6/site-packages/irodsqueue/utils.py", line 164, in send_chunks
    with open(file_path, 'rb') as f, session.data_objects.open(obj_path, 'w', options) as o:
TypeError: open() takes 3 positional arguments but 4 were given
09:44:48 Invoking exception handler <bound method Worker.move_to_failed_queue of <irodsqueue.irodsworker.IrodsWorker object at 0x7fb9c2cf88d0>>
09:44:48 Moving job to 'failed' queue
09:44:48 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:48 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
[....]


09:44:48 Invoking exception handler <bound method Worker.move_to_failed_queue of <irodsqueue.irodsworker.IrodsWorker object at 0x7fb9c2cf88d0>>
09:44:48 Moving job to 'failed' queue
09:44:48 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:48 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:48
09:44:48 *** Listening on default...
09:44:48 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:48 Sent heartbeat to prevent worker timeout. Next one should arrive within 420 seconds.
09:44:48 RQ worker 'rq:worker:irods2.29790' done, quitting
09:44:48 Registering death

My Redis queue:

127.0.0.1:6379> flushall
OK
(0.81s)
127.0.0.1:6379> keys *
(empty list or set)
127.0.0.1:6379> keys *
 1) "rq:job:/home/sharifi/irods-test/test.4/file.test.4.3"
 2) "rq:job:/home/sharifi/irods-test/test.4:dependents"
 3) "ingest_start_ts_id"
 4) "rq:queues"
 5) "rq:deferred:default"
 6) "rq:queue:default"
 7) "rq:job:/home/sharifi/irods-test/test.4"
 8) "rq:job:/home/sharifi/irods-test/test.4/file.test.4.1:dependents"
 9) "rq:job:9c3231b2-a628-4dcf-8f80-7eb226f9e80a"
10) "rq:job:66ef5ff0-bff1-49ce-9d77-64880e45054a"
11) "rq:job:/home/sharifi/irods-test/test.4/file.test.4.2"
12) "rq:job:/home/sharifi/irods-test/test.4/file.test.4.1"

Then later

127.0.0.1:6379> keys *
 1) "rq:job:/home/sharifi/irods-test/test.4/file.test.4.3"
 2) "rq:queues"
 3) "rq:deferred:default"
 4) "rq:worker:irods2.29790"
 5) "rq:worker:irods2.29792"
 6) "rq:job:66ef5ff0-bff1-49ce-9d77-64880e45054a"
 7) "rq:job:/home/sharifi/irods-test/test.4/file.test.4.1"
 8) "ingest_start_ts_id"
 9) "rq:job:/home/sharifi/irods-test/test.4"
10) "rq:job:/home/sharifi/irods-test/test.4/file.test.4.1:dependents"
11) "1"
12) "rq:finished:default"
13) "rq:job:9c3231b2-a628-4dcf-8f80-7eb226f9e80a"
14) "rq:queue:failed"
15) "rq:job:/home/sharifi/irods-test/test.4/file.test.4.2"
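The traceback is consistent with a python-irodsclient API change: the installed PRC's data_objects.open() apparently no longer accepts an options dict as an extra positional argument (it may expect keyword arguments instead), so the call in send_chunks passes one positional argument too many. This diagnosis is an assumption based on the error text; the signatures below are hypothetical, pure-Python stand-ins that reproduce the mismatch:

```python
# Hypothetical before/after signatures illustrating the mismatch;
# check the installed PRC for the real data_objects.open() signature.
def open_old(path, mode, options=None):   # accepted a positional dict
    return (path, mode, options)

def open_new(path, mode, **options):      # keyword-only options
    return (path, mode, options)

opts = {"forceFlag": ""}
open_old("/zone/obj", "w", opts)          # fine with the old signature

try:
    open_new("/zone/obj", "w", opts)      # one positional arg too many
except TypeError as e:
    message = str(e)                      # "...positional arguments..."

result = open_new("/zone/obj", "w", **opts)  # the likely fix: unpack
```

If this is the cause, updating the call site (or pinning an older PRC release) should resolve the error.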
