
dag-factory's Issues

Nested Subgroups

Hello,

I have tried to specify a TaskGroup within a TaskGroup, but it seems that the "make_task_group" method does not support that.

Have I overlooked something, or is it not supported?

Best regards from Austria,
Andreas

Support lambda in python_callable

Right now we need to use a file and a name for the Python callable. It should be able to take a lambda as well, as shown below:

task_2:
      operator: airflow.sensors.http_sensor.HttpSensor
      http_conn_id: 'test-http'
      method: 'GET'
      response_check_lambda: 'lambda response: "ok" in response.text'
      dependencies: [task_1]

I have been able to make it work and will raise a pull request for it.
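A minimal sketch of what such support could look like (an assumption for illustration, not dag-factory's implementation): the lambda string from the YAML is turned into a callable with eval before being handed to the sensor.

# Hypothetical handling of a "response_check_lambda" key (assumption, not dag-factory code).
task_params = {"response_check_lambda": 'lambda response: "ok" in response.text'}

# eval turns the configured string into a real callable
response_check = eval(task_params.pop("response_check_lambda"))


class DummyResponse:
    text = "status: ok"


print(response_check(DummyResponse()))  # True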

Help Required: XComs in YAML file

I am trying to create dynamic DAGs using a YAML template and dag-factory. With a small example I am able to access an MSSQL database and insert data. As a next step I want to perform EL(T)/E(T)L; on the Airflow side I can perform the extract from DB1 and load into DB2 with XComs (for small datasets, or with S3 as the XCom backend). But how does dag-factory handle XComs, and how can I pass the extracted data to the load step (is there a keyword in the YAML)? Apologies if I missed something obvious in the documentation.

Thank you.
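One pattern worth noting here, as a hedged sketch rather than documented dag-factory behaviour: Airflow's Jinja templating still applies to templated operator fields defined in the YAML, so a load task can pull the upstream task's XCom with ti.xcom_pull. The DAG and task names below are hypothetical.

import yaml

config = yaml.safe_load("""
etl_example:
  default_args:
    start_date: 2021-01-01
  tasks:
    extract:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo extracted_batch_001"   # last stdout line is pushed to XCom when xcom push is enabled
    load:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo loading {{ ti.xcom_pull(task_ids='extract') }}"
      dependencies: [extract]
""")
print(config["etl_example"]["tasks"]["load"]["bash_command"])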

DAG import broken in 0.9.0

The latest release is not working for our DAGs: with 0.9.0 we get the following error on DAGs that work fine with 0.8.0.

Broken DAG: [/usr/local/airflow/dags/dcetl/dcetl_dark_dag.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dagfactory/dagbuilder.py", line 118, in get_dag_params
    dag_params["default_args"]["on_success_callback"]
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/module_loading.py", line 28, in import_string
    module_path, class_name = dotted_path.rsplit('.', 1)
AttributeError: 'function' object has no attribute 'rsplit'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dagfactory/dagfactory.py", line 136, in clean_dags
    dags: Dict[str, Any] = self.build_dags()
  File "/usr/local/lib/python3.7/site-packages/dagfactory/dagfactory.py", line 103, in build_dags
    ) from err
Exception: Failed to generate dag dcetl_dag_dark. verify config is correct

It looks like the issue is caused by how the callbacks in our customizations are handled; with 0.8.0 we do this:

        config = dagfactory.DagFactory._load_config(config_filepath)
        config["default_args"]["on_success_callback"] = PagerDutyHook().on_success_callback
        config["default_args"]["on_failure_callback"] = PagerDutyHook().on_failure_callback

With 0.9.0 this means the PagerDutyHook method is passed to import_string, so dotted_path in import_string isn't a string; it is:

<bound method PagerDutyHook.on_success_callback of <pagerduty.hooks.pagerduty.PagerDutyHook object at 0x7f2293776750>>
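A possible workaround, as a sketch under the assumption that 0.9.0 now feeds default_args["on_success_callback"] to Airflow's import_string (as the traceback shows): pass a dotted-path string to a module-level function instead of an already-bound hook method. The module path below is hypothetical.

import dagfactory

config_filepath = "/usr/local/airflow/dags/dcetl/dcetl_dark.yml"  # hypothetical path

config = dagfactory.DagFactory._load_config(config_filepath)
# import_string needs dotted-path strings, not bound methods (module path below is hypothetical)
config["default_args"]["on_success_callback"] = "pagerduty.callbacks.notify_success"
config["default_args"]["on_failure_callback"] = "pagerduty.callbacks.notify_failure"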

Unable to pass date macros

Macros like {{ ds }} and {{ prev_ds }}, which are critical for backfills, do not work. Is there a way to do this?
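For what it's worth, a hedged sketch: Jinja macros are rendered by Airflow at task run time on templated fields, so {{ ds }} and {{ prev_ds }} can normally be written literally in the YAML for fields such as bash_command. The DAG below is hypothetical.

import yaml

config = yaml.safe_load("""
backfill_example:
  default_args:
    start_date: 2021-01-01
  tasks:
    print_dates:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo run date {{ ds }} / previous {{ prev_ds }}"
""")
print(config["backfill_example"]["tasks"]["print_dates"]["bash_command"])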

Support for python callable response_check in HttpSensor

We need to be able to supply a Python callable for response_check, as below:

    task_3:
      operator: airflow.providers.http.sensors.http.HttpSensor
      task_id: 'http_sensor'
      http_conn_id: 'test-http'
      endpoint: ''
      method: 'GET'
      headers: 
        header1: 'value1'
      response_check_name: check_sensor
      response_check_file: /opt/airflow/dags/repo/dags/example1/http_conn.py
      poke_interval: 5        
      dependencies: [task_1]
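A minimal sketch of the helper module the response_check_name / response_check_file keys above would point at (key names taken from the issue; resolving them is the feature being requested):

# /opt/airflow/dags/repo/dags/example1/http_conn.py (path taken from the YAML above)

def check_sensor(response):
    """Return True when the endpoint answers with the expected payload."""
    return "ok" in response.text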

sla_miss_callback

sla_miss_callback should be supported, with handling analogous to the existing on_success_callback block below:

    if utils.check_dict_key(
        dag_params, "on_success_callback_name"
    ) and utils.check_dict_key(dag_params, "on_success_callback_file"):
        dag_params["on_success_callback"]: Callable = utils.get_python_callable(
            dag_params["on_success_callback_name"],
            dag_params["on_success_callback_file"],
        )
        dag_params["default_args"]["on_success_callback"]: Callable = utils.get_python_callable(
            dag_params["on_success_callback_name"],
            dag_params["on_success_callback_file"],
        )
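A sketch of the requested handling, offered as an assumption that mirrors the on_success_callback block above rather than code that exists in dag-factory (note that sla_miss_callback is a DAG-level argument, not a default_args entry):

from typing import Callable

from dagfactory import utils

# dag_params is the dict produced by get_dag_params(), as in the snippet above
if utils.check_dict_key(
    dag_params, "sla_miss_callback_name"
) and utils.check_dict_key(dag_params, "sla_miss_callback_file"):
    dag_params["sla_miss_callback"]: Callable = utils.get_python_callable(
        dag_params["sla_miss_callback_name"],
        dag_params["sla_miss_callback_file"],
    )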

execution_timeout should be converted to timedelta

Hi. I need to use the default execution_timeout parameter, but the task fails because dag-factory doesn't convert it to a timedelta. I checked your code to see how you handle retry_delay and saw the code below. The same conversion should also be done for execution_timeout. Maybe a better solution would be to convert all parameters ending with '_sec' to timedelta.

    if utils.check_dict_key(dag_params["default_args"], "retry_delay_sec"):
        dag_params["default_args"]["retry_delay"]: timedelta = timedelta(
            seconds=dag_params["default_args"]["retry_delay_sec"]
        )
        del dag_params["default_args"]["retry_delay_sec"]
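A sketch of the requested conversion, mirroring the retry_delay_sec handling above; the key name execution_timeout_secs is an assumption, not an existing dag-factory option:

from datetime import timedelta

from dagfactory import utils

# dag_params is the dict produced by get_dag_params(), as in the snippet above
if utils.check_dict_key(dag_params["default_args"], "execution_timeout_secs"):
    dag_params["default_args"]["execution_timeout"]: timedelta = timedelta(
        seconds=dag_params["default_args"]["execution_timeout_secs"]
    )
    del dag_params["default_args"]["execution_timeout_secs"]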

Read multiple YAML files in the same python file

Hello,

This is not really an issue but more of a question: would you recommend the following? Instead of having one Python file that reads one YAML file and constructs a DAG, construct multiple DAGs in the same Python file by reading several YAML files.

From what I saw, the bottleneck would be at the worker level, because the worker also has to parse the DAG file to understand what it needs to do; in this situation it would parse all the YAML files when it only needs one. I did not find a way to tell the Airflow worker not to parse files it doesn't need.

Regards
Ferdinand
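For reference, a minimal sketch of the pattern being asked about (an assumption about layout, not a recommendation from the maintainers): one loader file in the dags folder that builds a DAG per YAML file found in a configs directory next to it.

from pathlib import Path

import dagfactory

config_dir = Path(__file__).parent / "configs"  # hypothetical layout

for config_file in sorted(config_dir.glob("*.yml")):
    dag_factory = dagfactory.DagFactory(str(config_file))
    dag_factory.clean_dags(globals())
    dag_factory.generate_dags(globals())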

How to delete a DAG?

A DAG created through this tool cannot be deleted (because of the file path), even if I comment out the part of the config that defines the DAG.

Do you have any plans to support that?

Thanks!
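One thing that may help, hedged because it is only inferred from the clean_dags calls that appear elsewhere in these issues: running clean_dags before generate_dags removes DAGs that no longer exist in the YAML from the loader file's globals(). Whether that covers this particular file-path case is an open question; the config path below is hypothetical.

import dagfactory

dag_factory = dagfactory.DagFactory("/path/to/dags/config_file.yml")  # hypothetical path
dag_factory.clean_dags(globals())      # drop DAGs removed from the YAML
dag_factory.generate_dags(globals())   # register the ones that remain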

Airflow 2.0.0 compatibility

Hi @ajbosco
Did you have a chance to try dag-factory on Airflow 2.0? I wonder if I will have problems if I upgrade to 2.0. If it is not compatible, are you planning to release a version for Airflow 2.0?

Unexpected keyword argument 'tags'

When trying the example in this repo as-is on GCP's Cloud Composer with Airflow 1.10.6, it runs into this error when trying to parse the DAG:

verify config is correct. err:__init__() got an unexpected keyword argument 'tags'

When I downgrade dag-factory to 0.4.2, it works as advertised.

Airflow 2.2.0 compatibility

I tried to upgrade Airflow to 2.2.0 and got the error below.

Broken DAG: [/opt/airflow/dags/****.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py", line 373, in get
    return self._get_option_from_default_config(section, key, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py", line 383, in _get_option_from_default_config
    raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")
airflow.exceptions.AirflowConfigException: section/key [core/dag_concurrency] not found in config

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/dagfactory/dagfactory.py", line 137, in clean_dags
    dags: Dict[str, Any] = self.build_dags()
  File "/home/airflow/.local/lib/python3.8/site-packages/dagfactory/dagfactory.py", line 102, in build_dags
    raise Exception(
Exception: Failed to generate dag ****. verify config is correct

I guess it is because of this change.

Add json schema validation for yaml files

Hi there 👋

When I write code I always use a linter; my IDE tells me when I'm about to make a mess by not importing a certain library or not using the right type.

This does not apply to config files. They're super nice because config "always compiles" 😉, but the code running those config files does not always like it. So, to have a little more control, I try to apply "schemas" to those config files. They're looser than "classes", language independent, and easier to maintain and run.

It would be nice to have something like

import yaml
from jsonschema import validate

if __name__ == '__main__':
    schema = """
        type: object
        properties:
          default_args:
            type: object
            properties:
              start_date:
                type: string
          schedule_interval:
            type: string
          catchup:
            type: boolean
          tasks:
            type: array
            items:
              type: object
              properties:
                operator:
                  type: string
                dependencies:
                  type: array
                  items:
                    type: string
    """
    validate(
        instance=yaml.safe_load(open('path/to/my_dag.yml')),
        schema=yaml.safe_load(schema)
    )

But this schema is one I generated myself; it doesn't distinguish between Airflow versions and it's not kept up to date with the library. It would be nice to have a check like this built in. PyCharm picks up such schemas to validate things like .gitlab-ci.yml, so we could be writing DAGs the "safest" way with free linting.

One more thought: this way the codebase could also ease up a little on the YAML validation. Reducing the core codebase makes it easier to read and maintain.

Regardless of what you think of this proposal, thanks for your work and for open sourcing this library. It rocks 🚀!

CWL ?

Does dag-factory follow the Common Workflow Language (CWL) in terms of its YAML syntax?

How to access Airflow variables and parameters passed via REST API

I have a use case where I am accessing an Airflow Variable using Variable.get(). How can I achieve this in the YAML file?

Also, how can I access parameters (params.app_name, params.namespace, etc.) which I am passing via the REST API as follows:

{ "conf": { "app_name": "app name", "region": "", "namespace": "", "default_path": "" } }

env_variables = Variable.get("env_variables", deserialize_json=True)
fetch_yaml = BashOperator(
    bash_command="export {{ var.json.env_variables['kube_config'] }} && kubectl get sparkapplications {{ params.app_name }} -n {{ params.namespace }}",
    task_id="fetch_yaml",
)
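A hedged sketch of how this can usually be expressed directly in the YAML (an assumption about layout, relying on standard Airflow templating rather than anything dag-factory-specific): bash_command is a templated field, so Airflow Variables are available as var.json.* and the REST-API payload as dag_run.conf.

import yaml

config = yaml.safe_load("""
rest_api_example:
  default_args:
    start_date: 2021-01-01
  tasks:
    fetch_yaml:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: >-
        export {{ var.json.env_variables['kube_config'] }} &&
        kubectl get sparkapplications {{ dag_run.conf['app_name'] }}
        -n {{ dag_run.conf['namespace'] }}
""")
print(config["rest_api_example"]["tasks"]["fetch_yaml"]["bash_command"])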

support dynamically extract dag params in dagbuilder

I'm not sure why the config param parsing in the DAG builder is so manual. Is it possible to inspect the Airflow DAG attributes and fill them in if specified?

The specific use case is that I want to use the field access_control in the DAG object.
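A sketch of the generic approach hinted at here, as an assumption rather than how dag-factory currently works: inspect DAG's constructor and forward any config key that matches one of its parameters, which would pick up access_control for free.

import inspect
from typing import Any, Dict

from airflow import DAG


def extract_dag_kwargs(dag_params: Dict[str, Any]) -> Dict[str, Any]:
    """Keep only the config keys that are real DAG constructor parameters."""
    accepted = set(inspect.signature(DAG.__init__).parameters) - {"self"}
    return {key: value for key, value in dag_params.items() if key in accepted}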

Can we separate out generating the DAG from setting it into Airflow's globals?

Here, generating the DAG and setting it into Airflow's globals are tied together in a single method.

def generate_dags(self, globals: Dict[str, Any]) -> None:
    """
    Generates DAGs from YAML config
    :param globals: The globals() from the file used to generate DAGs. The dag_id
        must be passed into globals() for Airflow to import
    """
    dag_configs: Dict[str, Dict[str, Any]] = self.get_dag_configs()
    default_config: Dict[str, Any] = self.get_default_config()
    for dag_name, dag_config in dag_configs.items():
        dag_builder: DagBuilder = DagBuilder(
            dag_name=dag_name, dag_config=dag_config, default_config=default_config
        )
        try:
            dag: Dict[str, Union[str, DAG]] = dag_builder.build()
        except Exception as err:
            raise Exception(
                f"Failed to generate dag {dag_name}. verify config is correct"
            ) from err
        globals[dag["dag_id"]]: DAG = dag["dag"]

If I want to save the generated DAG into a DB for versioning, or serialise it and send it over the network, this is not currently possible with this method.

I suggest that we return the generated DAG and create another method, set_dag(), which sets the DAG in Airflow's global namespace.
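A sketch of the proposed split (an assumption about shape, not existing dag-factory API), assuming build_dags() returns a mapping keyed by dag_id, as the type hints in the tracebacks above suggest:

from typing import Any, Dict


def register_dags(self, dags: Dict[str, Any], globals: Dict[str, Any]) -> None:
    """Set already-built DAGs into Airflow's global namespace."""
    for dag_id, dag in dags.items():
        globals[dag_id] = dag

# generate_dags() could then be a thin wrapper:
#     dags = self.build_dags()
#     self.register_dags(dags, globals)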

start_date and end_date get their time component removed

While using dag-factory to create DAGs on Apache Airflow, I noticed a very strange behavior. Suppose the input start_date for a DAG is "2020-04-14T16:20:30.000Z"; in the Airflow UI it is shown as "2020-04-14T00:00:00.000Z" (notice how the time component is removed).

I did a little bit of searching and found this issue to be due to this piece of code in utils.py:

if isinstance(date_value, date):
    return datetime.combine(date=date_value, time=datetime.min.time()).replace(
        tzinfo=local_tz
    )
if isinstance(date_value, datetime):
    return date_value.replace(tzinfo=local_tz)

If I pass a datetime.datetime object as date_value, it matches the first if (since datetime is a subclass of date), and that removes the time component of my input.
Can you suggest a workaround, or am I doing something wrong?
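A sketch of one possible fix (an assumption, not the project's chosen solution): since datetime is a subclass of date, checking datetime first preserves the time component. The function name and signature below are illustrative.

from datetime import date, datetime


def to_aware_datetime(date_value, local_tz):
    """Attach a timezone without silently dropping the time component."""
    if isinstance(date_value, datetime):  # must be checked before the date branch
        return date_value.replace(tzinfo=local_tz)
    if isinstance(date_value, date):
        return datetime.combine(date=date_value, time=datetime.min.time()).replace(
            tzinfo=local_tz
        )
    raise TypeError(f"Unsupported date value: {date_value!r}")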

Deserialise arbitrary types in python

Most of the params that the DAG, and the Operators and Sensors underneath it, take are basic/raw types (string, int, bool, float) and basic containers (lists, dicts). But from time to time we require an actual Python object. For example:

An example for the ExternalTaskSensor

example_external_task_sensor:
  default_args:
    start_date: 2021-09-01
  schedule_interval: '@daily'
  tasks:
    external_task_sensor:
      operator: airflow.sensors.external_task.ExternalTaskSensor
      external_dag_id: example_upstream_dependency_dag
      external_task_id: example_upstream_dependency_task
      execution_delta: !!python/object/apply:datetime.timedelta [0, 300]

And here is an example of deserialising a reference to the callable straight from the YAML; let's suppose we are instantiating this YAML from a file main_dag_file.py that also contains a Python function we want to call (so that it is on the PYTHONPATH):

# main_dag_file.py
def add_two_numbers(first_operator: int, second_operator: int) -> int:
  return first_operator + second_operator
example_call_python_code:
  default_args:
    start_date: 2021-09-01
  schedule_interval: '@daily'
  tasks:
    example_python_task:
      operator: airflow.operators.python.PythonOperator
      python_callable: !!python/name:main_dag_file.add_two_numbers
      op_kwargs:
        first_operator: 3
        second_operator: 2

In addition sometimes it's useful to both:

  • Have your DAG as a config file with its Jinja templating ready for Airflow.
  • Have a render engine on top to be able to make references, modify strings, and pass a function's return value as a param to an operator.
# main_dag_file.py
def decide_executor() -> str:
  environment = os.getenv('EXECUTION_ENVIRONMENT')
  if environment == 'prod':
    executor = 'kubernetes.KubernetesPodOperator'
  else:
    executor = 'bash.BashOperator'
  return 'airflow.operators.' + executor
# pass environment variables to the function
example_call_python_code:
  default_args:
    start_date: 2021-09-01
  schedule_interval: '@daily'
  tasks:
    example_python_task:
      operator: !!python/object/main_dag_file.decide_executor()
      some_param: {{ var.value.my_variable }}

This can also be solved by being able to add Python elements to the YAML file. It is a fine line: config files should be language agnostic, I know, which is why I'm opening this issue for the community to weigh in too.

To implement this we would use yaml.load instead of yaml.safe_load. As we are the owners of the code there is no risk in doing so; safe_load is in place to prevent injection attacks.

Also, this feature would make the current way of attaching a callable to a PythonOperator unnecessary, and it matches the operators' exact params.

What are your thoughts?
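A tiny sketch of the mechanism being proposed, hedged because how it would be wired into dag-factory is exactly the open question: PyYAML's unsafe loader resolves python tags such as !!python/object/apply:datetime.timedelta into real objects, which safe_load refuses by design.

import yaml

doc = "execution_delta: !!python/object/apply:datetime.timedelta [0, 300]"

parsed = yaml.load(doc, Loader=yaml.UnsafeLoader)
print(parsed["execution_delta"])  # 0:05:00 -- a real datetime.timedelta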

Time taken to create dag in Airflow from yaml file

Namaste @ajbosco, thank you for this great library.

I have a question: is the time dag-factory takes to create a DAG on Airflow configurable? I am seeing a gap of about 3 minutes between placing a YAML file of tasks (and the Python configuration) and seeing the DAG in the Airflow UI.

Thanks again.

dag-factory PythonOperator

Hi Adam,
I'm trying to use the PythonOperator with dag-factory. I defined a function within the Python file and referenced it as python_callable, but I always get the following error message:

make sure config is properly populated. err: Failed to create <class 'airflow.operators.python_operator.PythonOperator'> task. err: python_callable param must be callable

As long as I write the task directly within the Python file, there is no problem calling the function.
Am I doing something wrong, or is it not possible to use the PythonOperator this way?

DAG file:

from pprint import pprint

from airflow import DAG
import dagfactory

dag_factory = dagfactory.DagFactory("/path/to/dags/config_file.yml")
dag_factory.generate_dags(globals())


# [START howto_operator_python]
def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

YAML config:

Python_Test:
  default_args:
    owner: 'user'
    start_date: 2019-05-28
  description: 'this is an sample dag which runs every day'
  tasks:
    test_1:
      operator: airflow.operators.python_operator.PythonOperator
      task_id: 'pythonoperator_Testtask'
      python_callable: print_context
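For reference, a sketch of the configuration style dag-factory's examples use for PythonOperator, which points at the function by name and file instead of expecting a live callable; the file path below is hypothetical.

import yaml

config = yaml.safe_load("""
Python_Test:
  default_args:
    owner: 'user'
    start_date: 2019-05-28
  description: 'this is a sample dag which runs every day'
  tasks:
    test_1:
      operator: airflow.operators.python_operator.PythonOperator
      task_id: 'pythonoperator_Testtask'
      python_callable_name: print_context
      python_callable_file: /path/to/dags/print_context.py
""")
print(config["Python_Test"]["tasks"]["test_1"]["python_callable_name"])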

This module is deprecated. Please use `kubernetes.client.models.V1Volume`

Symptom

~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:26 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1Volume`.
~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:27 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1VolumeMount`.

Traceback

  File "~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/dagfactory/__init__.py", line 2, in <module>
    from .dagfactory import DagFactory                                                                                                                         
  File "~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/dagfactory/dagfactory.py", line 8, in <module>
    from dagfactory.dagbuilder import DagBuilder                                                                                                               
  File "~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/dagfactory/dagbuilder.py", line 10, in <module>
    from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator             
  File "~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 25, in <module>
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator  # noqa

Dag factory as a DAG

Hey there,

Thank you for the library, it is awesome. I'm trying to use it, though, and haven't managed to. My case is the following:

I want to offer users a way to create their own DAGs using a small UI builder (a no-code solution). I already have that working; now I'm trying to configure dag-factory to create those DAGs. Therefore, I can't add the script to the dags folder as described in the README. Instead I created the following DAG:

from datetime import timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

from dagfactory import DagFactory

default_args = {
    'owner': 'querylayer',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=1)
}

def create_dag(**kwargs):
    dag_factory = DagFactory(config=kwargs['dag_run'].conf)

    dag_factory.clean_dags(globals())
    dag_factory.generate_dags(globals())
    print(globals().get('dag_name'))

with DAG('create_workflow', default_args=default_args, schedule_interval=None, start_date=days_ago(0), catchup=False) as dag:
    create_workflow = PythonOperator(
        task_id='create_workflow',
        provide_context=True,
        python_callable=create_dag,
        dag=dag
    )

    create_workflow

The idea is that we pass the config as JSON to this DAG and it creates the DAG for you. So far so good; I tried running this DAG with the config below and it runs successfully:

{
    "dag_name": {
      "dag_id": "dag_name",
      "schedule_interval": "None",
      "description": "some description",
      "doc_md": "# Title",
      "catchup": false,
      "concurrence": 2,
      "max_active_runs": 2,
      "task_groups": {},
      "default_args": {
          "start_date": "2021-09-28"
      },
      "tasks": {
        "task_name": {
          "operator": "airflow.operators.bash_operator.BashOperator",
          "bash_command": "echo 1"
        },
  
        "task_name2": {
          "operator": "airflow.operators.bash_operator.BashOperator",
          "bash_command": "echo 2",
          "dependencies": ["task_name"]
        }
      }
    }
  }

Below is the output of the DAG execution; you can see that the line print(globals().get('dag_name')) actually prints the created DAG:

*** Reading local file: /opt/airflow/logs/create_workflow/create_workflow/2021-09-28T18:46:06.674663+00:00/3.log
[2021-09-28 18:51:49,399] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: create_workflow.create_workflow 2021-09-28T18:46:06.674663+00:00 [queued]>
[2021-09-28 18:51:49,422] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: create_workflow.create_workflow 2021-09-28T18:46:06.674663+00:00 [queued]>
[2021-09-28 18:51:49,422] {taskinstance.py:1067} INFO - 
--------------------------------------------------------------------------------
[2021-09-28 18:51:49,422] {taskinstance.py:1068} INFO - Starting attempt 3 of 5
[2021-09-28 18:51:49,424] {taskinstance.py:1069} INFO - 
--------------------------------------------------------------------------------
[2021-09-28 18:51:49,450] {taskinstance.py:1087} INFO - Executing <Task(PythonOperator): create_workflow> on 2021-09-28T18:46:06.674663+00:00
[2021-09-28 18:51:49,460] {standard_task_runner.py:52} INFO - Started process 58 to run task
[2021-09-28 18:51:49,461] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'create_workflow', 'create_workflow', '2021-09-28T18:46:06.674663+00:00', '--job-id', '225', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/create_workflow.py', '--cfg-path', '/tmp/tmp1z29zp0e', '--error-file', '/tmp/tmp29nl9faf']
[2021-09-28 18:51:49,461] {standard_task_runner.py:77} INFO - Job 225: Subtask create_workflow
[2021-09-28 18:51:49,496] {logging_mixin.py:104} INFO - Running <TaskInstance: create_workflow.create_workflow 2021-09-28T18:46:06.674663+00:00 [running]> on host c1e986c90b97
[2021-09-28 18:51:49,538] {taskinstance.py:1282} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=querylayer
AIRFLOW_CTX_DAG_ID=create_workflow
AIRFLOW_CTX_TASK_ID=create_workflow
AIRFLOW_CTX_EXECUTION_DATE=2021-09-28T18:46:06.674663+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-09-28T18:46:06.674663+00:00
[2021-09-28 18:51:49,540] {logging_mixin.py:104} INFO - <DAG: dag_name>
[2021-09-28 18:51:49,540] {python.py:151} INFO - Done. Returned value was: None
[2021-09-28 18:51:49,558] {taskinstance.py:1191} INFO - Marking task as SUCCESS. dag_id=create_workflow, task_id=create_workflow, execution_date=20210928T184606, start_date=20210928T185149, end_date=20210928T185149
[2021-09-28 18:51:49,599] {taskinstance.py:1245} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-09-28 18:51:49,633] {local_task_job.py:151} INFO - Task exited with return code 0

But even though everything runs successfully, the DAG isn't displayed in the DAG list, as we can see by using the command line to list all DAGs:

default@c1e986c90b97:/opt/airflow$ airflow dags list
/home/airflow/.local/lib/python3.6/site-packages/airflow/configuration.py:346 DeprecationWarning: The hide_sensitive_variable_fields option in [admin] has been moved to the hide_sensitive_var_conn_fields option in [core] - the old setting has been used, but please update your config.
/home/airflow/.local/lib/python3.6/site-packages/airflow/configuration.py:346 DeprecationWarning: The default_queue option in [celery] has been moved to the default_queue option in [operators] - the old setting has been used, but please update your config.
/home/airflow/.local/lib/python3.6/site-packages/airflow/configuration.py:346 DeprecationWarning: The default_queue option in [celery] has been moved to the default_queue option in [operators] - the old setting has been used, but please update your config.
dag_id                   | filepath                    | owner      | paused
=========================+=============================+============+=======
create_workflow          | create_workflow.py          | querylayer | False 

Am I missing something? Is this approach viable, or is there a better way to do it?
Another question: are those DAGs stored in the DB somehow, or are they created only in memory? I saw that you register them in the globals variable, but does this mean that if Airflow restarts it will not start with the previously created DAGs?
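One hedged observation and a sketch of a workaround (assumptions, not an endorsed pattern): a DAG registered into globals() inside a running task only exists in that worker process, so the scheduler never sees it. Persisting the received config under the dags folder, where a regular dag-factory loader file can pick it up on the next parse, is one way around that; the target directory below is hypothetical.

import json
import os


def create_dag(**kwargs):
    """Persist the submitted config so a loader file in the dags folder can build it."""
    conf = kwargs["dag_run"].conf
    dag_name = next(iter(conf))
    target_dir = "/opt/airflow/dags/configs"  # hypothetical directory watched by a loader file
    os.makedirs(target_dir, exist_ok=True)
    with open(os.path.join(target_dir, f"{dag_name}.json"), "w") as handle:
        json.dump(conf, handle)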

Getting rid of long imports

Could we get rid of this long import?
airflow.operators.bash_operator.BashOperator

It would be great to have
BashOperator
instead.
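A minimal sketch of how short names could be supported (an assumption, not current dag-factory behaviour): keep a lookup table from bare class names to full dotted paths and fall back to the long form for anything unknown.

SHORT_OPERATOR_NAMES = {
    "BashOperator": "airflow.operators.bash_operator.BashOperator",
    "PythonOperator": "airflow.operators.python_operator.PythonOperator",
}


def resolve_operator(name: str) -> str:
    """Expand a short operator name to its full import path when known."""
    return SHORT_OPERATOR_NAMES.get(name, name)


print(resolve_operator("BashOperator"))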

Not able to import dagfactory

Hi,
first of all, amazing project,
hope it's still running :)

I'm facing an issue importing the dag-factory package,
see the details below:

$ airflow version
2.1.2
$ pip list|grep dag
dag-factory 0.9.1
$ python -V
Python 3.6.13
$ python
Python 3.6.13 (default, May 12 2021, 16:48:24)
[GCC 8.3.0] on linux

>>> from airflow import DAG
>>> import dagfactory
/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:26 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1Volume`.
/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:27 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1VolumeMount`.
>>>

Does this require passing the YAML to airflow?

Adam,

Great looking library. How do you deploy the YAML to Airflow with dag-factory?

I'm just diving into Airflow now, so any guidance is helpful. Airflow distributes the DAG itself, but every node will need to re-build the DAG from the factory, so does the YAML have to be available in the Airflow image?
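A minimal sketch of the usual answer, hedged as an assumption about deployment rather than official guidance: ship the YAML next to the loader .py file in the dags folder, so every scheduler and worker that parses the file can also read the config. The filename below is hypothetical.

import os

import dagfactory

config_file = os.path.join(os.path.dirname(__file__), "example_dag_factory.yml")

dag_factory = dagfactory.DagFactory(config_file)
dag_factory.generate_dags(globals())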

SubDag Support

Hey!

I was testing some use cases for our company and was wondering whether there is any support for the SubDagOperator?
I played around a bit and couldn't make it work.

Congrats on the great lib.

Cheers

template_searchpath seems to not work in YAML file

Hi, first of all, nice job on this project.

I'm trying to configure a DAG pointing to scripts in a path different from the DAG's path, so I'm setting template_searchpath in the YAML config to point to that location, but I'm receiving the error jinja2.exceptions.TemplateNotFound: /opt/airflow/scripts/query.sql, so I guess template_searchpath is not working.
If I place query.sql in the DAG's path, it works.

Here is my YAML

default:
  default_args:
    owner: "dag-factory"
    start_date: 2 days
    retries: 1
    retry_delay_sec: 300
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "graph"
  orientation: "LR"
  schedule_interval: "0 1 * * *"
  catchup: False
  tags: ['dag-factory']
  template_searchpath: ["/opt/airflow/scripts"]

example_sql_exec_query:
  description: "DAG to run SQL query"
  tasks:
    begin:
      operator: airflow.operators.dummy_operator.DummyOperator
    exec_sql:
      operator: airflow.operators.postgres_operator.PostgresOperator
      postgres_conn_id: "postgres"
      sql: /opt/airflow/scripts/query.sql # it doesn't work.
      params: {"columns": "dag_id, owners", "table": "public.dag", "limit": 10}
      dependencies: [begin]

Is anybody else having the same issue?
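One thing worth checking, offered as a hedged sketch rather than a confirmed fix: when template_searchpath is set, Jinja looks templates up relative to that path, so the sql value would normally be the bare filename rather than an absolute path.

import yaml

task = yaml.safe_load("""
exec_sql:
  operator: airflow.operators.postgres_operator.PostgresOperator
  postgres_conn_id: "postgres"
  sql: query.sql   # resolved against /opt/airflow/scripts via template_searchpath
  params: {"columns": "dag_id, owners", "table": "public.dag", "limit": 10}
""")
print(task["exec_sql"]["sql"])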

dag-factory==0.4.0 breaks our use of dag-factory with an airflow templating error

relevant packages:

apache-airflow[crypto]==1.10.2
dag-factory==0.4.0

dag-factory yaml:

registry_of_metrics:
  default_args:
    provide_context: True
    depends_on_past: True
    retries: 15
    start_date: 2018-10-01
  schedule_interval: '0 9 1 * *'
  description: 'registry of metric dag'

  tasks:
    # Paying dealers metric
    staging_paying_dealers:
      operator: operators.DockerRunOperator
      entrypoint: 'bin/generate_report.py'
      task_id: 'paying_dealers'
      arguments:
        date_name: 'as_of_date'
        sql_class: lib.models.reports.staging_paying_dealers.StagingPayingDealersReport
        schema: 'SCORECARD_STAGING'
        table: 'PAYING_DEALERS'
        schedule: 'monthly'
      template_arguments:
        date: 'next_execution_date'
    paying_dealers:
      operator: operators.DockerRunOperator
      entrypoint: 'bin/generate_report.py'
      task_id: 'paying_dealers'
      dependencies: [staging_paying_dealers]
      arguments:
        date_name: 'as_of_date'
        sql_class: lib.models.reports.paying_dealers.PayingDealersReport
        schema: 'SCORECARD'
        table: 'METRIC_MONTH_COUNTRY_DATA'
        schedule: 'monthly'
        metric_id: 3
      template_arguments:
        date: 'next_execution_date'
    # Session uniques metric
    session_uniques:
      operator: operators.DockerRunOperator
      entrypoint: 'bin/generate_report.py'
      task_id: 'session_uniques'
      arguments:
        date_name: 'as_of_date'
        sql_class: lib.models.reports.session_uniques.SessionUniquesReport
        schema: 'SCORECARD'
        table: 'METRIC_MONTH_COUNTRY_DATA'
        schedule: 'monthly'
        metric_id: 9
      template_arguments:
        date: 'next_execution_date'

Traceback (most recent call last):
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/app.py", line 1982, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/app.py", line 1614, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/app.py", line 1517, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/_compat.py", line 33, in reraise
    raise value
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/app.py", line 1612, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/app.py", line 1598, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask_admin/base.py", line 69, in inner
    return self._run_view(f, *args, **kwargs)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask_admin/base.py", line 368, in _run_view
    return fn(self, *args, **kwargs)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask_login/utils.py", line 258, in decorated_view
    return func(*args, **kwargs)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
    return func(*args, **kwargs)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/views.py", line 2224, in index
    auto_complete_data=auto_complete_data)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask_admin/base.py", line 308, in render
    return render_template(template, **kwargs)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/templating.py", line 134, in render_template
    context, ctx.app)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/templating.py", line 116, in _render
    rv = template.render(context)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/jinja2/asyncsupport.py", line 76, in render
    return original_render(self, *args, **kwargs)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/jinja2/environment.py", line 1008, in render
    return self.environment.handle_exception(exc_info, True)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/jinja2/environment.py", line 780, in handle_exception
    reraise(exc_type, exc_value, tb)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/jinja2/_compat.py", line 37, in reraise
    raise value.with_traceback(tb)
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/templates/airflow/dags.html", line 18, in top-level template code
    {% extends "airflow/master.html" %}
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/templates/airflow/master.html", line 18, in top-level template code
    {% extends "admin/master.html" %}
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/templates/admin/master.html", line 18, in top-level template code
    {% extends 'admin/base.html' %}
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask_admin/templates/bootstrap3/admin/base.html", line 37, in top-level template code
    {% block page_body %}
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/templates/admin/master.html", line 107, in block "page_body"
    {% block body %}
  File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/templates/airflow/dags.html", line 88, in block "body"
    <a href="{{ url_for('airflow.'+dag.default_view, dag_id=dag.dag_id) }}" title="{{ dag.description }}">
TypeError: can only concatenate str (not "NoneType") to str

Call user defined functions when building DAGs

Hi @ajbosco,

We have defined some Python functions that return a TaskGroup, and we call these functions to create DAGs.

In Python, a function like make_task_group_X could be used like this:

with DAG(...) as dag:
    g1 = make_task_group_a(args...)
    g2 = make_task_group_b(args...)
    g3 = make_task_group_c(args...)
    start >> g1 >> g2 >> g3 >> end

With dag-factory, is it possible to call user-defined functions to build up a DAG?

Dependabot can't resolve your Python dependency files

Dependabot can't resolve your Python dependency files.

As a result, Dependabot couldn't update your dependencies.

The error Dependabot encountered was:

ERROR: ERROR: Could not find a version that matches werkzeug<0.15.0,>=0.12,>=0.14.1,>=0.15
Tried: 0.1, 0.2, 0.3, 0.3.1, 0.4, 0.4.1, 0.5, 0.5.1, 0.6, 0.6.1, 0.6.2, 0.7, 0.7.1, 0.7.2, 0.8, 0.8.1, 0.8.2, 0.8.3, 0.9, 0.9.1, 0.9.2, 0.9.3, 0.9.4, 0.9.5, 0.9.6, 0.10, 0.10.1, 0.10.2, 0.10.2, 0.10.4, 0.10.4, 0.11, 0.11, 0.11.1, 0.11.1, 0.11.2, 0.11.2, 0.11.3, 0.11.3, 0.11.4, 0.11.4, 0.11.5, 0.11.5, 0.11.6, 0.11.6, 0.11.7, 0.11.7, 0.11.8, 0.11.8, 0.11.9, 0.11.9, 0.11.10, 0.11.10, 0.11.11, 0.11.11, 0.11.12, 0.11.12, 0.11.13, 0.11.13, 0.11.14, 0.11.14, 0.11.15, 0.11.15, 0.12, 0.12, 0.12.1, 0.12.1, 0.12.2, 0.12.2, 0.13, 0.13, 0.14, 0.14, 0.14.1, 0.14.1, 0.15.0, 0.15.0, 0.15.1, 0.15.1, 0.15.2, 0.15.2, 0.15.3, 0.15.3, 0.15.4, 0.15.4
There are incompatible versions in the resolved dependencies.
[pipenv.exceptions.ResolutionFailure]:       req_dir=requirements_dir
[pipenv.exceptions.ResolutionFailure]:   File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 726, in resolve_deps
[pipenv.exceptions.ResolutionFailure]:       req_dir=req_dir,
[pipenv.exceptions.ResolutionFailure]:   File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 480, in actually_resolve_deps
[pipenv.exceptions.ResolutionFailure]:       resolved_tree = resolver.resolve()
[pipenv.exceptions.ResolutionFailure]:   File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 395, in resolve
[pipenv.exceptions.ResolutionFailure]:       raise ResolutionFailure(message=str(e))
[pipenv.exceptions.ResolutionFailure]:       pipenv.exceptions.ResolutionFailure: ERROR: ERROR: Could not find a version that matches werkzeug<0.15.0,>=0.12,>=0.14.1,>=0.15
[pipenv.exceptions.ResolutionFailure]:       Tried: 0.1, 0.2, 0.3, 0.3.1, 0.4, 0.4.1, 0.5, 0.5.1, 0.6, 0.6.1, 0.6.2, 0.7, 0.7.1, 0.7.2, 0.8, 0.8.1, 0.8.2, 0.8.3, 0.9, 0.9.1, 0.9.2, 0.9.3, 0.9.4, 0.9.5, 0.9.6, 0.10, 0.10.1, 0.10.2, 0.10.2, 0.10.4, 0.10.4, 0.11, 0.11, 0.11.1, 0.11.1, 0.11.2, 0.11.2, 0.11.3, 0.11.3, 0.11.4, 0.11.4, 0.11.5, 0.11.5, 0.11.6, 0.11.6, 0.11.7, 0.11.7, 0.11.8, 0.11.8, 0.11.9, 0.11.9, 0.11.10, 0.11.10, 0.11.11, 0.11.11, 0.11.12, 0.11.12, 0.11.13, 0.11.13, 0.11.14, 0.11.14, 0.11.15, 0.11.15, 0.12, 0.12, 0.12.1, 0.12.1, 0.12.2, 0.12.2, 0.13, 0.13, 0.14, 0.14, 0.14.1, 0.14.1, 0.15.0, 0.15.0, 0.15.1, 0.15.1, 0.15.2, 0.15.2, 0.15.3, 0.15.3, 0.15.4, 0.15.4
[pipenv.exceptions.ResolutionFailure]: Warning: Your dependencies could not be resolved. You likely have a mismatch in your sub-dependencies.
  First try clearing your dependency cache with $ pipenv lock --clear, then try the original command again.
 Alternatively, you can use $ pipenv install --skip-lock to bypass this mechanism, then run $ pipenv graph to inspect the situation.
  Hint: try $ pipenv lock --pre if it is a pre-release dependency.
ERROR: ERROR: Could not find a version that matches werkzeug<0.15.0,>=0.12,>=0.14.1,>=0.15
Tried: 0.1, 0.2, 0.3, 0.3.1, 0.4, 0.4.1, 0.5, 0.5.1, 0.6, 0.6.1, 0.6.2, 0.7, 0.7.1, 0.7.2, 0.8, 0.8.1, 0.8.2, 0.8.3, 0.9, 0.9.1, 0.9.2, 0.9.3, 0.9.4, 0.9.5, 0.9.6, 0.10, 0.10.1, 0.10.2, 0.10.2, 0.10.4, 0.10.4, 0.11, 0.11, 0.11.1, 0.11.1, 0.11.2, 0.11.2, 0.11.3, 0.11.3, 0.11.4, 0.11.4, 0.11.5, 0.11.5, 0.11.6, 0.11.6, 0.11.7, 0.11.7, 0.11.8, 0.11.8, 0.11.9, 0.11.9, 0.11.10, 0.11.10, 0.11.11, 0.11.11, 0.11.12, 0.11.12, 0.11.13, 0.11.13, 0.11.14, 0.11.14, 0.11.15, 0.11.15, 0.12, 0.12, 0.12.1, 0.12.1, 0.12.2, 0.12.2, 0.13, 0.13, 0.14, 0.14, 0.14.1, 0.14.1, 0.15.0, 0.15.0, 0.15.1, 0.15.1, 0.15.2, 0.15.2, 0.15.3, 0.15.3, 0.15.4, 0.15.4
There are incompatible versions in the resolved dependencies.

['Traceback (most recent call last):\n', '  File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 501, in create_spinner\n    yield sp\n', '  File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 649, in venv_resolve_deps\n    c = resolve(cmd, sp)\n', '  File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 539, in resolve\n    sys.exit(c.return_code)\n', 'SystemExit: 1\n']

If you think the above is an error on Dependabot's side please don't hesitate to get in touch - we'll do whatever we can to fix it.

You can mention @dependabot in the comments below to contact the Dependabot team.

Need help with SparkSubmitOperator

Hi,
It seems the SparkSubmitOperator, the HDFS sensor, and many other operators are not included in dag-factory.

Could you please add them as well?

Also, it would be great to get a sample YAML file for the SparkSubmitOperator.
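A hedged sample of what a SparkSubmitOperator task could look like in the YAML, assuming (as with the other operators shown in these issues) that task keys are passed straight through to the operator's constructor; the connection id, application path, and DAG name are hypothetical.

import yaml

config = yaml.safe_load("""
spark_example:
  default_args:
    start_date: 2021-01-01
  schedule_interval: '@daily'
  tasks:
    submit_job:
      operator: airflow.providers.apache.spark.operators.spark_submit.SparkSubmitOperator
      conn_id: spark_default
      application: /opt/airflow/jobs/my_job.py
      name: sample_spark_job
""")
print(config["spark_example"]["tasks"]["submit_job"]["operator"])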

Best practices for deployment

Hey there,
this is an incredible extension to Airflow. I'm very excited about it, but I'm asking myself how to deploy DAGs properly (I'm fairly new to Airflow). In my tests, a change to the YAML file pushed to Airflow changes a running job instantly (and executes the new tasks, which I expected to start only in the next run).

Could you share best practices for how to deploy the YAML files and maybe how to version the DAGs?

Thanks in advance.

Get the error "Broken DAG: [/usr/local/airflow/dags/example_dag_factory.py] No module named 'dagfactory'" in airflow UI

I followed the instructions in the README file to install dag-factory in a local Airflow container, in which the Airflow version is 1.10.12 and the Python version is 3.8.9.

I copied the files example_dag_factory.yml, example_dag_factory.py and print_hello.py from https://github.com/ajbosco/dag-factory/blob/master/examples into the folder /usr/local/airflow/dags in the container, expecting the DAGs defined in the YAML file to be recognized. However, I see the following error in the Airflow UI:


Not sure what I missed. Any help is appreciated.
