
argo-python-dsl's Issues

AttributeError: 'NoneType' object has no attribute 'parameters' from Workflow.submit()

Describe the bug
argo.workflows.dsl._workflow.Workflow.submit checks whether the Workflow has an arguments attribute, but not whether that attribute is None. As a result, invoking submit() can raise an AttributeError at

self.spec.arguments.parameters = new_parameters
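
A guard before that assignment would avoid the crash. A minimal sketch, assuming V1alpha1Arguments from argo.workflows.client is the appropriate model (it is the one used in the manual workaround below):

from argo.workflows.client import V1alpha1Arguments

# Hypothetical guard inside Workflow.submit(): initialize an empty
# arguments object so the assignment cannot hit a None attribute.
if self.spec.arguments is None:
    self.spec.arguments = V1alpha1Arguments(parameters=[])
self.spec.arguments.parameters = new_parameters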

To Reproduce
Using the ArtifactPassing example,

from argo.workflows.client import V1alpha1Api
from argo.workflows.dsl import Workflow
from argo.workflows.dsl.tasks import artifact, task, V1alpha1Template
from argo.workflows.dsl.templates import template, inputs, outputs, V1alpha1Artifact, V1Container


class ArtifactPassing(Workflow):

    @task
    def generate_artifact(self) -> V1alpha1Template:
        return self.whalesay()

    @task
    @artifact(
        name="message",
        _from="{{tasks.generate-artifact.outputs.artifacts.hello-art}}"
    )
    def consume_artifact(self, message: V1alpha1Artifact) -> V1alpha1Template:
        return self.print_message(message=message)

    @template
    @outputs.artifact(name="hello-art", path="/tmp/hello_world.txt")
    def whalesay(self) -> V1Container:
        container = V1Container(
            name="whalesay",
            image="docker/whalesay:latest",
            command=["sh", "-c"],
            args=["cowsay hello world | tee /tmp/hello_world.txt"]
        )

        return container

    @template
    @inputs.artifact(name="message", path="/tmp/message")
    def print_message(self, message: V1alpha1Artifact) -> V1Container:
        container = V1Container(
            name="print-message",
            image="alpine:latest",
            command=["sh", "-c"],
            args=["cat", "/tmp/message"],
        )

        return container


if __name__ == '__main__':
    argo_api = V1alpha1Api()
    workflow_instance = ArtifactPassing()
    workflow_instance.submit(argo_api, "argo", parameters={"test_param": "some_value"})

Expected behaviour
The Workflow will be submitted through the API client instead of an AttributeError being raised.

Screenshots
N/A

Additional context

Traceback (most recent call last):
  File ".../artifact-passing.py", line 54, in <module>
    workflow_instance.submit(argo_api, "argo", parameters={"test_param": "some_value"})
  File ".../venv/lib/python3.8/site-packages/argo/workflows/dsl/_workflow.py", line 418, in submit
    self.spec.arguments.parameters = new_parameters
AttributeError: 'NoneType' object has no attribute 'parameters'
$ python
Python 3.8.1 (default, Feb 11 2020, 16:39:15) 
[GCC 7.4.0] on linux

It seems possible to work around this manually, like so:

workflow_instance.spec._arguments = argo.workflows.client.V1alpha1Arguments()
workflow_instance.spec._arguments.parameters = []
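
A variant that avoids touching the private attribute, assuming the generated spec model exposes a public arguments setter (standard for OpenAPI-generated Kubernetes-style models, but worth verifying):

from argo.workflows.client import V1alpha1Arguments

workflow_instance.spec.arguments = V1alpha1Arguments(parameters=[])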

Development plan and contribution

Description
I have built our ML infrastructure with Airflow and Kubernetes. As a centralized scheduler, Airflow has a few scenarios where it does not work well. For the next iteration of our ML workflow and scheduler, I'd like to invest in a cloud-native workflow engine (basically Argo). I'm actively exploring this library and am wondering about the longer-term development plan.

Do you take external contributions at the moment? (I suspect not, but I would like to know when you would call it a v1 and go from there.)


Host as part of Argo Labs

Hi @CermakM,

People are very interested in your SDK. Would you be interested in moving it into Argo Labs? This would give it more "official" status and encourage others to contribute.

Alex

AttributeError: 'dependencies' object has no attribute 'model'

Describe the bug


Traceback (most recent call last):
  File "/opt/app-root/lib/python3.7/site-packages/aiohttp/web_protocol.py", line 418, in start
    resp = await task
  File "/opt/app-root/lib/python3.7/site-packages/octomachinery/app/routing/webhooks_dispatcher.py", line 82, in wrapper
    return await wrapped_function(request, github_app=github_app)
  File "/opt/app-root/lib/python3.7/site-packages/octomachinery/app/routing/webhooks_dispatcher.py", line 120, in route_github_webhook_event
    await dispatch_event(event)
  File "/opt/app-root/lib/python3.7/site-packages/gidgethub/routing.py", line 80, in dispatch
    await callback(event, *args, **kwargs)
  File "app.py", line 138, in on_pr_open_or_sync
    installation=installation,
  File "app.py", line 168, in _submit_thamos_workflow
    wf = ThamosAdviseCheckRun()
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 233, in __init__
    self.compile()
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 376, in compile
    self.spec: V1alpha1WorkflowSpec = _compile(self.spec)
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 371, in _compile
    value: Any = _compile(getattr(obj, attr))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 368, in _compile
    return list(map(_compile, obj))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 371, in _compile
    value: Any = _compile(getattr(obj, attr))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 371, in _compile
    value: Any = _compile(getattr(obj, attr))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 368, in _compile
    return list(map(_compile, obj))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 371, in _compile
    value: Any = _compile(getattr(obj, attr))
  File "/opt/app-root/lib/python3.7/site-packages/argo/workflows/sdk/_workflow.py", line 343, in _compile
    if obj.model is not None:
AttributeError: 'dependencies' object has no attribute 'model'


Would you consider integration with the component format of Kubeflow Pipelines?

Kubeflow Pipelines is built on top of Argo, but brings additional features like Python-based SDK for component and pipeline authoring.

One important feature of the KFP SDK is the concept of reusable components.
The component author can create and share their component once, and then many pipeline authors can load and use it. The component format is declarative and language-independent. Components look a bit similar to Argo's templates, but are smaller and have several features that make them easier to write. The component library also provides ways to create components from user-provided Python functions (an idea somewhat similar to closures) or Airflow operators. There is even a way to create sub-DAG components based on Python functions describing the workflow.
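
For illustration, creating a component from a Python function with the KFP SDK 1.x (real KFP API as of that series; verify against your installed kfp version):

import kfp.components as comp

# A plain Python function becomes a containerized, reusable component.
def add(a: float, b: float) -> float:
    """Adds two numbers."""
    return a + b

add_op = comp.func_to_container_op(add)

# Shared declarative components can also be loaded from a YAML file or URL
# (the URL below is illustrative only):
# other_op = comp.load_component_from_url("https://example.com/component.yaml")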

KFP components are a popular feature, and I've seen several hundred of them on public GitHub alone. Even big companies are creating custom components based on the format.

Some examples that demonstrate the usage of the component library:

  • Creating components from command-line programs
  • Data passing in python components

Although the component library is currently part of the KFP SDK, it's essentially a standalone Python module that does not depend on the rest of the KFP SDK. If you're interested, we can think about extracting it as a separate library that can be imported without any other code. The component library has well-defined extension points that allow integrating and bridging it with any other orchestration DSL (KFP, Tekton, TFX, Airflow, Argo-DSL).

What do you think about this integration proposal?

Missing kind and other metadata

Describe the bug
Using the DagDiamond example, on submit the server rejects the request because kind is missing from the serialized workflow.

Requirement already satisfied: argo-workflows-dsl in ./venv/lib/python3.7/site-packages (0.1.0rc0)

To Reproduce
Steps to reproduce the behavior:

  1. Use the standard DagDiamond example
  2. Submit it

Screenshots

class DagDiamond(Workflow):

    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )

        return container
(Pdb) dd
{'metadata': {'generate_name': 'dag-diamond-', 'name': 'dag-diamond'},
 'spec': {'entrypoint': 'main',
          'templates': [{'dag': {'tasks': [{'arguments': {'parameters': [{'name': 'message',
                                                                          'value': 'A'}]},
                                            'name': 'A',
                                            'template': 'echo'},
                                           {'arguments': {'parameters': [{'name': 'message',
                                                                          'value': 'B'}]},
                                            'dependencies': ['A'],
                                            'name': 'B',
                                            'template': 'echo'},
                                           {'arguments': {'parameters': [{'name': 'message',
                                                                          'value': 'C'}]},
                                            'dependencies': ['A'],
                                            'name': 'C',
                                            'template': 'echo'},
                                           {'arguments': {'parameters': [{'name': 'message',
                                                                          'value': 'D'}]},
                                            'dependencies': ['B', 'C'],
                                            'name': 'D',
                                            'template': 'echo'}]},
                         'name': 'main'},
                        {'container': {'command': ['echo',
                                                   '{{inputs.parameters.message}}'],
                                       'image': 'alpine:3.7',
                                       'name': 'echo'},
                         'inputs': {'parameters': [{'name': 'message'}]},
                         'name': 'echo'}]},
 'status': {}}
(Pdb) dd.submit(client=service.client, namespace="argo")
*** argo.workflows.client.rest.ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json', 'Date': 'Sat, 21 Mar 2020 08:25:47 GMT', 'Content-Length': '1334'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Workflow in version \"v1alpha1\" cannot be handled as a Workflow: unmarshalerDecoder: Object 'Kind' is missing in '{\"metadata\": {\"generateName\": \"dag-diamond-\", \"name\": \"dag-diamond\"}, \"spec\": {\"entrypoint\": \"main\", \"templates\": [{\"dag\": {\"tasks\": [{\"arguments\": {\"parameters\": [{\"name\": \"message\", \"value\": \"A\"}]}, \"name\": \"A\", \"template\": \"echo\"}, {\"arguments\": {\"parameters\": [{\"name\": \"message\", \"value\": \"B\"}]}, \"dependencies\": [\"A\"], \"name\": \"B\", \"template\": \"echo\"}, {\"arguments\": {\"parameters\": [{\"name\": \"message\", \"value\": \"C\"}]}, \"dependencies\": [\"A\"], \"name\": \"C\", \"template\": \"echo\"}, {\"arguments\": {\"parameters\": [{\"name\": \"message\", \"value\": \"D\"}]}, \"dependencies\": [\"B\", \"C\"], \"name\": \"D\", \"template\": \"echo\"}]}, \"name\": \"main\"}, {\"container\": {\"command\": [\"echo\", \"{{inputs.parameters.message}}\"], \"image\": \"alpine:3.7\", \"name\": \"echo\"}, \"inputs\": {\"parameters\": [{\"name\": \"message\"}]}, \"name\": \"echo\"}]}, \"status\": {}}', error found in #10 byte of ...|atus\": {}}|..., bigger context ...|e\": \"message\"}]}, \"name\": \"echo\"}]}, \"status\": {}}|...","reason":"BadRequest","code":400}
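
A possible client-side workaround until the library sets these fields itself, assuming the workflow object exposes api_version and kind attributes the way other OpenAPI-generated Kubernetes models do (an assumption to verify against your client version):

# Hypothetical workaround: populate the fields the server complains about
# before submitting.
dd = DagDiamond()
dd.api_version = "argoproj.io/v1alpha1"  # assumed attribute name
dd.kind = "Workflow"                     # assumed attribute name
dd.submit(client=service.client, namespace="argo")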


how to use V1Volume, V1Toleration, etc.?

Description
I am trying to convert one of our workflow YAMLs into the DSL. It uses a few things like volumeClaimTemplates, volumes, volumeMounts, tolerations, etc. I can't find any examples that explain how to pass all those configs into V1Container; any example with the DSL would be really helpful.
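
Not an official example, but a sketch of how the pieces might map, assuming the Kubernetes-style models (V1Volume, V1VolumeMount, V1Toleration, V1EmptyDirVolumeSource) are importable from argo.workflows.client and that the workflow spec accepts volumes and tolerations lists as the Argo API does (assumptions to verify):

from argo.workflows.client import (
    V1Container,
    V1EmptyDirVolumeSource,
    V1Toleration,
    V1Volume,
    V1VolumeMount,
)

# volume_mounts attach to the container itself...
container = V1Container(
    name="worker",
    image="alpine:3.7",
    command=["sh", "-c", "ls /scratch"],
    volume_mounts=[V1VolumeMount(name="scratch", mount_path="/scratch")],
)

# ...while volumes and tolerations live on the workflow spec, mirroring
# the YAML layout (spec.volumes, spec.tolerations).
wf = MyWorkflow()  # hypothetical Workflow subclass using the container above
wf.spec.volumes = [V1Volume(name="scratch", empty_dir=V1EmptyDirVolumeSource())]
wf.spec.tolerations = [
    V1Toleration(key="dedicated", operator="Equal", value="ml", effect="NoSchedule"),
]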

Dynamic Template & Task generation

Description
Thank you for this awesome package.
I'm trying to generate tasks and templates dynamically: I need to loop through an array and generate a template and a task for each element.
Is it possible to do that now?
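
The decorator-based DSL may not support this directly, but one fallback is to drop down to the generated client models and build templates and tasks in a plain Python loop (model names assumed from argo.workflows.client; verify against your version):

from argo.workflows.client import (
    V1alpha1DAGTask,
    V1alpha1DAGTemplate,
    V1alpha1Template,
    V1Container,
)

items = ["a", "b", "c"]
templates, tasks = [], []

for item in items:
    # One container template per item...
    templates.append(V1alpha1Template(
        name=f"echo-{item}",
        container=V1Container(
            name=f"echo-{item}",
            image="alpine:3.7",
            command=["echo", item],
        ),
    ))
    # ...and one DAG task referencing it by name.
    tasks.append(V1alpha1DAGTask(name=f"task-{item}", template=f"echo-{item}"))

# Wrap the generated tasks in a DAG entrypoint template.
entrypoint = V1alpha1Template(name="main", dag=V1alpha1DAGTemplate(tasks=tasks))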


submit failed with "argo.workflows.client.rest.ApiException: (400)"

Describe the bug
I tried to submit an Argo workflow from Python using the "hello-world" DSL example, but it failed. Could you help me have a look?
Python code:

from argo.workflows.dsl import Workflow
from argo.workflows.dsl import template
import yaml

from argo.workflows.dsl.templates import V1Container

class HelloWorld(Workflow):

    entrypoint = "whalesay"

    @template
    def whalesay(self) -> V1Container:
        container = V1Container(
            image="docker/whalesay:latest",
            name="whalesay",
            command=["cowsay"],
            args=["hello world"]
        )

        return container


wf = HelloWorld()
print(wf)

from argo.workflows.client import V1alpha1Api
from argo.workflows.config import load_kube_config

load_kube_config() # loads local configuration from ~/.kube/config
v1alpha1 = V1alpha1Api()
wfs = v1alpha1.list_namespaced_workflows(namespace="default")
print(wfs)
#v1alpha1.create_namespaced_workflow("default", wf)
wf.submit(client=V1alpha1Api(), namespace="default")

I can list and print wfs via the client, but submit fails and throws a 400 exception.
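
Given the missing-Kind report elsewhere in this tracker, the 400 here may be the same problem. One way to check, assuming the generated API client exposes sanitize_for_serialization (it appears in the tracebacks of the related issues):

# Inspect the JSON body the client would send; if "kind" and "apiVersion"
# are missing, the server-side unmarshal rejects the request with a 400.
body = v1alpha1.api_client.sanitize_for_serialization(wf)
print(body.get("kind"), body.get("apiVersion"))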

[Discussion] Dynamic task generation

Description
I have been working on a proposal to generate tasks dynamically for argo dsl.

I got to a point where I think I should get your feedback about whether it is a feasible design.

Currently, this is a work in progress, and I am still picking up the metaprogramming mechanics implemented in argo dsl. I wouldn't be surprised if my design isn't actually feasible and needs to be abandoned, but I did my best to approach it from the perspective of a data scientist who has authored Airflow pipelines.

The proposal is here: https://gist.github.com/binarycrayon/75af90c1cdf660333f9903cd5822245d

Please let me know what you think!

Additional context
This was originally sent as an email and forwarded here per Marek's suggestion. I realize that Argo also has built-in with_items and with_params: https://github.com/argoproj-labs/argo-client-python/search?q=with_items&unscoped_q=with_items

Technically I'm no longer blocked on what I need to do, but I still want to get feedback.

Workflow metadata name parsed from class name prevents generated name from working

Description
Is it a design decision that any workflow class has its name attribute set to the dasherized class name by default? While the metadata name is set, the generated name will not take effect.

e.g. class Helloworld(Workflow) would have 'hello-world' set as the metadata name by default and used explicitly as the pod name in k8s. I like the idea of leaving the name empty (or optional?) and defaulting to generated names, so I can submit the same workflow multiple times (since we cannot have duplicate pod names in k8s, at least on the same node).

This is not a show stopper for me, since I can always call wf.name = '', but the current behaviour seems counter-intuitive.

Additional context
https://github.com/CermakM/argo-python-dsl/blob/master/argo/workflows/dsl/_workflow.py#L67
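
A hedged workaround, assuming the workflow's metadata is a standard V1ObjectMeta with name and generate_name fields (as the compiled output in the other issues suggests):

wf = HelloWorld()  # any Workflow subclass; HelloWorld is hypothetical here
# Drop the class-derived name and let the server generate one, so the
# same workflow can be submitted multiple times.
wf.metadata.name = None
wf.metadata.generate_name = "hello-world-"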

new install from master - requires the full body to be posted on submit

Describe the bug

When trying to use Argo 2.6.3, which requires kind, version, and various other metadata to always be sent (because crons and templates are important), I am getting:

% python manage.py ps_k8s_post_workflow
Traceback (most recent call last):
  File "manage.py", line 25, in <module>
    execute_from_command_line(sys.argv)
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/django/core/management/__init__.py", line 401, in execute_from_command_line
    utility.execute()
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/django/core/management/__init__.py", line 395, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/django/core/management/base.py", line 328, in run_from_argv
    self.execute(*args, **cmd_options)
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/django/core/management/base.py", line 369, in execute
    output = self.handle(*args, **options)
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/backend/pagesnap/apps/default/management/commands/ps_k8s_post_workflow.py", line 173, in handle
    wf = service.process()
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/backend/pagesnap/apps/default/management/commands/ps_k8s_post_workflow.py", line 159, in process
    self.workflows = [workflow_id for workflow_id in self.send()]
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/backend/pagesnap/apps/default/management/commands/ps_k8s_post_workflow.py", line 159, in <listcomp>
    self.workflows = [workflow_id for workflow_id in self.send()]
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/backend/pagesnap/apps/default/management/commands/ps_k8s_post_workflow.py", line 156, in send
    yield service.submit(client=service.client, namespace="argo")
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/argo/workflows/dsl/_workflow.py", line 430, in submit
    body = client.api_client.sanitize_for_serialization(self)
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/argo/workflows/client/api_client.py", line 242, in sanitize_for_serialization
    for key, val in six.iteritems(obj_dict)}
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/argo/workflows/client/api_client.py", line 242, in <dictcomp>
    for key, val in six.iteritems(obj_dict)}
  File "/Users/rosscdh/p/PageSnap/v1/pagesnap/venv/lib/python3.7/site-packages/argo/workflows/client/api_client.py", line 238, in sanitize_for_serialization
    for attr, _ in six.iteritems(obj.openapi_types)
AttributeError: 'V1ObjectMeta' object has no attribute 'openapi_types'

This seems like an invalid type reference? It looks as if the V1ObjectMeta instance comes from a different generation of models than the API client expects.
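
One hedged way to tell whether two generations of models are being mixed: OpenAPI-generated models carry an openapi_types mapping while older Swagger-generated ones carry swagger_types, and sanitize_for_serialization only knows about one of them (attribute names per the respective generators; verify locally):

# wf is the workflow instance whose submit() fails.
meta = wf.metadata  # the V1ObjectMeta that raised in the traceback
print(type(meta).__module__)
print(hasattr(meta, "openapi_types"), hasattr(meta, "swagger_types"))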

Versions

-e git+https://github.com/CermakM/argo-python-dsl@6ee661685ff87abd594714a3e8a0493eac4960b6#egg=argo_workflows_dsl

and

argo-workflows==3.2.0

To Reproduce
In order to get the updated behavior of posting the whole Kubernetes resource to Argo 2.6.3:

pip install -e 'git+https://github.com/CermakM/argo-python-dsl#egg=argo-workflows-dsl'

Expected behaviour
The library works


cannot import name 'models' from 'argo.workflows'

Describe the bug
I am on the master branch and tried following the simple HelloWorld example.
When I try to run from argo.workflows.dsl import Workflow, I get:

In [2]: from argo.workflows.dsl import Workflow
---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
<ipython-input-2-11eebaacd24f> in <module>
----> 1 from argo.workflows.dsl import Workflow

~/Workspace/argo-python-dsl/argo/workflows/dsl/__init__.py in <module>
     15
     16 # modules
---> 17 from . import tasks
     18 from . import templates
     19

~/Workspace/argo-python-dsl/argo/workflows/dsl/tasks.py in <module>
     23 )
     24
---> 25 from ._arguments import artifact
     26 from ._arguments import parameter
     27 from ._base import Prop

~/Workspace/argo-python-dsl/argo/workflows/dsl/_arguments.py in <module>
      9 )
     10
---> 11 from ._base import Prop
     12
     13 __all__ = ["artifact", "parameter", "V1alpha1Artifact", "V1alpha1Parameter"]

~/Workspace/argo-python-dsl/argo/workflows/dsl/_base.py in <module>
     14 from typing import Union
     15
---> 16 from argo.workflows import models
     17
     18 T = TypeVar("T")

ImportError: cannot import name 'models' from 'argo.workflows' (./argo/workflows/__init__.py)

Python version: 3.7.5

Installed argo-workflows using pip install -e "git+git://github.com/CermakM/argo-client-python@argo/v2.5.0#egg=argo-workflows"

To Reproduce
Steps to reproduce the behavior:

  1. Go to https://github.com/CermakM/argo-python-dsl#getting-started and follow the instructions using the master branch code.


pip list | grep -i argo
argo-models              2.2.1a0
argo-workflows           3.0.0rc0
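
A quick, standard-library-only way to see which argo.workflows package is resolved and which submodules it actually provides:

import pkgutil
import argo.workflows

print(argo.workflows.__file__)
print(sorted(m.name for m in pkgutil.iter_modules(argo.workflows.__path__)))
# If 'models' is not listed, the installed argo-workflows client does not
# match what the DSL's master branch expects.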

Functional API?

Hey there!

Love the project. I use Argo every day at work, and a nice Python API would make many things a lot easier. I'm wondering if you have plans for a more functional type of API. For example, something like Prefect.

If you're open, I would be willing to write up a PoC and contribute it.
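
For reference, the style in question as it looks in Prefect 1.x (a runnable Prefect example for comparison only; argo-python-dsl does not provide this API today):

from prefect import Flow, task

@task
def say(message):
    print(message)

# Tasks are composed functionally inside a Flow context instead of being
# declared as methods on a Workflow subclass.
with Flow("hello-flow") as flow:
    say("hello world")

flow.run()  # executes the flow locally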

Nested DAGs

Description

I have a scenario where I would like to encode a nested DAG using the DSL. More specifically, I have a task that uses a with_items decorator which, in turn, uses a template that is supposed to launch multiple processes doing the actual work. Is it syntactically possible to write a task that returns a template, which ultimately returns a closure for the parameter combination? In pseudocode:

@task
@with_items(['a', 'b', 'c'])
def A(...params) -> template:
   return B(...params)

@template # (??? is this the right choice?)
@with_items(['d', 'e', 'f'])
@inputs.param()  # (??? a, b, and c from step A)
def B(...params) -> script template:
   return C(...all params)

@closure
@inputs.param(first)
@inputs.param(second)
def C(...params) -> script template:
  print(first / second)  # results in: ad, ae, af, bd, be, bf, cd, ce, cf

Final DAG:

           *
      /    |    \
     a     b      c
    /|\   /|\    /|\
    def   def    def
     \     |    /
ad, ae, af, bd, be, bf, cd, ce, cf (final steps)
          *          

Is this possible with the current version of the DSL? If so, what could be a way to structure the workflow?
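
Not DSL syntax, but at the Argo API level this shape is expressible as a DAG template whose fan-out task points at another template that is itself a DAG. A sketch with the generated client models (model and attribute names, including with_items, are assumed from argo-client-python; verify against your version):

from argo.workflows.client import (
    V1alpha1DAGTask,
    V1alpha1DAGTemplate,
    V1alpha1Template,
)

# Outer DAG: fan out over ['a', 'b', 'c']; each item runs the inner DAG.
outer = V1alpha1Template(
    name="A",
    dag=V1alpha1DAGTemplate(tasks=[
        V1alpha1DAGTask(name="fan-out", template="B", with_items=["a", "b", "c"]),
    ]),
)

# Inner DAG: fan out over ['d', 'e', 'f'] into the leaf template C,
# yielding the ad..cf cross product of the two item lists.
inner = V1alpha1Template(
    name="B",
    dag=V1alpha1DAGTemplate(tasks=[
        V1alpha1DAGTask(name="fan-out", template="C", with_items=["d", "e", "f"]),
    ]),
)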

Additional context

Argo: 3.0.1
Python: 3.7
K8S: 1.18 (GKE)
