cermakm / argo-python-dsl
Python DSL for Argo Workflows | Mirrored to https://github.com/argoproj-labs/argo-python-dsl
Home Page: https://github.com/argoproj/argo
License: Apache License 2.0
Describe the bug
argo.workflows.dsl._workflow.Workflow.submit checks whether the Workflow has an arguments attribute, but not whether that attribute is None. As a result, attempting to invoke submit() can raise an AttributeError at:
self.spec.arguments.parameters = new_parameters
To Reproduce
Using the ArtifactPassing example:
from argo.workflows.client import V1alpha1Api
from argo.workflows.dsl import Workflow
from argo.workflows.dsl.tasks import artifact, task, V1alpha1Template
from argo.workflows.dsl.templates import template, inputs, outputs, V1alpha1Artifact, V1Container

class ArtifactPassing(Workflow):

    @task
    def generate_artifact(self) -> V1alpha1Template:
        return self.whalesay()

    @task
    @artifact(
        name="message",
        _from="{{tasks.generate-artifact.outputs.artifacts.hello-art}}"
    )
    def consume_artifact(self, message: V1alpha1Artifact) -> V1alpha1Template:
        return self.print_message(message=message)

    @template
    @outputs.artifact(name="hello-art", path="/tmp/hello_world.txt")
    def whalesay(self) -> V1Container:
        container = V1Container(
            name="whalesay",
            image="docker/whalesay:latest",
            command=["sh", "-c"],
            args=["cowsay hello world | tee /tmp/hello_world.txt"]
        )
        return container

    @template
    @inputs.artifact(name="message", path="/tmp/message")
    def print_message(self, message: V1alpha1Artifact) -> V1Container:
        container = V1Container(
            name="print-message",
            image="alpine:latest",
            command=["sh", "-c"],
            args=["cat /tmp/message"],  # one string: `sh -c` takes the whole command
        )
        return container

if __name__ == '__main__':
    argo_api = V1alpha1Api()
    workflow_instance = ArtifactPassing()
    workflow_instance.submit(argo_api, "argo", parameters={"test_param": "some_value"})
Expected behaviour
The Workflow will be submitted through the API client instead of an AttributeError being raised.
Screenshots
N/A
Additional context
Traceback (most recent call last):
File ".../artifact-passing.py", line 54, in <module>
workflow_instance.submit(argo_api, "argo", parameters={"test_param": "some_value"})
File ".../venv/lib/python3.8/site-packages/argo/workflows/dsl/_workflow.py", line 418, in submit
self.spec.arguments.parameters = new_parameters
AttributeError: 'NoneType' object has no attribute 'parameters'
$ python
Python 3.8.1 (default, Feb 11 2020, 16:39:15)
[GCC 7.4.0] on linux
It seems possible to work around this manually like so:
workflow_instance.spec._arguments = argo.workflows.client.V1alpha1Arguments()
workflow_instance.spec._arguments.parameters = []
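For reference, a minimal sketch of the kind of guard submit() could apply before touching spec.arguments.parameters (the helper name here is made up, not the library's actual code):

from argo.workflows.client import V1alpha1Arguments

def ensure_arguments(spec) -> None:
    # Hypothetical guard: create an empty V1alpha1Arguments when the compiled
    # spec has none, so `spec.arguments.parameters = new_parameters` never
    # dereferences None.
    if getattr(spec, "arguments", None) is None:
        spec.arguments = V1alpha1Arguments(parameters=[])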
Description
I have built our ML infrastructure with Airflow and Kubernetes. As a centralized scheduler, Airflow has a few scenarios where it does not work well. For the next iteration of our ML workflow and scheduler, I'd like to invest in a cloud-native workflow engine (basically Argo). I'm actively exploring this library and am wondering about the longer-term development plan.
Do you accept external contributions at the moment? (I suspect not just yet, but I would like to know when you would call it a v1 and go from there.)
Additional context
Hi @CermakM,
People are very interested in your SDK. Would you be interested in moving it into Argo Labs? This would give it more "official" status and encourage others to contribute.
Alex
Workflow.from_url seems to load a static helloworld.yaml:
https://github.com/CermakM/argo-python-dsl/blob/master/argo/workflows/dsl/_workflow.py#L279
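For reference, a sketch of what a URL-parameterized loader might look like (assumes requests and PyYAML are available; this is not the library's actual implementation):

import requests
import yaml

def load_manifest(url: str) -> dict:
    # Fetch and parse a Workflow manifest from the given URL instead of a
    # hard-coded helloworld.yaml.
    resp = requests.get(url)
    resp.raise_for_status()
    return yaml.safe_load(resp.text)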
Describe the bug
Traceback (most recent call last):
  File "/opt/app-root/lib/python3.7/site-packages/aiohttp/web_protocol.py", line 418, in start
    resp = await task
  File "/opt/app-root/lib/python3.7/site-packages/octomachinery/app/routing/webhooks_dispatcher.py", line 82, in wrapper
    return await wrapped_function(request, github_app=github_app)
  File "/opt/app-root/lib/python3.7/site-packages/octomachinery/app/routing/webhooks_dispatcher.py", line 120, in route_github_webhook_event
    await dispatch_event(event)
  File "/opt/app-root/lib/python3.7/site-packages/gidgethub/routing.py", line 80, in dispatch
    await callback(event, *args, **kwargs)
  File "app.py", line 138, in on_pr_open_or_sync
    installation=installation,
  File "app.py", line 168, in _submit_thamos_workflow
    wf = ThamosAdviseCheckRun()
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 233, in __init__
    self.compile()
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 376, in compile
    self.spec: V1alpha1WorkflowSpec = _compile(self.spec)
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 371, in _compile
    value: Any = _compile(getattr(obj, attr))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 368, in _compile
    return list(map(_compile, obj))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 371, in _compile
    value: Any = _compile(getattr(obj, attr))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 371, in _compile
    value: Any = _compile(getattr(obj, attr))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 368, in _compile
    return list(map(_compile, obj))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 371, in _compile
    value: Any = _compile(getattr(obj, attr))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 343, in _compile
    if obj.model is not None:
AttributeError: 'dependencies' object has no attribute 'model'
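A minimal sketch of a possible guard, inferred only from the traceback (untested; the helper name is made up):

from typing import Any

def model_of(obj: Any) -> Any:
    # `dependencies` props apparently do not define `.model`, so look it up
    # defensively instead of accessing obj.model directly (line 343 above).
    return getattr(obj, "model", None)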
Kubeflow Pipelines is built on top of Argo, but brings additional features like a Python-based SDK for component and pipeline authoring.
One important feature of the KFP SDK is the concept of reusable components.
The component author can create and share their component once, and then many pipeline authors can load and use it. The component format is declarative and language-independent. Components look a bit similar to Argo's templates, but are smaller, easier to write, and have several features that make authoring easier. The component library also provides ways to create components from user-provided Python functions (a somewhat similar idea to closure) or from Airflow operators. There is even a way to create sub-DAG components based on Python functions describing the workflow.
KFP components are a popular feature; I've seen several hundred of them on public GitHub alone, and even big companies are creating custom components based on the format.
Some examples that demonstrate the usage of the component library: "Creating components from command-line programs" and "Data passing in Python components".
Although the component library is currently part of the KFP SDK, it's essentially a standalone Python module with no dependency on the rest of the KFP SDK. If you're interested, we can think about extracting it as a separate library that can be imported without any other code. The component library has well-defined extension points that allow integrating and bridging it with any other orchestration DSL library (KFP, Tekton, TFX, Airflow, Argo-DSL).
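For illustration, turning a Python function into a component with the KFP v1 SDK looks roughly like this (assumes pip install kfp; API details may have changed since):

import kfp.components as comp

def add(a: float, b: float) -> float:
    # A plain user function that becomes a reusable, shareable component.
    return a + b

# Build a pipeline-usable component factory from the function.
add_op = comp.func_to_container_op(add)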
What do you think about this integration proposal?
Describe the bug
Using the DagDiamond example, on submit the server rejects the Workflow because Kind is missing from the serialized manifest.
Requirement already satisfied: argo-workflows-dsl in ./venv/lib/python3.7/site-packages (0.1.0rc0)
To Reproduce
Steps to reproduce the behavior:
# imports added for completeness
from argo.workflows.dsl import Workflow
from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *

class DagDiamond(Workflow):

    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )
        return container
(Pdb) dd
{'metadata': {'generate_name': 'dag-diamond-', 'name': 'dag-diamond'},
'spec': {'entrypoint': 'main',
'templates': [{'dag': {'tasks': [{'arguments': {'parameters': [{'name': 'message',
'value': 'A'}]},
'name': 'A',
'template': 'echo'},
{'arguments': {'parameters': [{'name': 'message',
'value': 'B'}]},
'dependencies': ['A'],
'name': 'B',
'template': 'echo'},
{'arguments': {'parameters': [{'name': 'message',
'value': 'C'}]},
'dependencies': ['A'],
'name': 'C',
'template': 'echo'},
{'arguments': {'parameters': [{'name': 'message',
'value': 'D'}]},
'dependencies': ['B', 'C'],
'name': 'D',
'template': 'echo'}]},
'name': 'main'},
{'container': {'command': ['echo',
'{{inputs.parameters.message}}'],
'image': 'alpine:3.7',
'name': 'echo'},
'inputs': {'parameters': [{'name': 'message'}]},
'name': 'echo'}]},
'status': {}}
(Pdb) dd.submit(client=service.client, namespace="argo")
*** argo.workflows.client.rest.ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json', 'Date': 'Sat, 21 Mar 2020 08:25:47 GMT', 'Content-Length': '1334'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Workflow in version \"v1alpha1\" cannot be handled as a Workflow: unmarshalerDecoder: Object 'Kind' is missing in '{\"metadata\": {\"generateName\": \"dag-diamond-\", \"name\": \"dag-diamond\"}, \"spec\": {\"entrypoint\": \"main\", \"templates\": [{\"dag\": {\"tasks\": [{\"arguments\": {\"parameters\": [{\"name\": \"message\", \"value\": \"A\"}]}, \"name\": \"A\", \"template\": \"echo\"}, {\"arguments\": {\"parameters\": [{\"name\": \"message\", \"value\": \"B\"}]}, \"dependencies\": [\"A\"], \"name\": \"B\", \"template\": \"echo\"}, {\"arguments\": {\"parameters\": [{\"name\": \"message\", \"value\": \"C\"}]}, \"dependencies\": [\"A\"], \"name\": \"C\", \"template\": \"echo\"}, {\"arguments\": {\"parameters\": [{\"name\": \"message\", \"value\": \"D\"}]}, \"dependencies\": [\"B\", \"C\"], \"name\": \"D\", \"template\": \"echo\"}]}, \"name\": \"main\"}, {\"container\": {\"command\": [\"echo\", \"{{inputs.parameters.message}}\"], \"image\": \"alpine:3.7\", \"name\": \"echo\"}, \"inputs\": {\"parameters\": [{\"name\": \"message\"}]}, \"name\": \"echo\"}]}, \"status\": {}}', error found in #10 byte of ...|atus\": {}}|..., bigger context ...|e\": \"message\"}]}, \"name\": \"echo\"}]}, \"status\": {}}|...","reason":"BadRequest","code":400}
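A possible client-side workaround (sketch, untested; assumes the Workflow model exposes the usual api_version / kind attributes):

# Set the group/version/kind explicitly before submitting, since the
# serialized body in the 400 response above lacks them.
dd.api_version = "argoproj.io/v1alpha1"
dd.kind = "Workflow"
dd.submit(client=service.client, namespace="argo")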
Description
I'm struggling to find a way to set the metadata for a closure, as in this example: https://github.com/argoproj/argo/blob/master/examples/pod-metadata.yaml. It appears that metadata isn't a valid field for any of the union types that closure supports. Is there something I'm missing?
Additional context
CC @alexec
Description
I am trying to convert one of our workflow YAMLs into the DSL. It uses a few things like volumeClaimTemplates, volumes, volumeMounts, tolerations, etc. I can't find any examples explaining how to pass all of those configs into V1Container; an example with the DSL would be really helpful.
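Not an authoritative answer, but one direction that might work, assuming the DSL's V1Container mirrors the Kubernetes container model (the import locations here are guesses):

from kubernetes.client import V1VolumeMount

from argo.workflows.dsl.templates import V1Container

container = V1Container(
    name="worker",
    image="alpine:3.7",
    command=["sh", "-c", "ls /mnt/data"],
    # volumeMounts belong on the container; volumes, volumeClaimTemplates
    # and tolerations live on the workflow/template spec rather than on
    # V1Container.
    volume_mounts=[V1VolumeMount(name="data", mount_path="/mnt/data")],
)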
Description
Thank you for this awesome package.
I'm trying to generate tasks and templates dynamically; that is, I need to loop through an array and generate a template and a task for each element.
Is that possible at the moment?
Description
I would like to add with_param to a task, but I'm not sure how.
Describe the bug
I tried to submit the "hello world" DSL Workflow using Python, but it failed. Could you help me have a look?
Python code:
from argo.workflows.dsl import Workflow
from argo.workflows.dsl import template
from argo.workflows.dsl.templates import V1Container

class HelloWorld(Workflow):
    entrypoint = "whalesay"

    @template
    def whalesay(self) -> V1Container:
        container = V1Container(
            image="docker/whalesay:latest",
            name="whalesay",
            command=["cowsay"],
            args=["hello world"]
        )
        return container

wf = HelloWorld()
print(wf)

from argo.workflows.client import V1alpha1Api
from argo.workflows.config import load_kube_config

load_kube_config()  # loads local configuration from ~/.kube/config
v1alpha1 = V1alpha1Api()

wfs = v1alpha1.list_namespaced_workflows(namespace="default")
print(wfs)

# v1alpha1.create_namespaced_workflow("default", wf)
wf.submit(client=V1alpha1Api(), namespace="default")
I can get wfs via the client and print them, but submit fails with a 400 exception.
Description
I have been working on a proposal to generate tasks dynamically for the Argo DSL.
I have reached a point where I would like your feedback on whether the design is feasible.
This is currently a work in progress, and I am still picking up the metaprogramming mechanics implemented in the Argo DSL. I wouldn't be surprised if my design turned out to be infeasible and had to be abandoned, but I did my best, coming from the perspective of a data scientist who has authored Airflow pipelines.
The proposal is here: https://gist.github.com/binarycrayon/75af90c1cdf660333f9903cd5822245d
Please let me know what you think!
Additional context
This was originally sent as an email and forwarded here per Marek's suggestion. I realize that Argo also has built-in with_items and with_param:
https://github.com/argoproj-labs/argo-client-python/search?q=with_items&unscoped_q=with_items
Technically I'm no longer blocked on what I need to do, but I would still like to get feedback.
Description
It would be great to have somewhere (maybe the README) a comparison of this project with the Kubeflow Pipelines Python SDK, explaining why you would use one or the other.
Description
Is it a design decision that every workflow class has its name attribute set to the kebab-cased class name by default? While the metadata name is set, generateName will not take effect.
E.g., class HelloWorld(Workflow) would have 'hello-world' set as the metadata name by default and used explicitly as the pod name in k8s. I like the idea of leaving the name empty (or optional?) and defaulting to generated names, so I can submit the same workflow multiple times (since we cannot have duplicate pod names in k8s, at least on the same node).
This is not a show stopper for me, as I can always set wf.name = '', but the current behaviour seems counter-intuitive.
Additional context
https://github.com/CermakM/argo-python-dsl/blob/master/argo/workflows/dsl/_workflow.py#L67
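A possible interim workaround (sketch, untested; assumes the workflow exposes a standard V1ObjectMeta):

wf = HelloWorld()                            # hypothetical class from the example above
wf.metadata.name = None                      # drop the kebab-cased class name
wf.metadata.generate_name = "hello-world-"   # let Kubernetes generate unique names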
Describe the bug
Trying to use Argo 2.6.3, which requires the kind, apiVersion, and various other metadata that should always be sent (because crons and templates are important), I am getting:
% python manage.py ps_k8s_post_workflow rosscdh@s
Traceback (most recent call last):
File "manage.py", line 25, in <module>
execute_from_command_line(sys.argv)
File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/django/core/management/__init__.py", line 401, in execute_from_command_line
utility.execute()
File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/django/core/management/__init__.py", line 395, in execute
self.fetch_command(subcommand).run_from_argv(self.argv)
File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/django/core/management/base.py", line 328, in run_from_argv
self.execute(*args, **cmd_options)
File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/django/core/management/base.py", line 369, in execute
output = self.handle(*args, **options)
File "/Users/rosscdh/p/PageSnap/v1/pagesnap/backend/pagesnap/apps/default/management/commands/ps_k8s_post_workflow.py", line 173, in handle
wf = service.process()
File "/Users/rosscdh/p/PageSnap/v1/pagesnap/backend/pagesnap/apps/default/management/commands/ps_k8s_post_workflow.py", line 159, in process
self.workflows = [workflow_id for workflow_id in self.send()]
File "/Users/rosscdh/p/PageSnap/v1/pagesnap/backend/pagesnap/apps/default/management/commands/ps_k8s_post_workflow.py", line 159, in <listcomp>
self.workflows = [workflow_id for workflow_id in self.send()]
File "/Users/rosscdh/p/PageSnap/v1/pagesnap/backend/pagesnap/apps/default/management/commands/ps_k8s_post_workflow.py", line 156, in send
yield service.submit(client=service.client, namespace="argo")
File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/argo/workflows/dsl/_workflow.py", line 430, in submit
body = client.api_client.sanitize_for_serialization(self)
File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/argo/workflows/client/api_client.py", line 242, in sanitize_for_serialization
for key, val in six.iteritems(obj_dict)}
File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/argo/workflows/client/api_client.py", line 242, in <dictcomp>
for key, val in six.iteritems(obj_dict)}
File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/argo/workflows/client/api_client.py", line 238, in sanitize_for_serialization
for attr, _ in six.iteritems(obj.openapi_types)
AttributeError: 'V1ObjectMeta' object has no attribute 'openapi_types'
This seems to be an invalid type reference?
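One way to check this locally (sketch): models generated by swagger-codegen expose swagger_types, while openapi-generator models expose openapi_types, which is what sanitize_for_serialization iterates over; mixing client generations would explain the error.

from kubernetes.client import V1ObjectMeta

meta = V1ObjectMeta(name="probe")
# An openapi-generator model defines `openapi_types`; an older
# swagger-codegen model only defines `swagger_types`.
print(hasattr(meta, "openapi_types"), hasattr(meta, "swagger_types"))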
Versions
-e git+https://github.com/CermakM/argo-python-dsl@6ee661685ff87abd594714a3e8a0493eac4960b6#egg=argo_workflows_dsl
and
argo-workflows==3.2.0
To Reproduce
Install from master in order to get the updated behaviour of posting the whole Kubernetes resource to Argo 2.6.3:
pip install -e 'git+https://github.com/CermakM/argo-python-dsl#egg=argo-workflows-dsl'
Expected behaviour
The library works
Describe the bug
I am on the master branch and tried to follow the simple HelloWorld example.
When I try from argo.workflows.dsl import Workflow, I get:
In [2]: from argo.workflows.dsl import Workflow
---------------------------------------------------------------------------
ImportError Traceback (most recent call last)
<ipython-input-2-11eebaacd24f> in <module>
----> 1 from argo.workflows.dsl import Workflow
~/Workspace/argo-python-dsl/argo/workflows/dsl/__init__.py in <module>
15
16 # modules
---> 17 from . import tasks
18 from . import templates
19
~/Workspace/argo-python-dsl/argo/workflows/dsl/tasks.py in <module>
23 )
24
---> 25 from ._arguments import artifact
26 from ._arguments import parameter
27 from ._base import Prop
~/Workspace/argo-python-dsl/argo/workflows/dsl/_arguments.py in <module>
9 )
10
---> 11 from ._base import Prop
12
13 __all__ = ["artifact", "parameter", "V1alpha1Artifact", "V1alpha1Parameter"]
~/Workspace/argo-python-dsl/argo/workflows/dsl/_base.py in <module>
14 from typing import Union
15
---> 16 from argo.workflows import models
17
18 T = TypeVar("T")
ImportError: cannot import name 'models' from 'argo.workflows' (./argo/workflows/__init__.py)
Python version: 3.7.5
Installed argo-workflows using pip install -e "git+git://github.com/CermakM/argo-client-python@argo/v2.5.0#egg=argo-workflows"
To Reproduce
Steps to reproduce the behavior: check out the master branch, then:
$ pip list | grep -i argo
argo-models       2.2.1a0
argo-workflows    3.0.0rc0
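A quick sanity check (sketch): the failing line in _base.py is from argo.workflows import models, so the installed argo-workflows client has to provide that module.

import importlib.util

# None here means the installed argo-workflows client does not ship the
# `argo.workflows.models` module the DSL expects.
print(importlib.util.find_spec("argo.workflows.models"))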
Hey there!
Love the project. I use Argo every day at work, and a nice Python API would make many things a lot easier. I'm wondering if you have plans for a more functional type of API, for example something like Prefect.
If you're open to it, I would be willing to write up a PoC and contribute it.
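Purely as an illustration of the kind of API meant here; every name below is hypothetical and not part of this library:

from typing import Any, Callable, List, Tuple

class Flow:
    # Hypothetical Prefect-like flow that records task calls instead of
    # running them, so they could later be compiled into Argo templates.
    def __init__(self, name: str) -> None:
        self.name = name
        self.calls: List[Tuple[str, tuple]] = []

def task(flow: Flow) -> Callable:
    # Decorator registering each invocation as a node of the flow.
    def decorator(fn: Callable) -> Callable:
        def wrapper(*args: Any) -> None:
            flow.calls.append((fn.__name__, args))
        return wrapper
    return decorator

flow = Flow("hello-world")

@task(flow)
def say(msg: str) -> None:
    print(msg)

say("hello world")   # recorded, not executed
print(flow.calls)    # [('say', ('hello world',))]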
Description
I have a scenario where I would like to encode a nested DAG using the DSL. More specifically, I have a task with a with_items decorator that, in turn, uses a template that is supposed to launch multiple processes performing the actual work. Is it syntactically possible to write a task that returns a template, which ultimately returns a closure for each parameter combination? In pseudocode:
@task
@with_items(['a', 'b', 'c'])
def A(...params) -> template:
    return B(...params)

@template  # (??? is this the right choice?)
@with_items(['d', 'e', 'f'])
@inputs.param()  # (??? a, b, and c from step A)
def B(...params) -> script template:
    return C(...all params)

@closure
@inputs.param(first)
@inputs.param(second)
def C(...params) -> script template:
    print(first / second)  # results in: ad, ae, af, bd, be, bf, cd, ce, cf
Final DAG:
*
/ | \
a b c
/|\ /|\ /|\
def def def
\ | /
ad, ae, af, bd, be, bf, cd, ce, cf (final steps)
*
Is this possible with the current version of the DSL? If so, what could be a way to structure the workflow?
Additional context
Argo: 3.0.1
Python: 3.7
K8S: 1.18 (GKE)