botify-labs / simpleflow

Python library for dataflow programming.
Home Page: https://botify-labs.github.com/simpleflow/
License: MIT License
A `Future` could be a functor (in `scala.concurrent.Future` it is even a monad) and provide the operation `map :: f a -> (a -> b) -> f b` (`fmap :: (a -> b) -> f a -> f b` in Haskell), where the first argument is `self` (an instance of `Future`). If we translate the type signature to our future case: `map :: Future a -> (a -> b) -> Future b`.
Let's take a simple example:

```python
y = self.submit(comp1, x)
z = self.submit(comp2, y)
```

where:

- `comp1` returns a dict `{str: str}`
- `comp2` takes a single argument of type `str`

We cannot pass `y.result` to `comp2`. We need to pass `y.result['key']`:
```python
y.map(lambda result: result['key'])
```

where:

- `y` has the type `Future dict`
- `lambda result: result['key']` has the type `dict -> str`
- `f` is `Future`, `a` is `dict`, and `b` is `str`
This kind of `.map` method is not usual in Python. We could borrow some interfaces from the JavaScript world, such as `.then()` in Q.
A monad interface would ease chaining and the handling of failures.
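As an illustration, a minimal sketch of what such a `map` could look like on a simpleflow-style future. The `Future` class below is a stand-in, not simpleflow's actual implementation; a real executor would defer `fn` until the result is ready:

```python
class Future(object):
    """Stand-in future holding an already-computed result."""
    def __init__(self, result):
        self.result = result

    def map(self, fn):
        # map :: Future a -> (a -> b) -> Future b
        # Here the result is already available, so we apply fn eagerly.
        return Future(fn(self.result))

y = Future({"key": "value"})       # Future dict
z = y.map(lambda result: result["key"])  # Future str
```

`comp2` would then receive `z.result` instead of the whole dict.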
Disclaimer: this is by no means an issue to denigrate previous work on simpleflow. Simpleflow handles a ton of things; the code is globally nice and easy to work with thanks to solid conventions and a good organization. But it grew organically, like a lot of software, and it's becoming a pain to make some basic changes, so let's improve that!
I'm currently trying to make child workflows work within simpleflow. This involves naming child workflows carefully so we don't generate name collisions too easily (two concurrent workflows can't have the same `workflowId`, else the second one won't launch).
One reliable way to do that would be to get the `workflowId` and `runId` of the parent workflow, hash them into a short, sufficiently unique string, and then use that inside the child workflow name, among other things (workflow type + an increment as of today). Remember that we cannot use a random `uuid()` because the decider needs to stay stateless, while still naming things the same way across replays.
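As a sketch, the "hash them into a short string" step could look like this (`child_workflow_suffix` is a hypothetical helper, not part of simpleflow):

```python
import hashlib

def child_workflow_suffix(workflow_id, run_id, length=8):
    """Derive a short, deterministic suffix from the parent's identifiers.

    Unlike uuid4(), the result depends only on its inputs, so it stays
    stable across decider replays.
    """
    raw = "{}:{}".format(workflow_id, run_id).encode("utf-8")
    return hashlib.sha1(raw).hexdigest()[:length]
```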
The PollForDecisionTask endpoint exposes those parameters, but they're not passed down to simpleflow where I need them (e.g. in `simpleflow.swf.executor.Executor.replay()`).
Then the pain begins, because there are many levels of indirection between the API call via boto and this function:

- `simpleflow.swf.executor.Executor.replay(history)`
- `simpleflow.swf.process.decider.base.DeciderWorker.decide(history)`
- `simpleflow.swf.process.decider.base.DeciderPoller.decide(history)`
- `simpleflow.swf.process.decider.base.DeciderPoller.process(task)` (where task is in fact a tuple of `token` + `history`)
- `simpleflow.swf.process.actor.Poller.start(..)` with task deduced from a `self._poll()`
- `simpleflow.swf.process.actor.Poller._poll()`, which actually delegates to `Poller.poll()`, which is abstract (which is OK, workers and deciders don't poll the same endpoint)
- `simpleflow.swf.process.decider.base.DeciderPoller.poll()`, which delegates to `swf.actors.Decider.poll()`
- `swf.actors.Decider.poll()`, which calls `poll_for_decision_task()` and returns a tuple `token` + `history`
So in order to get the workflowId + runId to the simpleflow executor, we'd need to modify 8 functions, and either break their signatures and return values, or deal with missing data at each level. This is not sane imho, and ironically, it makes that kind of tiny change a lot more difficult than if we were using the raw endpoints through boto.

Bonus: not simpleflow's problem, but we have private code at Botify that relies on those functions at various levels, and we don't have an exhaustive test suite, so it will be hard not to break anything.
I see a few actions that could improve the situation:

- sure
- while working on moto, it almost looks like rspec in ruby (not that good, but still nice)
- Maybe starting from scratch on subcomponents would be simpler than trying to maintain strict compatibility (??).

I will complete this list... next year. Any opinion welcome. If nobody has strong opinions about that, I will start implementing those ideas (not necessarily in this order) and will update this issue as I go.
While adding the decider and activity worker processes, I was thinking that anything that could help bootstrap the execution of a workflow would be a great improvement. It led to the addition of `simpleflow` commands to start, track, and stop an execution. One of the most helpful commands to define and test a workflow is `simpleflow standalone`. In a single command, one can execute the whole workflow by combining decider and activity processes under a single process. However, this command does not scale for a distributed deployment.
The purpose of the feature described here is to provide an interface to manage computing resources. The main artifact is the node, which refers to a machine inside a group of machines. Speaking of a cluster may suggest all nodes are the same, or at least similar. However, we want to be able to support different types of computation and to easily provide the right resources: CPU, memory, storage, and network.
A node requires a snapshot of its initial state, commonly called an image. In AWS parlance we will refer to an AMI, which is the filesystem snapshot used to boot an EC2 instance.
Though there are libraries that abstract the specific cloud hosting provider, such as libcloud, I prefer to start experimenting with Amazon EC2. It does not mean I want to hard-code everything. The EC2 backend will register itself as one of several hosting backends. At first, it will be the only one, but feel free to add any other backend you need or like.
How to use this interface? After reading the paragraphs above you may be thinking it's a little abstract. Let's take a concrete example of how to use it.
To execute a workflow, we need three processes:
All these processes are stateless. The decider and activity worker poll for tasks. This means they do not need to be running when the client starts a workflow or when SWF triggers an event. We could then leverage this behavior to create a group of processes and their underlying supporting machines on-demand:
It requests the cancellation of the workflow. It can then trigger an `.on_cancellation()` callback.
To find a task in the workflow's history, the executor assigns a unique id to the task. It increments the last id associated with the task's name:

```python
class TaskRegistry(dict):
    """This registry tracks tasks and assigns them an integer identifier."""
    def add(self, task):
        """
        IDs are assigned sequentially by incrementing an integer, starting
        from 1.

        :returns: the task's identifier.
        :rtype: int
        """
        name = task.name
        self[name] = self.setdefault(name, 0) + 1
        return self[name]
```
However, it does not work if the execution flow changes, like in a simple implementation of a retry on a timeout:

```python
f_result = self.submit(f, a)  # task1
if f_result.finished and f_result.exception:
    if is_timeout(f_result.exception):
        f_result = self.submit(f, a)  # task1bis
f_result_2 = self.submit(f, b)  # task2
futures.wait(f_result, f_result_2)
```
Let's simulate the execution of this workflow.

In the first run, the executor:

- assigns id `1` to `task1`,
- assigns id `2` to `task2`,
- blocks on `futures.wait()`.

The history then contains `task1` with id `1` and `task2` with id `2`.

In the second run:

- `task1` timed out and `task2` finished at the same time,
- the executor assigns id `1` to `task1`,
- it assigns id `2` to `task1bis` and finds it in the history: it mistakenly puts the result of `task2` in `task1bis` (they are not even called with the same argument),
- it assigns id `3` to `task2` and does not find it in the history,
- it blocks on `futures.wait()`.
The issue here is that the id of a task is generated with respect to the code evaluated before it, and this code may change depending on the execution of the workflow.
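The shift is reproducible with the registry logic alone; both `submit(f, ...)` calls share the same task name, so the ids depend purely on submission order:

```python
class TaskRegistry(dict):
    """Same logic as the registry above, keyed directly by name."""
    def add(self, name):
        self[name] = self.setdefault(name, 0) + 1
        return self[name]

# First run: task1 then task2 (the retry branch is not taken).
run1 = TaskRegistry()
first_run = [run1.add("f"), run1.add("f")]
# task1 -> 1, task2 -> 2

# Second run: task1, then task1bis (retry branch), then task2.
run2 = TaskRegistry()
second_run = [run2.add("f"), run2.add("f"), run2.add("f")]
# task1 -> 1, task1bis -> 2 (task2's id from the first run!), task2 -> 3
```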
Assuming that tasks are idempotent would make things easier, because the executor would only need to hash the name and input of a task to generate its id. However, if tasks can have side effects (and we cannot ensure they don't), we need to provide another mechanism. I thought that we could mark a computation as idempotent with a `cached` or `memoized` attribute. But we still need to handle other computations.
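For idempotent tasks, the id could be content-addressed. A sketch (the helper and its naming scheme are hypothetical):

```python
import hashlib
import json

def idempotent_task_id(name, args, kwargs):
    """Derive a stable id from a task's name and input, so the same call
    always maps to the same history entry, regardless of submission order."""
    payload = json.dumps([name, args, kwargs], sort_keys=True)
    digest = hashlib.sha1(payload.encode("utf-8")).hexdigest()[:12]
    return "{}-{}".format(name, digest)
```

With this scheme, `task1` and `task1bis` above (same function, same argument) would share one id, while `task2` (different argument) would get its own.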
When an activity fails or times out, the executor schedules it again. However, it continues to schedule it even when the task reaches the maximum number of retries.
This type would define the program to execute and its arguments.

We need a convention to pass arguments to the program. We could either pass them:

- as positional arguments on `argv`, or
- as named options: an argument `a` is passed with `--a 'value_of_a'`.

Both require serializing the values into strings because we cannot ensure the type of the variables, and we may have some non-scalar values such as lists or tuples.
A straight-forward protocol would be to pass the arguments as positional command line arguments after converting them to strings.
Example:

```python
sort_program = ProcessActivity('infile', 'outfile', path='/usr/local/bin/sort_program')
self.submit(sort_program, '/data/infile', '/data/outfile')
```
The worker will then call `/usr/local/bin/sort_program` in a subprocess (using the `subprocess` module) as:

```python
subprocess.call(['/usr/local/bin/sort_program', '/data/infile', '/data/outfile'])
```

It may use a variant of `subprocess.call()` to control the interaction (`stdin`, `stdout`, and `stderr`) with the subprocess.
Dates in the workflow history don't carry their timezone, making it really hard to compare them with other dates, as in this example: https://github.com/botify-hq/botify-workflow-dashboard/blob/master/test/utils/workflowFile/input-0.json where tasks and metrology steps aren't in the same timezone.
simpleflow/simpleflow/history.py, line 31 in 4317a13:

```
"scheduled_timestamp": "2015-12-02T10:42:27.045000",
"started_timestamp": "2015-12-02T10:42:27.145000",
"completed_timestamp": "2015-12-02T10:47:01.593000",
"timed_out_timestamp":
"failed_timestamp"
"cancelled_timestamp"
```
BTW: timestamps are the number of milliseconds since 1 Jan 1970. So in the workflow history file, "timestamps" are actually datetimes and "times" are timestamps.
At Botify we're getting to a point where we're sometimes being throttled by the SWF API. This happened last weekend, with the rate limit exceeded on a `RespondDecisionTaskCompleted` action, which is a bit unfortunate: the task did succeed, but because of the rate limit, we retried it. Luckily this was only a 6-minute task, but still.
For the record the SWF rate-limit policy is described here: http://docs.aws.amazon.com/amazonswf/latest/developerguide/swf-dg-limits.html#throttle-limits-useast1
The error looks like this:

```
Traceback (most recent call last):
  File "/code/botify/backend/process/worker/base.py", line 225, in process_task
    self.complete(token, json.dumps(result, default=lambda v: repr(v)))
  File "/usr/local/lib/python2.7/dist-packages/swf/actors/worker.py", line 79, in complete
    raise ResponseError(e.body['message'])
ResponseError: Rate exceeded
```
After a quick test, throttling errors look like this:

```
>>> print e
SWFResponseError: 400 Bad Request
{u'message': u'Rate exceeded', u'__type': u'com.amazon.coral.availability#ThrottlingException'}
>>> print e.error_code
ThrottlingException
```
So we could add some retry logic via a decorator on most tasks, triggered when the exception has an `error_code` and `error_code == "ThrottlingException"`. Note that accessing `error_code` on an exception that doesn't have it (e.g. one surfaced through `simpleflow.futures`) raises an `AttributeError`.
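A sketch of such a decorator (the backoff values are arbitrary; only the `error_code` check mirrors the boto behavior shown above):

```python
import functools
import time

def retry_on_throttling(max_retries=5, base_delay=1.0):
    """Retry a call when the SWF API answers with a ThrottlingException.

    Exceptions without an error_code, or with another code, are re-raised
    immediately; throttled calls back off exponentially.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as err:
                    if getattr(err, "error_code", None) != "ThrottlingException":
                        raise
                    if attempt == max_retries - 1:
                        raise
                    time.sleep(base_delay * 2 ** attempt)
        return wrapper
    return decorator

# Demo: a call that is throttled twice, then succeeds.
class Throttled(Exception):
    error_code = "ThrottlingException"

calls = []

@retry_on_throttling(max_retries=3, base_delay=0)
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise Throttled("Rate exceeded")
    return "ok"
```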
There's a limit on SWF: you cannot have more than 1,000 open tasks (== scheduled or started). simpleflow already has a protection against scheduling too many tasks, it lives here: https://github.com/botify-labs/simpleflow/blob/master/simpleflow/swf/executor.py#L264-L267, but it seems it doesn't work very well.

As it doesn't work, activity tasks are constantly rescheduled even when SWF says "no". In the latest workflow where Botify reached this limit, things became bad around event 9100, and simpleflow continued to send `ScheduleActivityTask` decisions. At some point the workflow reached 25k events and it broke with:
```json
{
    "eventId": 25044,
    "eventTimestamp": 1446958847.47,
    "eventType": "WorkflowExecutionTerminated",
    "workflowExecutionTerminatedEventAttributes": {
        "cause": "EVENT_LIMIT_EXCEEDED",
        "childPolicy": "TERMINATE"
    }
}
```
Multiple options to solve this:

- `ScheduleActivityTaskFailed` event with cause = `OPEN_ACTIVITIES_LIMIT_EXCEEDED`
- `ScheduleToStart` and `ScheduleToClose` clocks start running earlier, and you know that with your current platform you cannot honor those tasks.

Options 1 and 2 are normal investigations. Option 4 (child workflows) will be explored eventually in the next few weeks/months. We may also explore option 6 immediately because it should be easy to implement and would avoid problems down the road. Constantly flirting with the limits is not a good idea in practice. Option 5 may be discussed internally; it is not relevant to simpleflow's interests.
`swf.executor` maps a timeout event to a `TimeoutError` object that does not have a `reason` attribute. If the task has the `raises_on_failure` flag enabled, `swf.Executor.resume()` will wrap the `TimeoutError` in a `TaskException`. It then raises in `swf.Executor.replay()`, which logs a message. It fails to build the message from the `reason` attribute.
A workflow execution cannot have more than 1,000 open activity tasks. An activity is open when its state is either scheduled or started.
On exception, worker processes save the Python `str(exception)` in the task's "reason" field and the traceback in "details". This is basically unusable. I suggest a more general format: `repr(exception)` in "message" and a JSON object with `error`, `message`, and `traceback` keys in "details".
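A sketch of the suggested format (field names as proposed above; the helper function itself is hypothetical):

```python
import json
import traceback

def format_failure(exc):
    """Build the proposed "message" + structured "details" payload
    for a failed activity task."""
    return {
        "message": repr(exc),
        "details": json.dumps({
            "error": exc.__class__.__name__,
            "message": str(exc),
            "traceback": traceback.format_exc(),
        }),
    }

# Demo: capture a failure and format it.
try:
    raise ValueError("boom")
except ValueError as err:
    payload = format_failure(err)
```

The "details" field stays a string on the SWF side, but consumers can `json.loads()` it and inspect each part separately instead of parsing a raw traceback.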
Misplaced `else`? Also applied some flake8 fixes.
When SWF cannot schedule an activity, it adds an `ActivityEvent` with a `scheduled_failed` state. It occurs when:
See #6.
In this case, the executor should only schedule a `TimerDecision` with a value defined by an exponential backoff. It needs to find the number of previous backoffs to set the next value accordingly.
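The backoff value itself can be a simple function of the number of previous schedule failures (the base and cap below are arbitrary, not simpleflow defaults):

```python
def backoff_timer(failure_count, base=2, max_delay=300):
    """Seconds to wait (via a TimerDecision) before the next
    ScheduleActivityTask attempt, capped at max_delay."""
    return min(base ** failure_count, max_delay)
```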
Example:

```python
>>> x = self.submit(f, 1)
>>> y = self.submit(g, x)
```

What happens when `x.exception` is not `None`? The alternatives are:

- set `y.exception` with an exception that wraps `x.exception` and tells that an argument has an exception, or
- raise `x.exception`
I tried to use `--tags` with the following command, based on the README.md:

```
simpleflow standalone --domain TestDomain examples.basic.BasicWorkflow --input '[1, 5]' --tags tag1,tag2
```
```
Traceback (most recent call last):
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/bin/simpleflow", line 11, in <module>
    load_entry_point('simpleflow', 'console_scripts', 'simpleflow')()
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/click/core.py", line 716, in __call__
    return self.main(*args, **kwargs)
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/click/core.py", line 696, in main
    rv = self.invoke(ctx)
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/click/core.py", line 1060, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/click/core.py", line 889, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/click/core.py", line 534, in invoke
    return callback(*args, **kwargs)
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/click/decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/Users/frncmx/.prezi/simpleflow/simpleflow/command.py", line 502, in standalone
    local=False,
  File "/Users/frncmx/.prezi/simpleflow/simpleflow/command.py", line 161, in start_workflow
    decision_tasks_timeout=decision_tasks_timeout,
  File "/Users/frncmx/.prezi/simpleflow/swf/models/workflow.py", line 270, in start_execution
    task_start_to_close_timeout=decision_tasks_timeout,
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/boto/swf/layer1.py", line 477, in start_workflow_execution
    'taskStartToCloseTimeout': task_start_to_close_timeout,
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/boto/swf/layer1.py", line 118, in json_request
    return self.make_request(action, json_input, object_hook)
  File "/Users/frncmx/.prezi/simpleflow/virtualenv/lib/python2.7/site-packages/boto/swf/layer1.py", line 145, in make_request
    raise excp_cls(response.status, response.reason, body=json_body)
boto.exception.SWFResponseError: SWFResponseError: 400 Bad Request
{u'Message': u'Expected list or null', u'__type': u'com.amazon.coral.service#SerializationException'}
```
`boto` expects a list, but `WorkflowType.start_execution()` does not process the `tag_list` provided through `--tags`.
The AWS CLI manual defines the following syntax for the `--tag-list` option:

```
--tag-list "string" "string" ...
```

But our options only consume a single argument; there is no variadic positional arg. So I think we could only have something like:

```
--tags "string","string",...
```
In the case above, adding the following code to the `start_execution` function would be enough:

```python
if tag_list is not None:
    tag_list = tag_list.split(',')
```
This has been discussed a little in #70, but I'm opening a separate issue for this. We'd like to be able to pass an object instance to `Workflow.submit()` directly to ease its instrumentation:

```python
self.submit(Foo(bar))
```
The recent refactoring around activity tasks could help with that.
For now I have something working here: https://github.com/botify-labs/simpleflow/tree/feature/instance-based-activities but I'm not really satisfied (one more class concept, hacky generation of an Activity, "callable" still used everywhere when in fact we have an object, no automatic args/kwargs resolution, etc.). I'll try to improve that later.
I created all of my activity types, workflow types, and domains manually in the AWS console before I began using simpleflow, so I have no experience trying to register these types using simpleflow. Since my types were pre-registered, I thought I'd be able to leave off all four of the timeout parameters when decorating an activity function with `@simpleflow.activity.with_attributes`. However, my decider received this error:

```
  File "/usr/local/lib/python2.7/dist-packages/swf/actors/decider.py", line 53, in complete
    raise ResponseError(e.body['message'])
ResponseError: Invalid duration: None
```
Adding the timeouts to the decorator parameters fixed this. If these are indeed optional, I assume the fix is for the library to remove any keys whose value is None before making the request.
We started to experiment with simpleflow. However, I'm a little stuck. Here is the outline of what I would like to do:

- Allow calling `StartWorkflowExecution` only if the request specifies a test tag. (That's easy with IAM.)
- Use the `tag_list` of the execution to make a decision in our flow.

We created something very similar to the basic example. How could we access the `tag_list` in the `BasicWorkflow.run()` method? Currently that seems impossible to me. Do we have a wrong approach, or should we access the tags somewhere else?
This module should provide facilities to read the history of a workflow execution and return:

It should also provide statistics on several workflow executions, such as the top N slowest tasks (slowest being a function of time such as the mean or median).
I thought the `identity` argument to `swf.actors.worker.ActivityWorker`'s initializer and/or its `poll` method was optional, particularly since it has a default value of `None`. However, leaving it out causes an `AttributeError`:

```
  File "/usr/local/lib/python2.7/dist-packages/simpleflow/history.py", line 66, in parse_activity_event
    activity['identity'] = event.identity
AttributeError: 'ActivityTaskEvent' object has no attribute 'identity'
```
In this case, the executor has to create the activity type and let the replay behavior try to schedule the activity again.

This allows removing the `.get_or_create()` calls from `ActivityTask.schedule()` and querying SWF only when needed. It also permits running all the tests without AWS API keys.
An OOM kill, for instance, won't be caught by `try ... except`. The wrapper then returns a bogus `EOFError`.

Testcase without OOM 😇:

```python
# zz.py
import os

from simpleflow import execute

@execute.python()
def stupid():
    pid = os.getpid()
    os.kill(pid, 9)
```

```
$ python
Python 2.7.6 (default, Mar 22 2014, 22:59:56)
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import zz
>>> zz.stupid()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/dist-packages/simpleflow/execute.py", line 157, in execute
    base64.b64decode(excline.rstrip()))
EOFError
>>>
```
It would be nice to have a `raises_on_failure` flag for canvas nodes that can break/cancel a whole branch of a canvas tree.

This flag should be propagated recursively (BFS) to children until the next flagged child is found (on create/init).

Then, the behaviour of the nodes should be:

- when a node flagged `True` fails, it propagates a failure message to its direct parent,
- otherwise (flagged `False`), the failure is not propagated.

It looks simple, but please let me know your thoughts.
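The propagation step could be sketched like this (the `Node` class is a stand-in for canvas nodes; `None` means "not explicitly flagged"):

```python
from collections import deque

class Node(object):
    def __init__(self, raises_on_failure=None):
        self.raises_on_failure = raises_on_failure
        self.children = []

def propagate_flag(root):
    """BFS from root: copy the flag to descendants until the next
    explicitly flagged child is found; that child keeps its own flag
    and stops propagation into its subtree."""
    queue = deque([root])
    while queue:
        node = queue.popleft()
        for child in node.children:
            if child.raises_on_failure is None:
                child.raises_on_failure = node.raises_on_failure
                queue.append(child)
```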
SWF now allows setting the priority of a task. This requires supporting additional parameters when calling `Executor.submit()`.
I need access to the workflow history at the end of the run, even if the workflow execution is not finished.

I suggest calling `after_run` not only when the workflow is finished, and adding an additional boolean argument `finished` to indicate whether the workflow is finished.
When I try to execute the tutorial samples, I get this error:

```
u'message': u'The security token included in the request is invalid.
```

What do I have to configure to make it work? I already have the .swf file in my home directory with my key_id and key_password.

Thanks in advance!
When calling `Workflow.submit`, the user should be able to set ad-hoc values for `task_list`, `version`, and the various timeouts. This allows adapting the task to its environment, for example adjusting the timeouts to the value of a parameter such as the size of a collection.
The local execution does not serialize and deserialize the objects it passes to the task handlers. However, users expect the local execution to mimic distributed execution and use it to test that a workflow will work. This implicit promise is broken when a workflow definition calls tasks with objects that do not serialize.
A workflow definition should be associated with a client that allows starting, cancelling, or terminating it.
`History.parse_activity_event()` assumes that a `schedule_failed` event happens after a `schedule` event. If SWF could not schedule the activity, there is no such event in the history.
```
$ pip install simpleflow
Collecting simpleflow
  Using cached simpleflow-0.11.1.tar.gz
Requirement already satisfied (use --upgrade to upgrade): boto>=2.38.0 in ./.anyenv/pyenv/versions/3.5.1/envs/wp-scripts/lib/python3.5/site-packages (from simpleflow)
Collecting tabulate==0.7.3 (from simpleflow)
  Using cached tabulate-0.7.3.tar.gz
Requirement already satisfied (use --upgrade to upgrade): setproctitle in ./.anyenv/pyenv/versions/3.5.1/envs/wp-scripts/lib/python3.5/site-packages (from simpleflow)
Collecting subprocess32 (from simpleflow)
  Using cached subprocess32-3.2.7.tar.gz
    Complete output from command python setup.py egg_info:
    This backport is for Python 2.x only.
    ----------------------------------------
Command "python setup.py egg_info" failed with error code 1 in /private/var/folders/cb/cxxqz_g94jd1x5tjwrklxgd00000gn/T/pip-build-22bmoshd/subprocess32/
```
Today, when a workflow execution is closed (completed, failed, or timed out) or when an activity task has timed out, we stop heartbeating but the worker continues its work.
In some cases that might be a good idea:
Anyway, in most use cases at Botify this is counter-intuitive, and as simpleflow workers are limited in number, we can quickly reach a point where all workers are busy working on tasks that won't complete on the SWF side anyway, leaving new workflows without enough workers.

Hence, we should change how simpleflow handles closed activity tasks (actually: `UnknownResource` errors when sending a heartbeat):
Some tests can be simplified, and other parts of the code that were not really well tested could now be tested a bit more easily.
SWF support for moto is not merged yet but I opened a PR yesterday => getmoto/moto#451
This feature allows resuming a workflow where it failed, timed out, or was terminated or cancelled. That's what we mean by "stopped". Hence, the tasks that were completed in the previous execution are not executed again.

This feature introduces the following problems:
There are several ways to communicate with a decider:

- Pass a special key, prefixed with `_`. As the client passes input in a `dict` that contains the `args` and `kwargs` keys, we could also add a `mode` key. A mode is not explicit, though, and requires additional parameters to reference the workflow execution to resume. I prefer a `_previous_workflow_execution` parameter with the two attributes `workflow_id` and `run_id` that would allow retrieving the history of the previous workflow execution.

The two main problems are:
We could:

- mark completed tasks as `FINISHED` (failed tasks are discarded and must be executed again).

This approach requires that task ids are consistent between the previous and the current workflow execution, because this id is used to associate a future object with the task it backs.
Once we get a reference to the previous workflow with the `_previous_workflow_execution` parameter, we can retrieve its input and inject it into the current workflow history.
Fails on:

```
=================================== FAILURES ===================================
______________________ test_workflow_with_child_workflow _______________________

    def test_workflow_with_child_workflow():
        workflow = TestDefinitionChildWorkflow
        executor = Executor(DOMAIN, workflow)
        history = builder.History(workflow,
                                  input={'args': (1,)})

        # The executor should schedule the execution of a child workflow.
        decisions, _ = executor.replay(history)
        assert len(decisions) == 1
        assert decisions == [{
            'startChildWorkflowExecutionDecisionAttributes': {
                'workflowId': 'workflow-test_workflow-1',
                'taskList': {
                    'name': 'test_task_list'
                },
                'executionStartToCloseTimeout': '3600',
                'input': '{"args": [1], "kwargs": {}}',
                'workflowType': {
                    'version': 'test_version',
                    'name': 'test_workflow'
                },
                'taskStartToCloseTimeout': '300'
            },
            'decisionType': 'StartChildWorkflowExecution'
        }]

        # Let's add the child workflow to the history to simulate its completion.
        (history
            .add_decision_task()
            .add_child_workflow(
                workflow,
                workflow_id='workflow-test_workflow-1',
                task_list=TestWorkflow.task_list,
                input='"{\\"args\\": [1], \\"kwargs\\": {}}"',
                result='4'))

        # Now the child workflow is finished and the executor should complete the
        # workflow.
        decisions, _ = executor.replay(history)
        workflow_completed = swf.models.decision.WorkflowExecutionDecision()
        workflow_completed.complete(result=json.dumps(4))
>       assert decisions[0] == workflow_completed
E       assert {'decisionTyp...st_version'}}} == {'decisionType...result': '4'}}
E       Detailed information truncated, use "-vv" to show

tests/test_dataflow.py:643: AssertionError
=============== 1 failed, 31 passed, 15 warnings in 3.87 seconds ===============
The command "py.test" exited with 1.

Done. Your build exited with 1.
```
When we added idempotent tasks (#58), it was mainly a concern of not having tasks mixed up in workflows that have a variable geometry (where tasks are discovered in a non-deterministic order which varies from one replay to another).

But we didn't add any safeguard so that idempotent tasks would not be re-launched multiple times. So today, if you try to schedule two identical idempotent tasks, two possibilities:

- you get an `ACTIVITY_ID_ALREADY_IN_USE` error; simpleflow being a bit dumb, the decider will take another decision, retry the exact same thing, and get the same error until the first task is finished.

We should make it so idempotent tasks are scheduled only once. And we should not break retries while doing that, obviously :)
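A sketch of the safeguard (all names hypothetical): before emitting a `ScheduleActivityTask` decision for an idempotent task, check whether its id was already scheduled in this replay or already appears in the history:

```python
class IdempotentGuard(object):
    """Track idempotent task ids that are already scheduled or present
    in the history; allow each id to be scheduled at most once."""
    def __init__(self, history_ids=()):
        self._seen = set(history_ids)

    def should_schedule(self, task_id):
        if task_id in self._seen:
            return False
        self._seen.add(task_id)
        return True
```

Retries would bypass the guard (or remove the id from the set) so a failed idempotent task can still be rescheduled.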
Somewhat related to #108, low urgency: when starting a simpleflow process (Worker, Decider) we start everything at once. Here at Botify we usually have 8 subprocesses for each master process, and we have many activity workers (sometimes > 50), each having multiple master processes. Numbers add up quickly, so maybe we should stagger process starts inside a given master process (say, start only one child process each second).
Low priority, just a thought for later.
Per comment at https://github.com/botify-labs/simpleflow/blob/master/simpleflow/swf/executor.py#L281
This should not happen, unless Activity or Workflow is redefined—which is a bug
See #85: we don't have Python 3.x compatibility anymore, so maybe work on that in the next few weeks.
A `FailWorkflowExecution` decision may fail with error `UnhandledDecision` if there is another scheduled decision at the same time.

See the SWF documentation: "An UnhandledDecision fault will be returned if a workflow closing decision is specified and a signal or activity event had been added to the history while the decision task was being performed by the decider. Unlike the above situations which are logic issues, this fault is always possible because of race conditions in a distributed system. The right action here is to call RespondDecisionTaskCompleted without any decisions. This would result in another decision task with these new events included in the history. The decider should handle the new events and may decide to close the workflow execution."
Today the simpleflow decider shares workflow definition instances. It's particularly visible if you try the `simpleflow standalone` command with a workflow like this:

```python
class MyWorkflow(Workflow):
    def __init__(self):
        self.items = []

    def run(self):
        self.items.append("foo")
        print self.items
        for i in [1, 2, 3, 4]:
            future = self.submit(task)
            future.wait()
```

This is a bit schematic, but the decider will have to replay the workflow 4 times, and you will see that `self.items` grows bigger and bigger.
Of course this is a dumb example, but:

For now we have fixed our internal workflow to avoid that, but I'll eventually fix simpleflow so it re-instantiates the workflow definition each time it starts a "replay".
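A minimal sketch of that fix, assuming the decider keeps the workflow class rather than a shared instance (all names here are hypothetical, not simpleflow's actual API):

```python
class MyWorkflow(object):
    """Workflow definition with instance state, as in the example above."""
    def __init__(self):
        self.items = []

    def run(self):
        self.items.append("foo")
        return list(self.items)

class Decider(object):
    """Re-instantiate the workflow definition on every replay, so no
    state leaks from one replay to the next."""
    def __init__(self, workflow_class):
        self._workflow_class = workflow_class

    def replay(self):
        return self._workflow_class().run()
```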
Hi,
It looks like if the activity polling process can't send a heartbeat for a single task, the whole polling process will crash:

Is this intended? Let's say there is a network partition or some transient network issue when sending the heartbeat: wouldn't ignoring the error be a better solution? In that case, the worker would continue polling for other tasks and the problematic task would hit its heartbeat timeout. The current behavior leads to losing activity workers.
Thanks,
Kefu
Per discussion with @ggreg, we noticed that the semantic difference between our version of a future (`simpleflow.futures.Future`) and Python's future (`concurrent.futures.Future`) is mainly:

However, these 2 points all come from the SWF executor. As we've separated the backend executors (swf, local, local_async), these special future behaviors should also be separated from the main interface.

It is proposed to:

- use `concurrent.futures.Future` as the base `Future` definition,
- add `map`, see #7,
- add a `SwfFuture` that handles SWF-specific cancelling and blocking strategy (blocking).

11/14/2014 02:30:03 ERROR:Cannot replay the workflow TypeError(string indices must be integers, not str)
```
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/simpleflow/swf/executor.py", line 295, in replay
    result = self.run_workflow(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/simpleflow/executor.py", line 59, in run_workflow
    return self._workflow.run(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/cdf/workflows/analysis.py", line 555, in run
    for part_id in partitions.result
  File "/usr/local/lib/python2.7/dist-packages/simpleflow/workflow.py", line 30, in submit
    return self._executor.submit(func, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/simpleflow/swf/executor.py", line 261, in submit
    return self.resume(task, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/simpleflow/swf/executor.py", line 232, in resume
    self.schedule_task(task)
  File "/usr/local/lib/python2.7/dist-packages/simpleflow/swf/executor.py", line 201, in schedule_task
    decisions = task.schedule(self.domain)
  File "/usr/local/lib/python2.7/dist-packages/simpleflow/swf/task.py", line 14, in schedule
    version=activity.version,
  File "/usr/local/lib/python2.7/dist-packages/swf/utils.py", line 159, in __new__
    new = mutableclass(*args, **kw)  # __init__ gets called while still mutable
  File "/usr/local/lib/python2.7/dist-packages/swf/models/activity.py", line 121, in __init__
    super(self.__class__, self).__init__(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/swf/core.py", line 40, in __init__
    boto.swf.connect_to_region(self.region, **settings_))
  File "/usr/local/lib/python2.7/dist-packages/boto/swf/__init__.py", line 45, in connect_to_region
    return region.connect(**kw_params)
  File "/usr/local/lib/python2.7/dist-packages/boto/regioninfo.py", line 188, in connect
    return self.connection_cls(region=self, **kw_params)
  File "/usr/local/lib/python2.7/dist-packages/boto/swf/layer1.py", line 85, in __init__
    debug, session_token, profile_name=profile_name)
  File "/usr/local/lib/python2.7/dist-packages/boto/connection.py", line 558, in __init__
    profile_name)
  File "/usr/local/lib/python2.7/dist-packages/boto/provider.py", line 197, in __init__
    self.get_credentials(access_key, secret_key, security_token, profile_name)
  File "/usr/local/lib/python2.7/dist-packages/boto/provider.py", line 365, in get_credentials
    self._populate_keys_from_metadata_server()
  File "/usr/local/lib/python2.7/dist-packages/boto/provider.py", line 384, in _populate_keys_from_metadata_server
    self._access_key = security['AccessKeyId']
TypeError: string indices must be integers, not str
```
Since the end of 2014 we only use the master branch here. I propose we remove the "devel" branch. Any reason why we shouldn't?