
simpleflow's Introduction

Simpleflow


Simpleflow is a Python library that provides abstractions to write programs in the distributed dataflow paradigm. It coordinates the execution of distributed tasks with Amazon SWF.

It relies on futures to describe the dependencies between tasks. A Future object models the asynchronous execution of a computation that may end. It tries to mimic the interface of the Python concurrent.futures library.

Features

  • Provides a Future abstraction to define dependencies between tasks.
  • Defines asynchronous tasks from callables.
  • Handles workflows with Amazon SWF.
  • Implements replay behavior like the Amazon Flow framework.
  • Handles retries of failed tasks.
  • Automatically registers decorated tasks.
  • Encodes/decodes large fields to S3 objects transparently (aka "jumbo fields").
  • Handles the completion of a decision with more than 100 tasks.
  • Provides a local executor to check a workflow without Amazon SWF (see the simpleflow --local command).
  • Provides decider and activity worker processes for execution with Amazon SWF.
  • Ships with the simpleflow command; run simpleflow --help for more information about the commands it supports.

You can read more in the Features section of the documentation.
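To make the Future-based model concrete, here is a minimal sketch of a workflow definition, modeled on examples.basic from this repository (the exact task_list and version values are illustrative):

from simpleflow import Workflow, activity, futures

@activity.with_attributes(task_list="quickstart", version="example")
def increment(x):
    return x + 1

@activity.with_attributes(task_list="quickstart", version="example")
def double(x):
    return x * 2

class BasicWorkflow(Workflow):
    name = "basic"
    version = "example"
    task_list = "test"

    def run(self, x):
        # submit() returns a Future immediately; the executor resolves the
        # dependency graph, either locally or through SWF decisions.
        y = self.submit(increment, x)
        z = self.submit(double, y)
        futures.wait(y, z)
        return z.result

You can run such a workflow with the local executor or with the decider and worker processes shown below.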

Overview

Please read, and even run, the demo script to get a quick look at simpleflow commands. To run the demo you will need to start decider and activity worker processes.

Start a decider with:

$ simpleflow decider.start --domain TestDomain --task-list test examples.basic.BasicWorkflow

Start an activity worker with:

$ simpleflow worker.start --domain TestDomain --task-list quickstart

Then execute ./extras/demo.

More information

Read the main documentation at https://botify-labs.github.io/simpleflow/.

simpleflow's People

Contributors

agyar, ampelmann, anlandu, benjastudio, david-wobrock, di, ewjoachim, ggreg, jbbarth, monkeypac, mookerji, nstott, pcaneill, skygeo, virtualtam, ybastide, zborg


simpleflow's Issues

Polling process crashed if it can't send heartbeat?

Hi,

It looks like if the activity polling process can't send a heartbeat for a single task, the whole polling process crashes.

Is this intended? Say there is a network partition or some transient network issue when sending the heartbeat: wouldn't ignoring the error be a better solution? In that case, the worker would continue polling for other tasks, and the problematic task would hit its heartbeat timeout. The current behavior leads to losing activity workers.

Thanks,
Kefu

Winter refactoring

Disclaimer: this is by no means an issue meant to denigrate previous work on simpleflow. Simpleflow handles a ton of things, and the code is globally nice and easy to work with thanks to solid conventions and good organization. But it grew organically, like a lot of software, and it's becoming a pain to make some basic changes, so let's improve that!

Motivation: an example

I'm currently trying to make child workflows work within simpleflow. This involves naming child workflows correctly so we don't generate name collisions too easily (2 concurrent workflows can't have the same workflowId, else the second one won't launch).

One reliable way to do that would be to get the workflowId and runId of the parent workflow, hash them into a short, sufficiently unique string, and then use that inside the child workflow name, among other things (workflow type + an increment as of today). Remember that we cannot use a random uuid() because the decider needs to stay stateless, while still naming things the same way across replays.

The PollForDecisionTask endpoint exposes those parameters, but they're not passed down to simpleflow where I need them (e.g. in simpleflow.swf.executor.Executor.replay()).

Then the pain begins, because there are many levels of indirection from the API call via boto, and this function:

  • simpleflow.swf.executor.Executor.replay(history)
  • called by simpleflow.swf.process.decider.base.DeciderWorker.decide(history)
  • called by simpleflow.swf.process.decider.base.DeciderPoller.decide(history)
  • called by simpleflow.swf.process.decider.base.DeciderPoller.process(task) (where task is in fact a tuple of token + history)
  • called by simpleflow.swf.process.actor.Poller.start(..) with task deduced from a self._poll()
  • simpleflow.swf.process.actor.Poller._poll() actually delegates to Poller.poll(), which is abstract (which is OK, workers and deciders don't poll the same endpoint)
  • which leads us to simpleflow.swf.process.decider.base.DeciderPoller.poll(), which delegates to swf.actors.Decider.poll()
  • that last one uses indirectly boto's poll_for_decision_task() and returns a tuple token + history

So in order to get the workflowId + runId to the simpleflow executor, we'd need to modify 8 functions, and either break their signatures and return values, or deal with missing data at each level. This is not sane imho, and ironically, it makes that kind of tiny change a lot more difficult than if we were using the raw endpoints through boto.

Bonus: not simpleflow's problem, but we have private code at Botify that relies on those functions at various levels, and we don't have an exhaustive test suite, so it will be hard not to break anything.

Proposals

I see a few actions that could improve the situation:

  • pass real objects and not raw data structures wherever possible on public methods: they will then be transparently extensible without breaking compatibility or risking anything in calling functions;
  • try to document input/return types: mea culpa on this one, Greg already started doing that in recent code, I need to improve my habits, and maybe find a better documentation format (doxygen's may be better?);
  • systematically expose everything in simple-workflow: all attributes must be exposed by default; constantly changing this is not sane;
  • (maybe) keep a raw copy of the SWF JSON result in higher-level objects: already done in some simpleflow objects, which allows for quick solutions where needed; not sure about this one, it favors hacks;
  • merge simple-workflow and simpleflow: in recent work I had to change a lot of little things in simple-workflow and the distinction is sometimes hard to see; class names are repeated, there are multiple actors that share some but not all properties, etc.;
  • improve test coverage on both: that used to be super hard when dealing with processes or SWF calls; now that moto has basic support for SWF, we should be able to test things a bit more; integration tests on processes will still be very difficult
  • (maybe) switch to a BDD-style test framework: I really enjoyed using sure while working on moto, it almost looks like rspec in Ruby (not that good, but still nice)
  • decouple backends more clearly: the local executor is a nice thing for testing, but it lacks support for many simpleflow features; Greg already extracted SWF-specific code into simpleflow/swf, but this remains a real maintenance burden if we don't have clearer boundaries, because recent changes turned out to work differently on both executors and needed different solutions (task naming, chain/groups, instance-based tasks)

Maybe starting from scratch on subcomponents would be simpler than trying to maintain strict compatibility (??).

I will complete this list ... next year. Any opinion welcome. If nobody has strong opinions about that, I will start implementing those ideas (not necessarily in this order) and will update this issue as I go.

simpleflow decider shouldn't share workflow definition instance

Today the simpleflow decider shares workflow definition instances. It's particularly visible if you try the simpleflow standalone command with a workflow like this:

class MyWorkflow(Workflow):
    def __init__(self):
        self.items = []

    def run(self):
        # self.items grows across replays because the same instance is reused
        self.items.append("foo")
        print(self.items)
        for i in [1, 2, 3, 4]:
            future = self.submit(task)
            future.wait()

This is a bit schematic, but the decider will have to replay the workflow 4 times and you will see that self.items grows bigger and bigger.

Of course this is a dumb example, but:

  • 1/ our previous decider module at Botify (private) didn't do that, it used to instantiate the workflow more lazily
  • 2/ it's really easy to grow a workflow with bad state this way, and we should avoid that at all costs (deciders should be stateless!)

For now we have fixed our internal workflow to avoid that, but I'll eventually fix simpleflow so it re-instantiates the workflow definition each time it starts a "replay".
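A minimal sketch of that fix, with illustrative names (this is not the actual decider code; the real signatures differ): build a fresh workflow object at the start of every replay instead of reusing one created when the decider starts.

class FreshWorkflowDecider(object):
    """Illustrative only: shows the intent of re-instantiating per replay."""

    def __init__(self, workflow_class, executor):
        self._workflow_class = workflow_class
        self._executor = executor

    def decide(self, history):
        # Build a fresh workflow definition for every replay so no state
        # leaks from one replay to the next.
        workflow = self._workflow_class()
        return self._executor.replay(history, workflow)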

Workflow history : ISO dates

Dates in the workflow history don't carry their timezone, which makes it really hard to compare them with other dates, as in this example https://github.com/botify-hq/botify-workflow-dashboard/blob/master/test/utils/workflowFile/input-0.json where tasks and metrology steps aren't in the same timezone.

For example, the field is built from the raw event timestamp:

'scheduled_timestamp': event.timestamp,

which ends up serialized as naive ISO dates:

"scheduled_timestamp": "2015-12-02T10:42:27.045000",
"started_timestamp": "2015-12-02T10:42:27.145000",
"completed_timestamp": "2015-12-02T10:47:01.593000",

and the same goes for "timed_out_timestamp", "failed_timestamp" and "cancelled_timestamp".

BTW: timestamps are numbers of milliseconds since 1 Jan 1970. So in the workflow history file, the fields named timestamps are actually datetimes, and the fields named times are timestamps.

Don't run idempotent tasks multiple times

When we added idempotent tasks (#58), the main concern was not having tasks mixed up in workflows that have a variable geometry (where tasks are discovered in a non-deterministic order which varies from one replay to another).

But we didn't add any safeguard so that idempotent tasks are not launched multiple times. So today, if you try to schedule two identical idempotent tasks, there are two possibilities:

  • you're lucky: the 1st task is already finished when the 2nd task is to be scheduled, and you will "just" re-run the task needlessly
  • you're not lucky: the 1st task is already scheduled or running; when trying to schedule the second task, SWF will return an ACTIVITY_ID_ALREADY_IN_USE error; simpleflow being a bit dumb, the decider will take another decision, retry the exact same thing, and get the same error until the first task is finished.

We should make it so idempotent tasks are scheduled only once. And we should not break retries while doing that, obviously :)
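One possible safeguard, sketched with made-up attribute names (the executor's real bookkeeping is different): before scheduling an idempotent task, look its id up in the parsed history and reuse the existing future when the task is already scheduled, started or completed.

ALREADY_KNOWN = ("scheduled", "started", "completed")


def submit_idempotent(executor, task, task_id):
    # `executor._tasks_by_id` and `executor._futures` are hypothetical stand-ins
    # for the executor's history index and future registry.
    state = executor._tasks_by_id.get(task_id)
    if getattr(task, "idempotent", False) and state in ALREADY_KNOWN:
        return executor._futures[task_id]  # reuse the existing future
    return executor.schedule_task(task)    # normal scheduling path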

Use concurrent.futures.Future

Per discussion with @ggreg, we noticed that the semantic differences between our future (simpleflow.futures.Future) and Python's future (concurrent.futures.Future) are mainly:

  1. behavior when blocking (access to result when computation is running)
  2. behavior of cancelling

However, these two points both come from the SWF executor. As we've separated the backend executors (swf, local, local_async), these special future behaviors should also be separated from the main interface.

It is proposed to:

  • introduce concurrent.futures.Future as the base Future definition;
  • possibly sub-class it to add behavior like map, see #7;
  • create a SwfFuture that handles SWF-specific cancelling and blocking strategies.
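A minimal sketch of what the proposal could look like (the class and exception names below are placeholders, not existing simpleflow APIs):

import concurrent.futures


class ExecutionBlocked(Exception):
    """Placeholder for the 'result not available yet' signal used by a replay-based executor."""


class SwfFuture(concurrent.futures.Future):
    """Hypothetical SWF-flavored future: a replay-based decider cannot block
    waiting for a result, so reading a pending result interrupts the replay
    instead of waiting."""

    def result(self, timeout=None):
        if not self.done():
            raise ExecutionBlocked()
        return super(SwfFuture, self).result(timeout=timeout)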

Fails to install on Python 3.5.1 and OS X

$ pip install simpleflow
Collecting simpleflow
  Using cached simpleflow-0.11.1.tar.gz
Requirement already satisfied (use --upgrade to upgrade): boto>=2.38.0 in ./.anyenv/pyenv/versions/3.5.1/envs/wp-scripts/lib/python3.5/site-packages (from simpleflow)
Collecting tabulate==0.7.3 (from simpleflow)
  Using cached tabulate-0.7.3.tar.gz
Requirement already satisfied (use --upgrade to upgrade): setproctitle in ./.anyenv/pyenv/versions/3.5.1/envs/wp-scripts/lib/python3.5/site-packages (from simpleflow)
Collecting subprocess32 (from simpleflow)
  Using cached subprocess32-3.2.7.tar.gz
    Complete output from command python setup.py egg_info:
    This backport is for Python 2.x only.

    ----------------------------------------
Command "python setup.py egg_info" failed with error code 1 in /private/var/folders/cb/cxxqz_g94jd1x5tjwrklxgd00000gn/T/pip-build-22bmoshd/subprocess32/

Save failure details as JSON

On exception, worker processes save the Python str(exception) in the task's "reason" and the traceback in "details". This is basically unusable. I suggest a more general format: repr(exception) in "message" and a JSON object with error, message and traceback keys in "details".
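A small sketch of the suggested format (the helper name is illustrative, not an existing simpleflow function):

import json
import traceback


def format_failure(exc):
    # Call this from inside an `except` block so format_exc() sees the traceback.
    return {
        "message": repr(exc),
        "details": json.dumps({
            "error": exc.__class__.__name__,
            "message": str(exc),
            "traceback": traceback.format_exc(),
        }),
    }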

Allow to override a task's attributes

When calling Workflow.submit, the user should be able to set ad-hoc values for task_list, version, and various timeouts. This allows adapting the task to the environment, for example adjusting the timeouts to the value of a parameter such as the size of a collection.

swf.executor: improve the generation of a task's id

To find a task in the workflow's history, the executor assigns a unique id to the task. It increments the last id associated with the task's name:

class TaskRegistry(dict):
    """This registry tracks tasks and assign them an integer identifier.

    """
    def add(self, task):
        """
        ID's are assigned sequentially by incrementing an integer. They start
        from 0.

        :returns:
            :rtype: int. 

        """
        name = task.name
        self[name] = self.setdefault(name, 0) + 1

        return self[name]

However it does not work if the execution flow changes, as in this simple implementation of a retry on timeout:

f_result = self.submit(f, a)  # task1
if f_result.finished and f_result.exception:
    if is_timeout(f_result.exception):
        f_result = self.submit(f, a)  # task1bis

f_result_2 = self.submit(f, b)  # task2
futures.wait(f_result, f_result_2)

Let's simulate the execution of this workflow:

In the first run, the executor:

  • Assigns id 1 to task1.
  • Assigns id 2 to task2.
  • Blocks on futures.wait().
  • Schedules task1 with id 1 and task2 with id 2.

In the second run:

  • task1 timed out.
  • task2 finished at the same time.
  • Executor assigns id 1 to task1.
  • Both conditions are true and the executor assigns id 2 to task1bis and finds it in the history. It mistakenly puts the result of task2 in task1bis (they are not even called with the same argument).
  • Executor assigns id 3 to task2 and does not find it in the history.
  • Executor blocks on futures.wait()

The issue is that the id of a task is generated with respect to the code evaluated before it, and this code may change depending on the execution of the workflow.

Assuming that tasks are idempotent would make things easier, because the executor would only need to hash the name and input of a task to generate its id. However, if tasks can have side effects (and we cannot ensure they don't), we need to provide another mechanism. I thought we could mark a computation as idempotent with a cached or memoized attribute. But we still need to handle the other computations.
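For the idempotent case, a sketch of the hash-based id generation mentioned above (illustrative only, not the executor's actual naming scheme):

import hashlib
import json


def idempotent_task_id(task_name, args, kwargs):
    # Hash the name and the serialized input so the same call always maps to
    # the same id across replays, independently of what was scheduled before.
    payload = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True)
    digest = hashlib.sha1(payload.encode("utf-8")).hexdigest()[:12]
    return "{}-{}".format(task_name, digest)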

Support instance-based activities

This has been discussed a little in #70, but I'm opening a separate issue for it. We'd like to be able to pass an object instance to Workflow.submit() directly to ease its instrumentation:

self.submit(Foo(bar))

The recent refactoring around activity tasks could help with that.

For now I have something working here: https://github.com/botify-labs/simpleflow/tree/feature/instance-based-activities but I'm not really satisfied (one more class concept, hacky generation of an Activity, "callable" still used everywhere where in fact we have an object, no automatic args/kwargs resolution, etc.). Will try to improve that later.

More, better commands

  • workflow.filter: filter on status, tags, workflow...
  • task.info: doesn't work on failed tasks

Also, apply some flake8 fixes.

Execute `Workflow.after_run` even if the workflow is not finished

I need access to the workflow history at the end of the run, even if the workflow execution is not finished.

I suggest calling after_run not only when the workflow is finished, and adding an additional finished (boolean) argument to indicate whether the workflow is finished.

Handle when an activity type does not exist

In this case, the executor has to create the activity type and let the replay behavior try to schedule the activity again.

This allows removing the .get_or_create() calls from ActivityTask.schedule() and only querying SWF when needed. It also permits running all the tests without AWS API keys.

AttributeError when "identity" is not provided to ActivityWorker

I thought the identity argument to swf.actors.worker.ActivityWorker's initializer and/or its poll method was optional, particularly since it has a default value of None. However, leaving it out causes an AttributeError.

  File "/usr/local/lib/python2.7/dist-packages/simpleflow/history.py", line 66, in parse_activity_event
    activity['identity'] = event.identity
AttributeError: 'ActivityTaskEvent' object has no attribute 'identity'

Provide an interface to chain computations

A Future could be a functor (in scala.concurrent.future it is even a monad) and provide the operation map :: f a -> (a -> b) -> f b (fmap :: (a -> b) -> f a -> f b in Haskell) where the first argument is self (an instance of Future). If we translate the type signature to our future case, map :: Future a -> (a -> b) -> Future b.

Let's take a simple example:

y = self.submit(comp1, x)
z = self.submit(comp2, y)

where:

  • comp1 returns a dict {str: str}
  • comp2 takes a single argument of type str

We cannot pass y.result to comp2. We need to pass y.result['key']:

y.map(lambda result: result['key'])

where:

  • y has the type Future dict
  • lambda result: result['key'] has the type dict -> str
  • f is Future, a is dict, and b is str

This kind of .map method is not usual in Python. We could borrow some interfaces from the JavaScript world such as .then() in Q.

A monad interface would ease chaining and the handling of failures.
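A tiny sketch of what such a map could look like on top of the current Future interface (MappedFuture and future_map are illustrative, not existing simpleflow classes):

class MappedFuture(object):
    """Illustrative wrapper: applies a function to another future's result."""

    def __init__(self, future, fn):
        self._future = future
        self._fn = fn

    @property
    def finished(self):
        return self._future.finished

    @property
    def result(self):
        # map :: Future a -> (a -> b) -> Future b
        return self._fn(self._future.result)


def future_map(future, fn):
    return MappedFuture(future, fn)

# usage sketch: comp2_input = future_map(y, lambda result: result['key'])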

SWF: resume the execution of a stopped workflow

Description

This feature allows resuming a workflow where it failed, timed out, or was terminated or cancelled. That is what we mean by "stopped".

Hence the tasks that were completed in that execution are not executed again.

This feature introduces the following problems:

  • Interface: How to request this behavior? In other words, how does the decider know that it should resume a workflow?
  • Implementation: How to inject the history of the stopped workflow?

Interface

There are several ways to communicate with a decider:

  • Through a special task list. It is not convenient because it has an impact on the configuration and the implementation of the decider.
  • With special values in the workflow's input, for example prefixed with _. As the client passes input in a dict that contains the args and kwargs keys, we could also add a mode key. But a mode is not explicit and requires additional parameters to reference the workflow execution to resume. I prefer a _previous_workflow_execution parameter with the two attributes workflow_id and run_id, which would allow retrieving the history of the previous workflow execution (see the sketch after this list).
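A sketch of the proposed input shape (field names follow the proposal above; the values are just examples):

workflow_input = {
    "args": [1, 5],
    "kwargs": {},
    "_previous_workflow_execution": {
        "workflow_id": "basic-BasicWorkflow-1",
        "run_id": "example-run-id",
    },
}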

Implementation

Overview

The two main problems are:

  1. To not execute again the tasks that were already completed. Their state is stored in the history of the workflow execution that was stopped.
  2. To use the input of the workflow execution that was stopped.

How to not execute already completed tasks

We could:

  • Retrieve the history of the current workflow execution, parse it, and merge the events to have a snapshot of all tasks state.
  • Retrieve the history of the stopped workflow execution, parse it, merge the events, and only keep the completed tasks.
  • Then we merge these tasks into the current state of tasks to override tasks that were not completed or even scheduled.
  • When future objects are filled, ones that back the already completed tasks are in state FINISHED (failed tasks are discarded and must be executed again).

This approach requires task ids to be consistent between the previous and the current workflow execution, because the id is used to associate a future object with the task it backs.

How to use the input of the stopped workflow execution

Once we get a reference to the previous workflow with the _previous_workflow_execution parameter, we can retrieve its input and inject it into the current workflow history.

Fix pypy build that fails on test_workflow_with_child_workflow

Fails on:

=================================== FAILURES ===================================
______________________ test_workflow_with_child_workflow _______________________
    def test_workflow_with_child_workflow():
        workflow = TestDefinitionChildWorkflow
        executor = Executor(DOMAIN, workflow)

        history = builder.History(workflow,
                                  input={'args': (1,)})

        # The executor should schedule the execution of a child workflow.
        decisions, _ = executor.replay(history)
        assert len(decisions) == 1
        assert decisions == [{
            'startChildWorkflowExecutionDecisionAttributes': {
                'workflowId': 'workflow-test_workflow-1',
                'taskList': {
                    'name': 'test_task_list'
                },
                'executionStartToCloseTimeout': '3600',
                'input': '{"args": [1], "kwargs": {}}',
                'workflowType': {
                    'version': 'test_version',
                    'name': 'test_workflow'
                },
                'taskStartToCloseTimeout': '300'
            },
            'decisionType': 'StartChildWorkflowExecution'
        }]

        # Let's add the child workflow to the history to simulate its completion.
        (history
            .add_decision_task()
            .add_child_workflow(
                workflow,
                workflow_id='workflow-test_workflow-1',
                task_list=TestWorkflow.task_list,
                input='"{\\"args\\": [1], \\"kwargs\\": {}}"',
                result='4'))

        # Now the child workflow is finished and the executor should complete the
        # workflow.
        decisions, _ = executor.replay(history)
        workflow_completed = swf.models.decision.WorkflowExecutionDecision()
        workflow_completed.complete(result=json.dumps(4))

>       assert decisions[0] == workflow_completed
E       assert {'decisionTyp...st_version'}}} == {'decisionType...result': '4'}}
E         Detailed information truncated, use "-vv" to show
tests/test_dataflow.py:643: AssertionError
=============== 1 failed, 31 passed, 15 warnings in 3.87 seconds ===============
The command "py.test" exited with 1.
Done. Your build exited with 1.

Stagger process starts

Somehow related to #108, low urgency: when starting a simpleflow process (Worker, Decider) we start everything at once. Here at Botify we usually have 8 subprocesses for each master process, and we have many activity workers (sometimes > 50), each having multiple master processes. Numbers add up quickly, so maybe we should stagger process starts inside a given master process (say, start only one child process per second).

Low priority, just a thought for later.

Add Workflow.cancel()

It requests the cancellation of the workflow. It can then trigger an .on_cancellation() callback.

Add a raises_on_failure flag and propagate cancelling in canvas

It would be nice to have a raises_on_failure flag for canvas nodes that can break/cancel a whole branch of a canvas tree.

This flag should be propagated recursively (BFS) to children until the next flagged child is found (on create/init).

Then, the behaviour of the nodes should be:

  • When an activity (leaf) flagged as True fails, it propagates a failure message to its direct parent.
  • When a node is informed of a failure, it propagates the information to its own children (except the one it received the message from), and passes it to its direct parent (unless that parent is flagged as False).
  • When an activity (leaf) receives a failure message, it treats it as a cancel order. (Or the cancel can be explicitly requested by its parent.)

It looks simple, but please let me know your thoughts.

boto: Bad Request when using --tags

How to reproduce

I tried to use --tags with the following command, based on the README.md.

simpleflow standalone --domain TestDomain examples.basic.BasicWorkflow --input '[1, 5]' --tags tag1,tag2

What I get

Traceback (most recent call last):
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/bin/simpleflow", line 11, in <module>
    load_entry_point('simpleflow', 'console_scripts', 'simpleflow')()
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/click/core.py", line 716, in __call__
    return self.main(*args, **kwargs)
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/click/core.py", line 696, in main
    rv = self.invoke(ctx)
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/click/core.py", line 1060, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/click/core.py", line 889, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/click/core.py", line 534, in invoke
    return callback(*args, **kwargs)
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/click/decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/Users/frncmx/.prezi/simpleflow/simpleflow/command.py", line 502, in standalone
    local=False,
  File "/Users/frncmx/.prezi/simpleflow/simpleflow/command.py", line 161, in start_workflow
    decision_tasks_timeout=decision_tasks_timeout,
  File "/Users/frncmx/.prezi/simpleflow/swf/models/workflow.py", line 270, in start_execution
    task_start_to_close_timeout=decision_tasks_timeout,
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/boto/swf/layer1.py", line 477, in start_workflow_execution
    'taskStartToCloseTimeout': task_start_to_close_timeout,
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/boto/swf/layer1.py", line 118, in json_request
    return self.make_request(action, json_input, object_hook)
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/boto/swf/layer1.py", line 145, in make_request
    raise excp_cls(response.status, response.reason, body=json_body)
boto.exception.SWFResponseError: SWFResponseError: 400 Bad Request
{u'Message': u'Expected list or null', u'__type': u'com.amazon.coral.service#SerializationException'}

Root cause

boto expects a list, but WorkflowType::start_execution() does not process the tag_list provided through --tags.

Proposed solution

The AWS CLI manual defines the following syntax for --tag-list option:

--tag-list "string" "string" ...

But they only consume option arguments; there isn't any positional arg.

So I think we could only have something like:

--tags "string","string", ...

In the case above, adding the following code to the start_execution function would be enough:

if tag_list is not None:
    tag_list = tag_list.split(',')

simpleflow.execute: Not all errors are returned base64-encoded

An OOM for instance won't be caught by try ... except. The wrapper then returns a bogus EOFError.

Testcase without OOM 😇:

# zz.py
import os
from simpleflow import execute

@execute.python()
def stupid():
    pid = os.getpid()
    os.kill(pid, 9)

$ python
Python 2.7.6 (default, Mar 22 2014, 22:59:56) 
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import zz
>>> zz.stupid()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/dist-packages/simpleflow/execute.py", line 157, in execute
    base64.b64decode(excline.rstrip()))
EOFError
>>> 

Manage computation nodes

While adding the decider and activity worker processes, I was thinking that anything that could help bootstrap the execution of a workflow would be a great improvement. It led to the addition of simpleflow commands to start, track, and stop an execution. One of the most helpful commands to define and test a workflow is simpleflow standalone. In a single command, one can execute the whole workflow by combining decider and activity processes under a single process. However this command does not scale to a distributed deployment.

The purpose of the feature described here is to provide an interface to manage computing resources. The main artifact is the node, which refers to a machine inside a group of machines. Speaking of a cluster may suggest all nodes are the same, or at least similar. However we want to be able to support different types of computation and to easily provide the right resources: CPU, memory, storage and network.

A node requires a snapshot of its initial state, commonly called an image. In AWS parlance we will refer to an AMI, which is the filesystem snapshot used to boot an EC2 instance.

Though there are libraries that abstract the specific cloud hosting provider, such as libcloud, I prefer to start experimenting with Amazon EC2. It does not mean I want to hard-code everything. The EC2 backend will register itself as one of several hosting backends. At first, it will be the only one, but feel free to add any other backend you need or like.

How to use this interface? After reading the paragraphs above you may be thinking it's a little abstract. Let's take a concrete example of how to use it.

To execute a workflow, we need three processes:

  • A decider
  • An activity worker
  • A client to start the execution

All these processes are stateless. The decider and activity worker are polling tasks. It means they do not need to be running when the client starts a workflow or SWF triggers an event. We could then leverage this behavior to create a group of processes and their underlying supporting machine on-demand:

  • The client asks for a decider
  • If there is no instance running a decider process, it creates an EC2 instance
  • The client starts the workflow execution
  • A decider takes the decision tasks and executes the workflow definition. This definition requires activity workers and instantiates them. It has to map activities to their corresponding task list. How do we know when to terminate the instances? The simple way is to wait for the termination of the workflow execution. This way there is no risk of terminating an instance while it is still needed for an incoming activity task. However it does not use resources efficiently. EC2 bills by the hour (consider the exact period arbitrary): what matters is whether we are within the period, i.e. the activity completed before an hour, or beyond it. Whenever we start using a new hour, it is better to continue using it for incoming activity tasks. The instance should be terminated when no more activities will be scheduled.

Kill worker when the activity task is closed

Today when a workflow execution is closed (completed, failed or timed out) or when an activity task has timed out, we stop heartbeating but the worker continues its work.

In some cases that might be a good idea:

  • 1/ we want the activity to finish so it can clean up after itself,
  • 2/ if the task is 99% done, maybe we want it to finish anyway (dubious?)

Anyway, in most use cases at Botify this is counter-intuitive, and as simpleflow workers are limited in number, we can quickly reach a point where all workers are busy working on tasks that won't complete anyway on the SWF side, leaving new workflows without enough workers.

Hence, we should change how simpleflow handles closed activity tasks (actually: UnknownResource errors when sending a heartbeat):

  • MVP: kill the worker when this case happens (see the sketch after this list)
  • put this behind a feature flag; per activity task probably; we could do that globally on a simpleflow worker, but we don't use those for now at Botify (we use private code for launching processes), and they're probably buggy anyway
  • allow tasks to define a cleanup action so the worker can be killed and cleanup can take place after that (not easy a priori, since we'd probably need a way to pass parameters between the two things...)
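A minimal sketch of the MVP, with placeholder names (UnknownResourceError and send_heartbeat stand in for the real simpleflow/boto symbols):

import logging
import os
import signal

logger = logging.getLogger(__name__)


class UnknownResourceError(Exception):
    """Placeholder for the 'UnknownResource' fault raised while heartbeating."""


def heartbeat_or_die(send_heartbeat, task_token):
    # Send a heartbeat; if SWF no longer knows about the task, stop this worker
    # instead of letting it keep working on a task that can never complete.
    try:
        send_heartbeat(task_token)
    except UnknownResourceError:
        logger.warning("activity task closed on the SWF side; stopping worker")
        os.kill(os.getpid(), signal.SIGTERM)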

Generic: add retries when getting throttled by SWF API

At Botify we're getting to a point where sometimes we're being throttled by the SWF API. This happened last weekend with the rate limit exceeded on a RespondDecisionTaskCompleted action, which is a bit unfortunate: the task did succeed, but because of the rate limit we retried it. Luckily this was only a 6-minute task, but still.

For the record the SWF rate-limit policy is described here: http://docs.aws.amazon.com/amazonswf/latest/developerguide/swf-dg-limits.html#throttle-limits-useast1

The error looks like this:

Traceback (most recent call last):
  File "/code/botify/backend/process/worker/base.py", line 225, in process_task
    self.complete(token, json.dumps(result, default=lambda v: repr(v)))
  File "/usr/local/lib/python2.7/dist-packages/swf/actors/worker.py", line 79, in complete
    raise ResponseError(e.body['message'])
ResponseError: Rate exceeded

After a quick test, throttling errors look like this:

>>> print e
SWFResponseError: 400 Bad Request
{u'message': u'Rate exceeded', u'__type': u'com.amazon.coral.availability#ThrottlingException'}

>>> print e.error_code
ThrottlingException

So we could add some retry logic via a decorator on most tasks, applied when the exception has an error_code and error_code == "ThrottlingException".
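A sketch of such a decorator (retry counts and delays are illustrative; the error_code check follows the output above):

import functools
import time


def retry_on_throttling(max_retries=5, base_delay=1):
    """Retry the wrapped SWF call with exponential backoff when throttled."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as err:
                    if getattr(err, "error_code", None) != "ThrottlingException":
                        raise
                    time.sleep(base_delay * 2 ** attempt)
            return func(*args, **kwargs)  # last attempt, let any error bubble up
        return wrapper
    return decorator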

Add an Activity type to execute a program

This type would define the program to execute and its arguments.

We need a convention to pass arguments to the program. We could either pass them:

  • In the standard input.
  • As positional arguments and let the program access them through argv.
  • As command line options, for example the variable a is passed as --a 'value_of_a'.

They all require serializing the values to strings, because we cannot ensure the type of the variables and we may have non-scalar values such as lists or tuples.

A straightforward protocol would be to pass the arguments as positional command line arguments after converting them to strings.

Example:

sort_program = ProcessActivity('/usr/local/bin/sort_program', 'infile', 'outfile')
self.submit(sort_program, '/data/infile', '/data/outfile')

The worker will then call /usr/local/bin/sort_program in a sub process (using the subprocess module) as:

subprocess.call(['/usr/local/bin/sort_program', '/data/infile', '/data/outfile'])

It may use a variant of subprocess.call() to control the interaction (stdin, stdout, and stderr) with the subprocess.
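A small sketch of the worker-side call under that protocol (run_program is an illustrative helper, not an existing simpleflow function):

import subprocess


def run_program(path, *args):
    # Convert every argument to a string and pass it positionally, as proposed above.
    cmd = [path] + [str(a) for a in args]
    return subprocess.call(cmd)

# usage sketch: run_program('/usr/local/bin/sort_program', '/data/infile', '/data/outfile')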

Better handling of OPEN_ACTIVITIES_LIMIT_EXCEEDED

There's a limit on SWF: you cannot have more than 1000 tasks open (i.e. scheduled or started). simpleflow already has a protection against scheduling too many tasks, it lives here: https://github.com/botify-labs/simpleflow/blob/master/simpleflow/swf/executor.py#L264-L267, but it seems it doesn't work very well.

As it doesn't work, activity tasks are constantly rescheduled even if SWF says "no". In the latest workflow where Botify reached this limit, things became bad around event 9100, and simpleflow continued to send ScheduleActivityTask decisions. At some point the workflow reached 25k events and it broke with:

    {
        "eventId": 25044,
        "eventTimestamp": 1446958847.47,
        "eventType": "WorkflowExecutionTerminated",
        "workflowExecutionTerminatedEventAttributes": {
            "cause": "EVENT_LIMIT_EXCEEDED",
            "childPolicy": "TERMINATE"
        }
    }

Multiple options to solve this:

  • 1- find the bug and fix it; it may be an off-by-one error in our calculation, or maybe we have workflows where tasks are not processed in the same order...
  • 2- adapt the limit when receiving a ScheduleActivityTaskFailed event with cause = OPEN_ACTIVITIES_LIMIT_EXCEEDED
  • 3- ask AWS support to raise this limit => won't improve anything, it's a bug in simpleflow
  • 4- break the workflow into child workflows => nice on paper, but not easy to implement + more complicated to operate + we could still trigger this very bug
  • 5- reduce the number of tasks (highly dependent on your constraints, not simpleflow's job)
  • 6- set a soft limit on the number of open activity tasks: scheduling 1000 activity tasks when you only have 20 or 30 activity workers ready to take them is not only useless, it's also dangerous because the ScheduleToStart and ScheduleToClose clocks start running earlier, and you know that with your current platform you cannot honor those tasks.

Options 1 and 2 are normal investigations.

Option 4 (child workflows) will be explored eventually in the next few weeks/months.

We may also explore option 6 immediately because it should be easy to implement and would avoid problems down the road. Constantly flirting with the limits is not a good idea in practice.

Option 5 may be discussed internally; it is not relevant to simpleflow's interests.

Access tag_list to make decisions

We started to experiment with simpleflow. However, I'm a little stuck. Here is an outline of what I would like to do:

  1. IAM: Create an AWS User with restricted SWF API Access. The user should only be able to call StartWorkflowExecution, and only if it specifies a test tag. (That's easy with IAM.)
  2. Simpleflow: When a new workflow execution starts, I would like to access the tag_list of the execution to make a decision in our flow.

We created something very similar to the basic example. How could we access the tag-list in the BasicWorkflow::run() method? Currently that seems impossible to me. Is our approach wrong? Should we access the tags somewhere else?

Add a module to provide statistics about workflows

This module should provide facilities to read the history of a workflow execution and return:

  • The time spent in each activity or child workflow.
  • The activities that failed.
  • The number and types of failures.

It should also provide statistics across several workflow executions, such as the top N slowest tasks (slowest being a function of time such as the mean or median).

Boto error in a replay

11/14/2014 02:30:03 ERROR:Cannot replay the workflow TypeError(string indices must be integers, not str)
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/simpleflow/swf/executor.py", line 295, in replay
    result = self.run_workflow(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/simpleflow/executor.py", line 59, in run_workflow
    return self._workflow.run(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/cdf/workflows/analysis.py", line 555, in run
    for part_id in partitions.result
  File "/usr/local/lib/python2.7/dist-packages/simpleflow/workflow.py", line 30, in submit
    return self._executor.submit(func, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/simpleflow/swf/executor.py", line 261, in submit
    return self.resume(task, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/simpleflow/swf/executor.py", line 232, in resume
    self.schedule_task(task)
  File "/usr/local/lib/python2.7/dist-packages/simpleflow/swf/executor.py", line 201, in schedule_task
    decisions = task.schedule(self.domain)
  File "/usr/local/lib/python2.7/dist-packages/simpleflow/swf/task.py", line 14, in schedule
    version=activity.version,
  File "/usr/local/lib/python2.7/dist-packages/swf/utils.py", line 159, in __new__
    new = mutableclass(*args, **kw) # __init__ gets called while still mutable
  File "/usr/local/lib/python2.7/dist-packages/swf/models/activity.py", line 121, in __init__
    super(self.__class__, self).__init__(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/swf/core.py", line 40, in __init__
    boto.swf.connect_to_region(self.region, **settings_))
  File "/usr/local/lib/python2.7/dist-packages/boto/swf/__init__.py", line 45, in connect_to_region
    return region.connect(**kw_params)
  File "/usr/local/lib/python2.7/dist-packages/boto/regioninfo.py", line 188, in connect
    return self.connection_cls(region=self, **kw_params)
  File "/usr/local/lib/python2.7/dist-packages/boto/swf/layer1.py", line 85, in __init__
    debug, session_token, profile_name=profile_name)
  File "/usr/local/lib/python2.7/dist-packages/boto/connection.py", line 558, in __init__
    profile_name)
  File "/usr/local/lib/python2.7/dist-packages/boto/provider.py", line 197, in __init__
    self.get_credentials(access_key, secret_key, security_token, profile_name)
  File "/usr/local/lib/python2.7/dist-packages/boto/provider.py", line 365, in get_credentials
    self._populate_keys_from_metadata_server()
  File "/usr/local/lib/python2.7/dist-packages/boto/provider.py", line 384, in _populate_keys_from_metadata_server
    self._access_key = security['AccessKeyId']
TypeError: string indices must be integers, not str

boto.exception.SWFResponseError: SWFResponseError: 400 Bad Request

When I try to execute the tutorial samples, I get the error below:
u'message': u'The security token included in the request is invalid.

What do I have to configure to make it work?
I already have the .swf file in my home directory with my key_id and key_password.

Thanks in advance!

simpleflow.activity.Activity timeout arguments aren't optional

I created all of my activity types, workflow types, and domains manually in the AWS console before I began using simpleflow, so I have no experience trying to register these types using simpleflow. Since my types were pre-registered, I thought I'd be able to leave off all four of the timeout parameters when decorating an activity function with @simpleflow.activity.with_attributes. However, my decider received this error

  File "/usr/local/lib/python2.7/dist-packages/swf/actors/decider.py", line 53, in complete
    raise ResponseError(e.body['message'])
ResponseError: Invalid duration: None

Adding the timeouts to the decorator parameters fixed this. If these are indeed optional, I assume the fix is for the library to remove any keys whose value is None before making the request.

Local execution: serialize and deserialize objects before passing them to task

The local execution does not serialize and deserialize the objects it passes to the task handlers. However, users expect local execution to mimic distributed execution, and use it to test that a workflow will work. This implicit promise is broken when a workflow definition calls tasks with objects that do not serialize (to JSON).
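A tiny sketch of one way the local executor could surface these problems, assuming JSON is the wire format (the helper name is illustrative): force a round-trip on task arguments and results.

import json


def json_roundtrip(value):
    # Raises TypeError for objects that would not survive the trip through SWF,
    # so local runs fail the same way distributed runs would.
    return json.loads(json.dumps(value))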

Improve tests with moto/swf

Some tests can be simplified, and other parts of the code that were not really well tested could now be tested a bit more easily.

SWF support for moto is not merged yet but I opened a PR yesterday => getmoto/moto#451

Handle FailWorkflowExecutionFailed event

A FailWorkflowExecution decision may fail with error UnhandledDecision if there is another scheduled decision at the same time.

See the SWF documentation: An UnhandledDecision fault will be returned if a workflow closing decision is specified and a signal or activity event had been added to the history while the decision task was being performed by the decider. Unlike the above situations which are logic issues, this fault is always possible because of race conditions in a distributed system. The right action here is to call RespondDecisionTaskCompleted without any decisions. This would result in another decision task with these new events included in the history. The decider should handle the new events and may decide to close the workflow execution.

Remove devel branch

Since the end of 2014 we only use the master branch here. I propose we remove the "devel" branch. Any reason why we shouldn't?

Handle the failure to schedule an activity

Overview

When SWF cannot schedule an activity, it adds an ActivityEvent with a scheduled_failed state. It occurs when:

  • The activity type does not exist.
  • The number of activities to schedule exceeds the rate limit.

Activity Type does not exist

See #6.

Rate limit exceeded

In this case, the executor should only schedule a TimerDecision with a value defined by an exponential backoff. It needs to track the number of backoffs already performed to set the next value accordingly.
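A sketch of the backoff computation (the base and cap values are illustrative):

def backoff_timeout(previous_attempts, base=2, cap=900):
    # Exponential backoff, in seconds, for the retry timer: doubles with each
    # failed scheduling attempt and is capped to avoid unbounded waits.
    return min(base ** previous_attempts, cap)

# usage sketch: start a timer of backoff_timeout(n) seconds after the n-th
# ScheduleActivityTaskFailed event for this activity.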
