
argo-python-dsl's Introduction

argo-python-dsl

Python DSL for Argo Workflows

If you're new to Argo, we recommend checking out the examples in pure YAML. The language is descriptive and the Argo examples provide an exhaustive explanation.

For a more experienced audience, this DSL lets you programmatically define Argo Workflows in Python, which are then translated to the Argo YAML specification.

The DSL makes use of the Argo models defined in the Argo Python client repository. Combining the two approaches gives you full low-level control over Argo Workflows.

Getting started

Hello World

This example demonstrates the simplest functionality. It defines a Workflow by subclassing the Workflow class, and a single template with the @template decorator.

The entrypoint to the workflow is defined as an entrypoint class property.

Argo YAML, followed by the equivalent Argo Python:

# @file: hello-world.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: hello-world
  generateName: hello-world-
spec:
  entrypoint: whalesay
  templates:
  - name: whalesay
    container:
      name: whalesay
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["hello world"]

from argo.workflows.dsl import Workflow
from argo.workflows.dsl import template

from argo.workflows.dsl.templates import V1Container


class HelloWorld(Workflow):

    entrypoint = "whalesay"

    @template
    def whalesay(self) -> V1Container:
        container = V1Container(
            image="docker/whalesay:latest",
            name="whalesay",
            command=["cowsay"],
            args=["hello world"]
        )

        return container
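
To make the mechanics less opaque, here is a toy sketch of how a decorator could mark methods as templates and a base class could collect them. It is not the library's implementation: `toy_template`, `ToyWorkflow` and the `is_template` marker are hypothetical names, and plain dicts stand in for the Argo models.

```python
def toy_template(func):
    """Mark a method as a template (hypothetical stand-in for @template)."""
    func.is_template = True
    return func


class ToyWorkflow:
    """Hypothetical base class that discovers marked methods on a subclass."""

    entrypoint = None

    def compile(self):
        templates = {}
        # Look only at attributes defined directly on the subclass.
        for name, attr in vars(type(self)).items():
            if callable(attr) and getattr(attr, "is_template", False):
                templates[name] = attr(self)
        return {"entrypoint": self.entrypoint, "templates": templates}


class HelloWorld(ToyWorkflow):
    entrypoint = "whalesay"

    @toy_template
    def whalesay(self):
        # A plain dict stands in for V1Container in this sketch.
        return {
            "image": "docker/whalesay:latest",
            "command": ["cowsay"],
            "args": ["hello world"],
        }


spec = HelloWorld().compile()
```

The point of the sketch is only that the class attribute `entrypoint` and the decorated methods carry enough information to assemble a workflow spec.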

DAG: Tasks

This example demonstrates tasks defined via dependencies forming a diamond structure. Tasks are defined using the @task decorator and they must return a valid template.

The entrypoint is automatically created as main for the top-level tasks of the Workflow.

Argo YAML, followed by the equivalent Argo Python:

# @file: dag-diamond.yaml
# The following workflow executes a diamond workflow
#
#   A
#  / \
# B   C
#  \ /
#   D
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: dag-diamond
  generateName: dag-diamond-
spec:
  entrypoint: main
  templates:
  - name: main
    dag:
      tasks:
      - name: A
        template: echo
        arguments:
          parameters: [{name: message, value: A}]
      - name: B
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: B}]
      - name: C
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: C}]
      - name: D
        dependencies: [B, C]
        template: echo
        arguments:
          parameters: [{name: message, value: D}]

  # @task: [A, B, C, D]
  - name: echo
    inputs:
      parameters:
      - name: message
    container:
      name: echo
      image: alpine:3.7
      command: [echo, "{{inputs.parameters.message}}"]

from argo.workflows.dsl import Workflow

from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *


class DagDiamond(Workflow):

    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )

        return container
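
The order implied by the diamond's dependencies can be checked with the standard library alone. The sketch below does not use the DSL; it feeds the same dependency lists to `graphlib.TopologicalSorter` (Python 3.9+) and groups the tasks into waves that could run in parallel.

```python
from graphlib import TopologicalSorter

# Same dependencies as the @dependencies decorators in DagDiamond.
dependencies = {"A": set(), "B": {"A"}, "C": {"A"}, "D": {"B", "C"}}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

waves = []
while sorter.is_active():
    # Every task returned here has all of its dependencies completed.
    ready = sorted(sorter.get_ready())
    waves.append(ready)
    sorter.done(*ready)
```

Each inner list of `waves` holds tasks with no unmet dependencies at that point, which is the property Argo relies on when scheduling DAG tasks.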

Artifacts

Artifacts can be passed similarly to parameters in three forms: arguments, inputs and outputs, where arguments is the default one (simply @artifact or @parameter).

For example: @inputs.artifact(...)

Both artifacts and parameters are passed one by one, which means that for multiple artifacts (or parameters) you stack one decorator per item:

@inputs.artifact(name="artifact", ...)
@inputs.parameter(name="parameter_a", ...)
@inputs.parameter(...)
def foo(self, artifact: V1alpha1Artifact, parameter_a: V1alpha1Parameter, ...): pass
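
How stacked decorators can accumulate one declaration each is easiest to see in a toy version. The sketch below is not the DSL's code: `declare_input` and the `declared_inputs` attribute are hypothetical, and only names are recorded.

```python
def declare_input(name):
    """Hypothetical decorator factory that records one input name per use."""
    def decorator(func):
        already_declared = getattr(func, "declared_inputs", [])
        # Decorators are applied bottom-up, so prepend to keep reading order.
        func.declared_inputs = [name] + already_declared
        return func
    return decorator


@declare_input("artifact")
@declare_input("parameter_a")
def foo(artifact=None, parameter_a=None):
    pass


declared = foo.declared_inputs
```

Because Python applies the decorator closest to the function first, the sketch prepends each new name so that the recorded order matches the order in which the decorators are written.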

A complete example:

Argo YAML, followed by the equivalent Argo Python:

# @file: artifacts.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: artifact-passing
  generateName: artifact-passing-
spec:
  entrypoint: main
  templates:
  - name: main
    dag:
      tasks:
      - name: generate-artifact
        template: whalesay
      - name: consume-artifact
        template: print-message
        arguments:
          artifacts:
          # bind message to the hello-art artifact
          # generated by the generate-artifact step
          - name: message
            from: "{{tasks.generate-artifact.outputs.artifacts.hello-art}}"

  - name: whalesay
    container:
      name: "whalesay"
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["cowsay hello world | tee /tmp/hello_world.txt"]
    outputs:
      artifacts:
      # generate hello-art artifact from /tmp/hello_world.txt
      # artifacts can be directories as well as files
      - name: hello-art
        path: /tmp/hello_world.txt

  - name: print-message
    inputs:
      artifacts:
      # unpack the message input artifact
      # and put it at /tmp/message
      - name: message
        path: /tmp/message
    container:
      name: "print-message"
      image: alpine:latest
      command: [sh, -c]
      args: ["cat /tmp/message"]

from argo.workflows.dsl import Workflow

from argo.workflows.dsl.tasks import *
from argo.workflows.dsl.templates import *

class ArtifactPassing(Workflow):

    @task
    def generate_artifact(self) -> V1alpha1Template:
        return self.whalesay()

    @task
    @artifact(
        name="message",
        _from="{{tasks.generate-artifact.outputs.artifacts.hello-art}}"
    )
    def consume_artifact(self, message: V1alpha1Artifact) -> V1alpha1Template:
        return self.print_message(message=message)

    @template
    @outputs.artifact(name="hello-art", path="/tmp/hello_world.txt")
    def whalesay(self) -> V1Container:
        container = V1Container(
            name="whalesay",
            image="docker/whalesay:latest",
            command=["sh", "-c"],
            args=["cowsay hello world | tee /tmp/hello_world.txt"]
        )

        return container

    @template
    @inputs.artifact(name="message", path="/tmp/message")
    def print_message(self, message: V1alpha1Artifact) -> V1Container:
        container = V1Container(
            name="print-message",
            image="alpine:latest",
            command=["sh", "-c"],
            args=["cat /tmp/message"],
        )

        return container


Going further: closure and scope

This is where it gets quite interesting. So far, we've only scratched the surface of the benefits that the Python implementation provides.

What if we want to use native Python code and execute it as a step in the Workflow? What are our options?

Option A) is to reuse the existing mindset: dump the code in a string, pass it as the source to the V1alpha1ScriptTemplate model and wrap it with the template decorator. This is illustrated in the following code block:

import textwrap

class ScriptsPython(Workflow):

    ...

    @template
    def gen_random_int(self) -> V1alpha1ScriptTemplate:
        source = textwrap.dedent("""\
          import random
          i = random.randint(1, 100)
          print(i)
        """)

        template = V1alpha1ScriptTemplate(
            image="python:alpine3.6",
            name="gen-random-int",
            command=["python"],
            source=source
        )

        return template

Which results in:

api_version: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generate_name: scripts-python-
  name: scripts-python
spec:
  entrypoint: main

  ...

  templates:
  - name: gen-random-int
    script:
      command:
      - python
      image: python:alpine3.6
      name: gen-random-int
      source: 'import random\ni = random.randint(1, 100)\nprint(i)\n'

Not bad, but also not living up to the full potential. Since we're already writing Python, why would we wrap the code in a string? This is where we introduce closures.

closures

The logic of closures is quite simple. Just wrap the function you want to execute in a container with the @closure decorator. The closure then takes care of the rest and returns a template (just as the @template decorator does).

The only thing we need to take care of is to provide it with an image which has the necessary Python dependencies installed and is present in the cluster.

There is a plan to eliminate even this step in the future, but currently it is unavoidable.

Following the previous example:

class ScriptsPython(Workflow):

    ...

    @closure(
      image="python:alpine3.6"
    )
    def gen_random_int() -> V1alpha1ScriptTemplate:
          import random

          i = random.randint(1, 100)
          print(i)

The closure implements the V1alpha1ScriptTemplate, which means that you can pass in things like resources, env, etc.

Also, make sure that you import whatever libraries you use inside the closure. The context is not preserved: a closure behaves as a staticmethod and is sandboxed from the module scope.
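
The effect of this sandboxing can be mimicked locally. The sketch below makes no claim about how the DSL runs closures; it only executes a source string in an empty namespace (the hypothetical helper `run_isolated`) to show that a module-level import is invisible to isolated code.

```python
import random  # imported at module level, but invisible to the isolated source


def run_isolated(source):
    """Execute source in a fresh namespace, similar to a standalone script."""
    namespace = {}
    exec(source, namespace)
    return namespace


source_with_import = "import random\nvalue = random.randint(1, 100)"
source_without_import = "value = random.randint(1, 100)"

result = run_isolated(source_with_import)

try:
    run_isolated(source_without_import)
    missing_import_fails = False
except NameError:
    # 'random' is not defined inside the isolated namespace.
    missing_import_fails = True
```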

scopes

Now, what if we had a function (or a whole script) which is quite big? Wrapping it in a single Python function is not very Pythonic and it gets tedious. This is where we can make use of scopes.

Say that we, for example, wanted to initialize logging before running our gen_random_int function.

    ...

    @closure(
      scope="main",
      image="python:alpine3.6"
    )
    def gen_random_int(main) -> V1alpha1ScriptTemplate:
          import random

          main.init_logging()

          i = random.randint(1, 100)
          print(i)

    @scope(name="main")
    def init_logging(level="DEBUG"):
        import logging

        logging_level = getattr(logging, level, "INFO")
        logging.getLogger("__main__").setLevel(logging_level)

Notice the 3 changes that we've made:

    @closure(
      scope="main",  # <--- provide the closure a scope
      image="python:alpine3.6"
    )
    def gen_random_int(main):  # <--- use the scope name
    @scope(name="main")  # <--- add function to a scope
    def init_logging(level="DEBUG"):

Each function in the given scope is then namespaced by the scope name and injected into the closure.

The resulting YAML looks like this:

...
spec:
  ...
  templates:
    - name: gen-random-int
      script:
        command:
        - python
        image: python:alpine3.6
        name: gen-random-int
        source: |-
          import logging
          import random

          class main:
            """Scoped objects injected from scope 'main'."""

            @staticmethod
            def init_logging(level="DEBUG"):
              logging_level = getattr(logging, level, "INFO")
              logging.getLogger("__main__").setLevel(logging_level)


          main.init_logging()

          i = random.randint(1, 100)
          print(i)
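
The namespacing step can be approximated in a few lines. This is a hedged sketch rather than the DSL's compiler: `render_scope` is a hypothetical helper that wraps function sources in a class of static methods, which is the shape of the generated source above.

```python
import textwrap


def render_scope(scope_name, function_sources):
    """Wrap each function source as a @staticmethod of a class named after the scope."""
    lines = [f"class {scope_name}:"]
    for source in function_sources:
        body = textwrap.dedent(source).strip("\n")
        lines.append("    @staticmethod")
        lines.append(textwrap.indent(body, "    "))
        lines.append("")
    return "\n".join(lines)


greet_source = '''
def greet(name="world"):
    return "hello " + name
'''

rendered = render_scope("main", [greet_source])

# Execute the generated source to confirm it is valid Python.
namespace = {}
exec(rendered, namespace)
greeting = namespace["main"].greet()
```

Calling `main.greet()` in the generated code mirrors how `main.init_logging()` is called in the closure.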

The compilation also moves all imports to the front and removes duplicates, for convenience and a more natural look, so that the resulting YAML stays readable.
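
Hoisting and de-duplicating imports can be sketched with a simple line-based pass. This is an assumption about the general idea, not the DSL's actual algorithm; the hypothetical `hoist_imports` ignores edge cases such as multi-line imports or a function body consisting only of an import.

```python
def hoist_imports(source):
    """Move import lines to the top, sorted and without duplicates."""
    imports, body = [], []
    for line in source.splitlines():
        stripped = line.strip()
        if stripped.startswith(("import ", "from ")):
            if stripped not in imports:
                imports.append(stripped)
        else:
            body.append(line)
    return "\n".join(sorted(imports) + body)


source = """\
import random
import logging

def init_logging(level="DEBUG"):
    import logging
    logging.getLogger("__main__").setLevel(level)

init_logging()
print(random.randint(1, 100))
"""

hoisted = hoist_imports(source)
```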


For more examples see the examples folder.



Authors:

argo-python-dsl's People

Contributors

asavpatel92, binarycrayon, cermakm


argo-python-dsl's Issues

submit failed, prompt "argo.workflows.client.rest.ApiException: (400)"

Describe the bug
I tried to submit an Argo workflow from Python using the "hello-world" DSL example, but it failed. Could you help me have a look?
python code:

from argo.workflows.dsl import Workflow
from argo.workflows.dsl import template
import yaml

from argo.workflows.dsl.templates import V1Container

class HelloWorld(Workflow):

    entrypoint = "whalesay"

    @template
    def whalesay(self) -> V1Container:
        container = V1Container(
            image="docker/whalesay:latest",
            name="whalesay",
            command=["cowsay"],
            args=["hello world"]
        )

        return container

wf=HelloWorld()
print(wf)

from argo.workflows.client import V1alpha1Api
from argo.workflows.config import load_kube_config

load_kube_config() # loads local configuration from ~/.kube/config
v1alpha1 = V1alpha1Api()
wfs = v1alpha1.list_namespaced_workflows(namespace="default")
print(wfs)
#v1alpha1.create_namespaced_workflow("default", wf)
wf.submit(client=V1alpha1Api(), namespace="default")

I can list the workflows through the client and print them, but submit fails with a 400 exception.

AttributeError: 'dependencies' object has no attribute 'model'

Describe the bug


Traceback (most recent call last):
  File "/opt/app-root/lib/python3.7/site-packages/aiohttp/web_protocol.py", line 418, in start
    resp = await task
  File "/opt/app-root/lib/python3.7/site-packages/octomachinery/app/routing/webhooks_dispatcher.py", line 82, in wrapper
    return await wrapped_function(request, github_app=github_app)
  File "/opt/app-root/lib/python3.7/site-packages/octomachinery/app/routing/webhooks_dispatcher.py", line 120, in route_github_webhook_event
    await dispatch_event(event)
  File "/opt/app-root/lib/python3.7/site-packages/gidgethub/routing.py", line 80, in dispatch
    await callback(event, *args, **kwargs)
  File "app.py", line 138, in on_pr_open_or_sync
    installation=installation,
  File "app.py", line 168, in _submit_thamos_workflow
    wf = ThamosAdviseCheckRun()
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 233, in __init__
    self.compile()
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 376, in compile
    self.spec: V1alpha1WorkflowSpec = _compile(self.spec)
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 371, in _compile
    value: Any = _compile(getattr(obj, attr))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 368, in _compile
    return list(map(_compile, obj))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 371, in _compile
    value: Any = _compile(getattr(obj, attr))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 371, in _compile
    value: Any = _compile(getattr(obj, attr))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 368, in _compile
    return list(map(_compile, obj))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 371, in _compile
    value: Any = _compile(getattr(obj, attr))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 343, in _compile
    if obj.model is not None:
AttributeError: 'dependencies' object has no attribute 'model'


how to use V1Volume, V1Toleration etc?

Description
I am trying to convert one of our workflow YAMLs into the DSL. It uses a few things like volumeClaimTemplates, volumes, volumeMounts, tolerations, etc. I can't find any examples that explain how I can pass all those configs into V1Container. Any example with the DSL would be really helpful.

Host as part of Argo Labs

Hi @CermakM,

People are very interested in your SDK. Would you be interested in moving it into Argo Labs? This would give it a more "official" status and encourage others to contribute.

Alex

Nested DAGs

Description

I have a scenario where I would like to encode a nested DAG using the DSL. More specifically, I have a task that uses a with_items decorator that, in turn, uses a template that is supposed to launch multiple processes that perform what I need them to perform. Is it syntactically possible to write a task that returns a template, which ultimately returns a closure for the parameter combination? In pseudocode:

@task
@with_items(['a', 'b', 'c'])
def A(...params) -> template:
   return B(...params)

@template # (??? is this the right choice?)
@with_items(['d', 'e', 'f'])
@inputs.param()  # (??? a, b, and c from step A)
def B(...params) -> script template
   return C(...all params)

@closure
@inputs.param(first)
@inputs.param(second)
def C(...params) -> script template:
  print(first / second)  # results in: ad, ae, af, bd, be, bf, cd, ce, cf

Final DAG:

           *
      /    |    \
     a     b      c
    /|\   /|\    /|\
    def   def    def
     \     |    /
ad, ae, af, bd, be, bf, cd, ce, cf (final steps)
          *          

Is this possible with the current version of the DSL? If so, what could be a way to structure the workflow?

Additional context

Argo: 3.0.1
Python: 3.7
K8S: 1.18 (GKE)
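
For reference, the nine combinations that the final steps in this report are expected to cover can be enumerated in plain Python. This only illustrates the intended fan-out; it says nothing about whether or how the DSL supports nested with_items.

```python
from itertools import product

outer_items = ["a", "b", "c"]  # items of step A
inner_items = ["d", "e", "f"]  # items of step B

# One entry per (first, second) pair that step C would receive.
combinations = [first + second for first, second in product(outer_items, inner_items)]
```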

Dynamic Template & Task generation

Description
Thank you for the awesome package.
I'm trying to generate tasks and templates dynamically, meaning that I need to loop through an array and generate a template and a task for each element.
Is this currently possible?

Additional context

Would you consider integration with the component format of Kubeflow Pipelines?

Kubeflow Pipelines is built on top of Argo, but brings additional features like Python-based SDK for component and pipeline authoring.

One important feature of the KFP SDK is the concept of reusable components.
The component author can create and share their component once and then many pipeline authors can load and use those components. The format for components is declarative and language-independent. They look a bit similar to Argo's templates, but are smaller, easier to write and have several features that make writing components easier. The component library also provides ways to create components from user-provided python functions (somewhat similar idea to closure) or Airflow operators. There is even a way to create sub-DAG components based on python functions describing the workflow.

KFP components are a popular feature and I've seen several hundreds of them in the public GitHub alone. Even big companies are creating their custom components based on the format.

Some examples that demonstrate the usage of the component library: "Creating components from command-line programs" and "Data passing in python components".

Although currently the component library is part of the KFP SDK, it's essentially a standalone python module that does not have any dependency on the rest of KFP SDK. If you're interested, we can think of extracting it as a separate library that can be imported without any other code. The component library has well-defined extension points that allow integrating and bridging it with any other orchestration DSL library (KFP, Tekton, TFX, Airflow, Argo-DSL).

What do you think about this integration proposal?

Missing kind and other metadata

Describe the bug
Using the DagDiamond example, on submit the server rejects based on missing kind for the template.

Requirement already satisfied: argo-workflows-dsl in ./venv/lib/python3.7/site-packages (0.1.0rc0)

To Reproduce
Steps to reproduce the behavior:

  1. Define the standard DagDiamond workflow
  2. Submit it

Screenshots

class DagDiamond(Workflow):

    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )

        return container
(Pdb) dd
{'metadata': {'generate_name': 'dag-diamond-', 'name': 'dag-diamond'},
 'spec': {'entrypoint': 'main',
          'templates': [{'dag': {'tasks': [{'arguments': {'parameters': [{'name': 'message',
                                                                          'value': 'A'}]},
                                            'name': 'A',
                                            'template': 'echo'},
                                           {'arguments': {'parameters': [{'name': 'message',
                                                                          'value': 'B'}]},
                                            'dependencies': ['A'],
                                            'name': 'B',
                                            'template': 'echo'},
                                           {'arguments': {'parameters': [{'name': 'message',
                                                                          'value': 'C'}]},
                                            'dependencies': ['A'],
                                            'name': 'C',
                                            'template': 'echo'},
                                           {'arguments': {'parameters': [{'name': 'message',
                                                                          'value': 'D'}]},
                                            'dependencies': ['B', 'C'],
                                            'name': 'D',
                                            'template': 'echo'}]},
                         'name': 'main'},
                        {'container': {'command': ['echo',
                                                   '{{inputs.parameters.message}}'],
                                       'image': 'alpine:3.7',
                                       'name': 'echo'},
                         'inputs': {'parameters': [{'name': 'message'}]},
                         'name': 'echo'}]},
 'status': {}}
(Pdb) dd.submit(client=service.client, namespace="argo")
*** argo.workflows.client.rest.ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json', 'Date': 'Sat, 21 Mar 2020 08:25:47 GMT', 'Content-Length': '1334'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Workflow in version \"v1alpha1\" cannot be handled as a Workflow: unmarshalerDecoder: Object 'Kind' is missing in '{\"metadata\": {\"generateName\": \"dag-diamond-\", \"name\": \"dag-diamond\"}, \"spec\": {\"entrypoint\": \"main\", \"templates\": [{\"dag\": {\"tasks\": [{\"arguments\": {\"parameters\": [{\"name\": \"message\", \"value\": \"A\"}]}, \"name\": \"A\", \"template\": \"echo\"}, {\"arguments\": {\"parameters\": [{\"name\": \"message\", \"value\": \"B\"}]}, \"dependencies\": [\"A\"], \"name\": \"B\", \"template\": \"echo\"}, {\"arguments\": {\"parameters\": [{\"name\": \"message\", \"value\": \"C\"}]}, \"dependencies\": [\"A\"], \"name\": \"C\", \"template\": \"echo\"}, {\"arguments\": {\"parameters\": [{\"name\": \"message\", \"value\": \"D\"}]}, \"dependencies\": [\"B\", \"C\"], \"name\": \"D\", \"template\": \"echo\"}]}, \"name\": \"main\"}, {\"container\": {\"command\": [\"echo\", \"{{inputs.parameters.message}}\"], \"image\": \"alpine:3.7\", \"name\": \"echo\"}, \"inputs\": {\"parameters\": [{\"name\": \"message\"}]}, \"name\": \"echo\"}]}, \"status\": {}}', error found in #10 byte of ...|atus\": {}}|..., bigger context ...|e\": \"message\"}]}, \"name\": \"echo\"}]}, \"status\": {}}|...","reason":"BadRequest","code":400}
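
The response says the request body lacks the `kind` field. As a stopgap sketch (not the library's fix), the serialized body could be completed with the type metadata that the YAML examples carry. `with_type_meta` is a hypothetical helper that works on a plain dict, not on the DSL's Workflow object.

```python
def with_type_meta(body):
    """Return a copy of the body with apiVersion and kind filled in if absent."""
    completed = {"apiVersion": "argoproj.io/v1alpha1", "kind": "Workflow"}
    completed.update(body)  # values already present in the body win
    return completed


body = {
    "metadata": {"generateName": "dag-diamond-"},
    "spec": {"entrypoint": "main"},
}
payload = with_type_meta(body)
```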


Workflow metadata name parsed from class name prevents generated name from working

Description
Is it a design decision that any workflow class has its name attribute set to "class-name" by default? While the metadata name is set, generateName has no effect.

E.g. class HelloWorld(Workflow) would have 'hello-world' set as the metadata name by default, and it is used explicitly as the pod name in k8s. I like the idea of leaving the name empty (or optional?) and defaulting to generated names, so I can submit the same workflow multiple times (since we cannot have duplicate pod names in k8s, at least while using the same node).

This is not a showstopper for me, I can always call wf.name = '', but the current behaviour seems counter-intuitive.

Additional context
https://github.com/CermakM/argo-python-dsl/blob/master/argo/workflows/dsl/_workflow.py#L67
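
The behaviour discussed in this report can be sketched without the DSL. Kubernetes uses generateName only when name is not provided, so a default derived from the class name would need to go into generateName alone. `kebab_case` and `build_metadata` below are hypothetical helpers; how the library actually derives the name is an assumption here.

```python
import re


def kebab_case(class_name):
    """Convert a CamelCase class name to kebab-case, e.g. HelloWorld -> hello-world."""
    return re.sub(r"(?<!^)(?=[A-Z])", "-", class_name).lower()


def build_metadata(class_name, generate_name_only=True):
    """Build workflow metadata; omit 'name' so that generateName takes effect."""
    base = kebab_case(class_name)
    if generate_name_only:
        return {"generateName": base + "-"}
    return {"name": base, "generateName": base + "-"}


metadata = build_metadata("HelloWorld")
```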

AttributeError: 'NoneType' object has no attribute 'parameters' from Workflow.submit()

Describe the bug
argo.workflows.dsl._workflow.Workflow.submit checks whether the Workflow spec has an attribute arguments, but not whether that attribute is None. As a result, attempting to invoke submit() can result in an AttributeError at

self.spec.arguments.parameters = new_parameters

To Reproduce
Using the ArtifactPassing example,

from argo.workflows.client import V1alpha1Api
from argo.workflows.dsl import Workflow
from argo.workflows.dsl.tasks import artifact, task, V1alpha1Template
from argo.workflows.dsl.templates import template, inputs, outputs, V1alpha1Artifact, V1Container


class ArtifactPassing(Workflow):

    @task
    def generate_artifact(self) -> V1alpha1Template:
        return self.whalesay()

    @task
    @artifact(
        name="message",
        _from="{{tasks.generate-artifact.outputs.artifacts.hello-art}}"
    )
    def consume_artifact(self, message: V1alpha1Artifact) -> V1alpha1Template:
        return self.print_message(message=message)

    @template
    @outputs.artifact(name="hello-art", path="/tmp/hello_world.txt")
    def whalesay(self) -> V1Container:
        container = V1Container(
            name="whalesay",
            image="docker/whalesay:latest",
            command=["sh", "-c"],
            args=["cowsay hello world | tee /tmp/hello_world.txt"]
        )

        return container

    @template
    @inputs.artifact(name="message", path="/tmp/message")
    def print_message(self, message: V1alpha1Artifact) -> V1Container:
        container = V1Container(
            name="print-message",
            image="alpine:latest",
            command=["sh", "-c"],
            args=["cat", "/tmp/message"],
        )

        return container


if __name__ == '__main__':
    argo_api = V1alpha1Api()
    workflow_instance = ArtifactPassing()
    workflow_instance.submit(argo_api, "argo", parameters={"test_param": "some_value"})

Expected behaviour
The Workflow will be submitted through the API client instead of an AttributeError being raised.

Screenshots
N/A

Additional context

Traceback (most recent call last):
  File ".../artifact-passing.py", line 54, in <module>
    workflow_instance.submit(argo_api, "argo", parameters={"test_param": "some_value"})
  File ".../venv/lib/python3.8/site-packages/argo/workflows/dsl/_workflow.py", line 418, in submit
    self.spec.arguments.parameters = new_parameters
AttributeError: 'NoneType' object has no attribute 'parameters'
$ python
Python 3.8.1 (default, Feb 11 2020, 16:39:15) 
[GCC 7.4.0] on linux

It seems to be possible to work around this manually as follows:

        workflow_instance.spec._arguments = argo.workflows.client.V1alpha1Arguments()
        workflow_instance.spec._arguments.parameters = []
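
The guard this report asks for might look like the sketch below. It uses `types.SimpleNamespace` objects as stand-ins for the spec and arguments models, and `merge_parameters` is a hypothetical name, so this illustrates the check rather than patching the library.

```python
from types import SimpleNamespace


def merge_parameters(spec, new_parameters):
    """Assign parameters, creating the arguments holder when it is missing or None."""
    # hasattr alone is not enough: the attribute may exist and still be None.
    if getattr(spec, "arguments", None) is None:
        spec.arguments = SimpleNamespace(parameters=[])
    spec.arguments.parameters = new_parameters
    return spec


spec = SimpleNamespace(arguments=None)
merge_parameters(spec, [{"name": "test_param", "value": "some_value"}])
```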

Development plan and contribution

Description
I have built our ML infrastructure with Airflow and Kubernetes. As a centralized scheduler, Airflow does not work well in a few scenarios. For the next iteration of our ML workflow and scheduler, I'd like to invest in a cloud-native workflow engine (basically Argo). I'm actively exploring this library and am wondering about the longer-term development plan.

Do you take external contributions at the moment? (I suspect not at the moment, but would like to know when you would call it a V1 and go from there.)

Additional context

new install from master - as require the full body posted on submit

Describe the bug

When trying to use Argo 2.6.3, which requires the kind, version and various other metadata that should always be sent (because crons and templates are important), I am getting:

% python manage.py ps_k8s_post_workflow
Traceback (most recent call last):
  File "manage.py", line 25, in <module>
    execute_from_command_line(sys.argv)
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/django/core/management/__init__.py", line 401, in execute_from_command_line
    utility.execute()
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/django/core/management/__init__.py", line 395, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/django/core/management/base.py", line 328, in run_from_argv
    self.execute(*args, **cmd_options)
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/django/core/management/base.py", line 369, in execute
    output = self.handle(*args, **options)
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/backend/pagesnap/apps/default/management/commands/ps_k8s_post_workflow.py", line 173, in handle
    wf = service.process()
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/backend/pagesnap/apps/default/management/commands/ps_k8s_post_workflow.py", line 159, in process
    self.workflows = [workflow_id for workflow_id in self.send()]
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/backend/pagesnap/apps/default/management/commands/ps_k8s_post_workflow.py", line 159, in <listcomp>
    self.workflows = [workflow_id for workflow_id in self.send()]
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/backend/pagesnap/apps/default/management/commands/ps_k8s_post_workflow.py", line 156, in send
    yield service.submit(client=service.client, namespace="argo")
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/argo/workflows/dsl/_workflow.py", line 430, in submit
    body = client.api_client.sanitize_for_serialization(self)
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/argo/workflows/client/api_client.py", line 242, in sanitize_for_serialization
    for key, val in six.iteritems(obj_dict)}
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/argo/workflows/client/api_client.py", line 242, in <dictcomp>
    for key, val in six.iteritems(obj_dict)}
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/argo/workflows/client/api_client.py", line 238, in sanitize_for_serialization
    for attr, _ in six.iteritems(obj.openapi_types)
AttributeError: 'V1ObjectMeta' object has no attribute 'openapi_types'

This seems to be an invalid type reference?
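
The last frame of the traceback iterates over `obj.openapi_types`, so the serializer expects every model it receives to carry that mapping. One possible explanation, offered as an assumption rather than a confirmed diagnosis, is that the `V1ObjectMeta` instance comes from a client package generated with an older generator that names the mapping `swagger_types`. The sketch below uses two hypothetical stand-in classes (not the real models) to show how to check which attribute an object exposes:

```python
def type_map_attribute(obj):
    """Return the name of the type-mapping attribute that obj exposes, or None."""
    for name in ("openapi_types", "swagger_types"):
        if hasattr(obj, name):
            return name
    return None


# Hypothetical stand-ins for generated model classes; the real models are not imported here.
class NewStyleModel:
    openapi_types = {"name": "str"}


class OldStyleModel:
    swagger_types = {"name": "str"}


print(type_map_attribute(NewStyleModel()))
print(type_map_attribute(OldStyleModel()))
```

Running the same check against the actual `metadata` object of the workflow would show whether it matches what `sanitize_for_serialization` expects.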

Versions

-e git+https://github.com/CermakM/argo-python-dsl@6ee661685ff87abd594714a3e8a0493eac4960b6#egg=argo_workflows_dsl

and

argo-workflows==3.2.0

To Reproduce
To get the updated behaviour that posts the whole Kubernetes resource to Argo 2.6.3, install the DSL from the repository:

pip install -e 'git+https://github.com/CermakM/argo-python-dsl#egg=argo-workflows-dsl'

Expected behaviour
The library works: the workflow is serialized and submitted without raising an AttributeError.

[Discussion] Dynamic task generation

Description
I have been working on a proposal to generate tasks dynamically for argo dsl.

I got to a point where I think I should get your feedback about whether it is a feasible design.

Currently, this is a work in progress, and I am still picking up the metaprogramming mechanics implemented in argo dsl. I wouldn't be surprised if my design turns out not to be feasible and needs to be abandoned, but I did my best, coming from a data science perspective and having authored Airflow pipelines.

The proposal is here: https://gist.github.com/binarycrayon/75af90c1cdf660333f9903cd5822245d

Please let me know what you think!

Additional context
This was originally sent as an email and is forwarded here per Marek's suggestion. I realize that Argo also has a built-in with_items or with_params: https://github.com/argoproj-labs/argo-client-python/search?q=with_items&unscoped_q=with_items

Technically I'm no longer blocked on what I need to do, but I would still like to get feedback.
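
For context on the built-in mechanism mentioned above: in the Argo YAML specification, a task can carry a `withItems` list and refer to the current element as `{{item}}`. The sketch below builds such a task as a plain dictionary that mirrors the YAML. It does not use the DSL or the client models, and the function name, task name, and parameter name are hypothetical.

```python
import json


def fan_out_task(name, template, items):
    """Build one Argo DAG task (as a plain dict) that expands over items via withItems."""
    return {
        "name": name,
        "template": template,
        # "{{item}}" is substituted by Argo with each element of withItems.
        "arguments": {"parameters": [{"name": "message", "value": "{{item}}"}]},
        "withItems": list(items),
    }


task = fan_out_task("print-message", "whalesay", ["hello world", "goodbye world"])
print(json.dumps(task, indent=2))
```

Because the item list is ordinary Python data, it can be computed at definition time, which covers the simpler cases of dynamic task generation.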

Functional API?

Hey there!

Love the project. I use Argo every day at work and a nice Python API would make many things a lot easier. I'm wondering if you have plans for a more functional type of API, for example something like Prefect.

If you're open to it, I would be willing to write up a PoC and contribute it.

cannot import name 'models' from 'argo.workflows'

Describe the bug
I am on the master branch and tried following the simple HelloWorld example.
When I try to run from argo.workflows.dsl import Workflow,
I get:

In [2]: from argo.workflows.dsl import Workflow
---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
<ipython-input-2-11eebaacd24f> in <module>
----> 1 from argo.workflows.dsl import Workflow

~/Workspace/argo-python-dsl/argo/workflows/dsl/__init__.py in <module>
     15
     16 # modules
---> 17 from . import tasks
     18 from . import templates
     19

~/Workspace/argo-python-dsl/argo/workflows/dsl/tasks.py in <module>
     23 )
     24
---> 25 from ._arguments import artifact
     26 from ._arguments import parameter
     27 from ._base import Prop

~/Workspace/argo-python-dsl/argo/workflows/dsl/_arguments.py in <module>
      9 )
     10
---> 11 from ._base import Prop
     12
     13 __all__ = ["artifact", "parameter", "V1alpha1Artifact", "V1alpha1Parameter"]

~/Workspace/argo-python-dsl/argo/workflows/dsl/_base.py in <module>
     14 from typing import Union
     15
---> 16 from argo.workflows import models
     17
     18 T = TypeVar("T")

ImportError: cannot import name 'models' from 'argo.workflows' (./argo/workflows/__init__.py)

Python Version : Python 3.7.5

Installed argo-workflows using pip install -e "git+git://github.com/CermakM/argo-client-python@argo/v2.5.0#egg=argo-workflows"

To Reproduce
Steps to reproduce the behavior:

  1. Go to https://github.com/CermakM/argo-python-dsl#getting-started and follow the instructions using the master branch code.

Screenshots
pip list | grep -i argo
argo-models              2.2.1a0
argo-workflows           3.0.0rc0
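
The same package listing can be gathered from inside the interpreter that raises the ImportError, which helps confirm that the failing environment really is the one `pip` reports on. This is a minimal sketch using the standard library's `importlib.metadata` (available from Python 3.8, so newer than the 3.7.5 reported above); the helper name is hypothetical.

```python
from importlib import metadata  # standard library, Python 3.8+


def installed_matching(fragment):
    """Return sorted (name, version) pairs for installed distributions whose name contains fragment."""
    found = set()
    for dist in metadata.distributions():
        name = dist.metadata["Name"]
        # Skip distributions with broken or missing metadata.
        if name and fragment.lower() in name.lower():
            found.add((name, dist.version))
    return sorted(found)


for name, version in installed_matching("argo"):
    print(name, version)
```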
