Code Monkey home page Code Monkey logo

ptero-workflow's Introduction

PTero Workflow Service

Build Status Coverage Status Requirements Status

This project provides web API for the PTero Workflow system of services. This system is designed to be a highly scalable replacement of the legacy Workflow system from The Genome Institute.

A prototype implementation, which does not provide an easy to use API (like this project does), easily handles our production workflows with tens of thousands of nodes. For reference, it can be found in two parts: core and workflow

The workflows are driven using an implementation of Petri nets with some extensions for color and token data.

The other existing components are: the petri core service and a forking shell command service.

Testing

The tests for this service depend on a running petri and forking shell command service. To run the tests, first install some tools:

pip install tox

Then setup the petri service and the shell-command service. In the parent directory:

git clone https://github.com/genome/ptero-petri.git
git clone https://github.com/genome/ptero-shell-command.git

And in the ptero-workflow directory:

ln -s ../ptero-petri
ln -s ../ptero-shell-command

Now, you can run the tests using tox:

tox -e py27

To see a coverage report after successfully running the tests:

coverage report

ptero-workflow's People

Contributors

davidlmorton avatar iferguson90 avatar mark-burnett avatar mkiwala avatar

Stargazers

 avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

ptero-workflow's Issues

Commands should specify their backend service per method

For now, let's try specifying the backend as a URL to post requests to. That means that services will have to conform to the interface that workflow expects.

A method entry for the fork service might look like:

{
    "name": "execute",
    "url": "http://localhost:6000/v1/jobs",
    "parameters": {
        "command_line": ["cat"],
        "max_runtime": 1
    }
}

Track and report errors from other services

By error, we mean that the service failed to perform its task, not that the job simply failed.

It may be useful to also implement a cron or Celery beat job that polls (on a very long time scale) external services to monitor jobs that have not reported back in some time.

Cleanup test reporting in run_tests

Currently duplicate tests names are displayed when running workflow tests. The list of tests names returned by nosetests should be made unique.

Also, say how many tests have been run.

Roundtrip tests are insufficient

They are not really checking that we get our original post data back, plus additional fields. They are also not recursive (I think).

Introduce Workflow to Schema Top-level

I'm opening this issue based on comments from #69.

On #69 @mark-burnett said:

I think that @davidlmorton is going to push for more symmetry between workflow and execution type methods, so that workflow looks more like this:

"workflow": {
  "type": "object",
  "properties": {
    "service": {
      "type": "string",
      "pattern": "DAG"
    },
    "parameters": {
       "type": "object",
       "properties": {
         "edges": { "$ref": "#/definitions/edgeList" },
         "tasks": { "$ref": "#/definitions/taskDictionary" }
       },
       "required": ["edges", "tasks"],
       "additionalProperties": false
    },
    "additionalProperties": false
  }
}

I also support this change, since increased symmetry is usually a good thing. I think it will encourage us to clarify the top level of this request document as well. Maybe moving to something like:

{
  "type": "object",
  "properties": {
    "workflow": { "$ref": "#/definitions/workflow" },
    "parallelBy": { "$ref": "#/definitions/name" },
    "inputs": { "$ref": "#/definitions/inputsDictionary" }
  },
  "required": ["workflow", "inputs"],
  "additionalProperties": "false"
}

How do / can we facilitate running a connected set of processes on one machine?

The idea is that you might want to run several tools in a row connected via named pipes to save on IO, and to unify checkpointing across the total result.

Some possible ways to approach this problem:

  • Have a new kind of task (not a method list) that is dedicated to this use case.
    • This may add a lot of responsibility to the workflow system, so might not be ideal.
  • Rely on the client wrapper/script to handle this (i.e. do nothing).
    • This may have some potential drawbacks WRT resource allocation for each job within the set of jobs.
    • This seems like it may be an appropriate solution, because the way in which you might approach this could be quite different depending on the execution environment (e.g. Hadoop vs LSF).

One obvious example is running two copies of bwa sampe and then sending them to bwa aln and then sending that result to a sorting tool. I'm sure there are other use-cases where one would want to do this.

Add endpoints for GETting step inputs and PUTting step outputs

This is a good step in generalizing our method concept so that workflow does not require modification to support additional services. When combined with a service registry in the auth service, we can simply inject various workflow urls for e.g. getting inputs to parameters in a submit request to a service. That should be pretty general an somewhat easy to integrate with other services.

Stop relying on pbr entry_points for reports

The Heroku python buildpack does not respect entry_points in the setup.cfg. Deis uses Heroku buildpacks and our deployment of Ptero to Deis depends on the Heroku python buildpack. So we cannot use entry_points in the setup.cfg for code that we expect to run on our Deis deployment of Ptero.

Currently the workflow reports implementation depends on entry_points in the setup.cfg. The implementation should be changed to remove the dependency on entry_points in the setup.cfg.

Support multiple ways of handling task failure

Allow the user to specify the answer to questions like:

  • What should the service do with currently running/scheduled jobs?
  • Should the service start new jobs that are not blocked by the failed task(s)?

Clean up operation types

  • rename "Model" to "DAG"
  • eliminate pass-through operation type (use command cat, instead)
  • modify DSL to not need the operation type to be specified

Add basic operation status support

This should be capable of:

  • reporting which method (shortcut/execute) is running/succeeded, etc.
  • reporting status for parallel_by commands

When get_split_size fails, fail the task and attach metadata to execution

The failure that prompted this issue occurred because get_split_size was called on an input that was a scalar instead of an array. Here is the error:

workflow[web.1]: (DataError) cannot get array length of a scalar
workflow[web.1]: 'SELECT json_array_length(result.data #> %(data_1)s) AS json_array_length_1 \nFROM result \nWHERE result.id = %(id_1)s' {'id_1': 396, 'data_1': u'{1}'}
workflow[web.1]: Traceback (most recent call last):
workflow[web.1]: File "/app/.heroku/src/ptero-common/ptero_common/logging_configuration.py", line 60, in wrapper
workflow[web.1]: File "/app/ptero_workflow/implementation/backend.py", line 110, in handle_task_callback
workflow[web.1]: return getattr(self, callback_type)(body_data, query_string_data)
workflow[web.1]: File "/app/ptero_workflow/implementation/models/result.py", line 31, in get_size
workflow[web.1]: tup = s.query(json_array_length(q)).filter_by(id=task.id).one()
workflow[web.1]: return self._execute_and_instances(context)
workflow[web.1]: return meth(self, multiparams, params)
workflow[web.1]: File "/app/.heroku/python/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 826, in _execute_clauseelement
workflow[web.1]: File "/app/.heroku/python/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 958, in _execute_context

Integrate with auth

It may be useful to cache some data about each service, such as their public keys.

Automate test of 'OutputsAlreadySet' error

We've hand-tested the code that disallows setting the outputs of an Execution more than once, but it required hacking the ptero_workflow_execution_wrapper. Figure out a way to automate that test that doesn't dig into the implementation too heavily.

Better error message when inputs missing

If you submit a workflow with a missing input it fails in an unclear way:

workflow[web.1]: 'NoneType' object is not iterable
workflow[web.1]: Traceback (most recent call last):
workflow[web.1]: File "/app/.heroku/src/ptero-common/ptero_common/logging_configuration.py", line 60, in wrapper
workflow[web.1]: result = target(_args, *_kwargs)
workflow[web.1]: File "/app/ptero_workflow/api/v1/views.py", line 22, in post
workflow[web.1]: rv = self.handle_user_exception(e)
workflow[web.1]: File "/app/.heroku/python/lib/python2.7/site-packages/flask_restful/init.py", line 258, in error_router
workflow[web.1]: return original_handler(e)
workflow[web.1]: File "/app/.heroku/python/lib/python2.7/site-packages/flask/app.py", line 1381, in handle_user_exception
workflow[web.1]: reraise(exc_type, exc_value, tb)
workflow[web.1]: File "/app/ptero_workflow/implementation/backend.py", line 24, in create_workflow
workflow[web.1]: workflow = self._save_workflow(workflow_data)
workflow[web.1]: File "/app/ptero_workflow/implementation/backend.py", line 66, in _save_workflow
workflow[web.1]: workflow.root_task.create_input_sources(self.session, [])
workflow[web.1]: File "/app/ptero_workflow/implementation/models/task/method_list.py", line 51, in create_input_sources

Address Postgresql growth

We need a way to DELETE a workflow from the database. Ideally we would be able to persist that data to disk. It could be implemented using the sqlalchemy orm models or directly in SQL.

See also issue 45.

support block and converge operation types

This allows anyone using PTero to utilize these features. It also ensures that we don't waste a shell-command worker or lsf slot on work that can be easily done inside the workflow service.

Fix Coverage

Coverage came back after merging #12, but it dropped from ~90% to ~50%. Did something happen with the repo move or #11 or #12?

Add MethodExecution Subclasses

In order to avoid potential data corruption issues, the MethodExecution subclasses will contain multiple columns instead of just a big 'data' JSON column.

Support blocking on spawned workflows

Utilize the environment variable PTERO_EXECUTION_URL to set a header field when submitting a workflow. The new workflow should use webhooks to alert its parent that it has completed. The method that was running when the workflow was 'spawned' should not complete until the spawned workflow has completed. If the spawned workflow fails, the method should fail. If the spawned workflow succeeds, its outputs should be used as the outputs of the method.

rabbitmq-plugins conflict when running multiple rabbits

This is technically not a workflow issue but it breaks tests in workflow.

A fresh install of rabbitmq with homebrew seems to enable several rabbitmq plugins that try to bind to a default port. When multiple rabbits spin-up, the plugins all try to bind to the same port, and the rabbitmq instance fails to boot.

I had to do the following before tests would run:

$ rabbitmq-plugins disable rabbitmq_stomp
$ rabbitmq-plugins disable rabbitmq_mqtt
$ rabbitmq-plugins disable rabbitmq_management

This seems to change some global rabbit configuration. A quick look over the rabbit documentation provides no obvious insight on how to disable plugins through configuration, environment variables, or command-line switches (as opposed to using rabbitmq-plugins).

Add support for submitting workflows with names

Top level workflows should be able to be named something relevant to the users. Its too hard to determine, just looking at the workflow itself, who cares about it.

I don't think we need them to be unique, but I would also not be opposed to it. If it is non-unique we could tack on something unique like the workflow_id.

FlexJob

The primary use case for this is to calculate LSF resource usage params based on the actual input data to a job. The syntax should probably look something like this:

{
  "tasks": {
    "A": {
      "methods": [
        {
          "service": "LSF",
          "parameters": {
            "commandLine": ["foo", "execute"]
          },
          "calculatedParameters": {
            "service": "ShellCommand",
            "parameters": {
              "commandLine": ["foo", "calculate-resources"]
            }
          }
        }
      ]
    },
    ...
  },
  "links": [
  ...
  ]
}

Celery tasks don't rollback postgres session

This may be of use:

http://celery.readthedocs.org/en/latest/userguide/tasks.html

Handlers
after_return(self, status, retval, task_id, args, kwargs, einfo)
Handler called after the task returns.

Parameters:
status – Current task state.
retval – Task return value/exception.
task_id – Unique id of the task.
args – Original arguments for the task that returned.
kwargs – Original keyword arguments for the task that returned.
einfo – ExceptionInfo instance, containing the traceback (if any).
The return value of this handler is ignored.

on_failure(self, exc, task_id, args, kwargs, einfo)
This is run by the worker when the task fails.

Parameters:
exc – The exception raised by the task.
task_id – Unique id of the failed task.
args – Original arguments for the task that failed.
kwargs – Original keyword arguments for the task that failed.
einfo – ExceptionInfo instance, containing the traceback.
The return value of this handler is ignored.

Add support for database schema migration

This is necessary before we release, because we need to be able to make changes over time.

The best apparent plan is to switch Django from flask, which has support for migration built-in.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.