ajbosco / dag-factory
Dynamically generate Apache Airflow DAGs from YAML configuration files
License: MIT License
Hello,
I have tried to specify a TaskGroup within a TaskGroup, but it seems that the "make_task_group" method does not support that.
Have I overlooked something, or is it not supported?
Best regards from Austria,
Andreas
Right now we need to use a file and a name for a Python callable. It should also be able to take a lambda, as shown below:
task_2:
  operator: airflow.sensors.http_sensor.HttpSensor
  http_conn_id: 'test-http'
  method: 'GET'
  response_check_lambda: 'lambda response: "ok" in response.text'
  dependencies: [task_1]
I have been able to make it work and will raise a pull request for it.
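A minimal sketch of how a response_check_lambda string could be turned into a callable, assuming the builder simply evaluates it with Python's eval(). The helper build_lambda and the FakeResponse class are hypothetical illustrations, not dag-factory's API. Because eval() runs arbitrary code, this is only reasonable for config files you fully trust.

```python
from typing import Any, Callable


def build_lambda(source: str) -> Callable[..., Any]:
    """Evaluate a 'lambda ...' string from a config file into a callable.

    Hypothetical helper, not dag-factory's API. eval() executes arbitrary
    code, so only use this with config files you fully trust.
    """
    expression = source.strip()
    if not expression.startswith("lambda"):
        raise ValueError(f"expected a lambda expression, got {source!r}")
    # An empty __builtins__ limits what the lambda can reach by name;
    # it is NOT a security sandbox.
    func = eval(expression, {"__builtins__": {}}, {})
    if not callable(func):
        raise TypeError("expression did not evaluate to a callable")
    return func


class FakeResponse:
    """Stand-in for an HTTP response object exposing a .text attribute."""

    def __init__(self, text: str) -> None:
        self.text = text


check = build_lambda('lambda response: "ok" in response.text')
print(check(FakeResponse("status: ok")))    # True
print(check(FakeResponse("status: down")))  # False
```

Rejecting anything that does not start with "lambda" keeps the key's meaning narrow, but it does not make eval() safe.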
I am trying to create dynamic DAGs using a YAML template and dag-factory. With a small example I am able to access an MSSQL database and insert data. As a next step I want to perform EL(T)/E(T)L: on the Airflow side I can extract from DB1 and load into DB2 with XComs (for small datasets, or with S3 as the XCom backend). But how would dag-factory handle XComs, or how can I pass the extracted data to the load step (which keyword in the YAML)? Apologies if I missed something obvious in the documentation.
Thank you.
The latest release is not working for our DAGs: with 0.9.0 we get this error on DAGs that work fine with 0.8.0:
Broken DAG: [/usr/local/airflow/dags/dcetl/dcetl_dark_dag.py] Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dagfactory/dagbuilder.py", line 118, in get_dag_params
dag_params["default_args"]["on_success_callback"]
File "/usr/local/lib/python3.7/site-packages/airflow/utils/module_loading.py", line 28, in import_string
module_path, class_name = dotted_path.rsplit('.', 1)
AttributeError: 'function' object has no attribute 'rsplit'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dagfactory/dagfactory.py", line 136, in clean_dags
dags: Dict[str, Any] = self.build_dags()
File "/usr/local/lib/python3.7/site-packages/dagfactory/dagfactory.py", line 103, in build_dags
) from err
Exception: Failed to generate dag dcetl_dag_dark. verify config is correct
It looks like the issue is caused by how callbacks are handled in our customizations. With 0.8.0 we do this:
config = dagfactory.DagFactory._load_config(config_filepath)
config["default_args"]["on_success_callback"] = PagerDutyHook().on_success_callback
config["default_args"]["on_failure_callback"] = PagerDutyHook().on_failure_callback
With 0.9.0, the PagerDutyHook method is passed to the import_string method, so dotted_path in import_string isn't a string; it is
<bound method PagerDutyHook.on_success_callback of <pagerduty.hooks.pagerduty.PagerDutyHook object at 0x7f2293776750>>
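One way a builder could accept both styles is to pass callables through unchanged and only import values that are strings. The sketch below uses importlib directly rather than Airflow's import_string; resolve_callback and the Hook class are hypothetical names for illustration.

```python
import importlib
from typing import Any, Callable, Union


def resolve_callback(value: Union[str, Callable[..., Any]]) -> Callable[..., Any]:
    """Accept an already-built callable or a dotted import path.

    Hypothetical helper: avoids handing bound methods (such as
    PagerDutyHook().on_success_callback) to a string-only importer.
    """
    if callable(value):
        # Functions and bound methods are used as-is.
        return value
    if not isinstance(value, str):
        raise TypeError(f"expected a callable or dotted path, got {type(value)!r}")
    module_path, _, attr_name = value.rpartition(".")
    if not module_path:
        raise ValueError(f"{value!r} is not a dotted path")
    resolved = getattr(importlib.import_module(module_path), attr_name)
    if not callable(resolved):
        raise TypeError(f"{value!r} does not point to a callable")
    return resolved


class Hook:
    """Stand-in for an object with callback methods, like PagerDutyHook."""

    def on_success_callback(self, context: dict) -> str:
        return "notified"


bound = resolve_callback(Hook().on_success_callback)   # passed through unchanged
by_path = resolve_callback("importlib.import_module")  # imported from a dotted path
```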
Macros like {{ ds }} and {{ prev_ds }}, which are critical for backfills, do not work. Is there a way to make them work?
Do you support SimpleHttpOperator ?
I need to be able to create a callable for response_check, as below:
task_3:
  operator: airflow.providers.http.sensors.http.HttpSensor
  task_id: 'http_sensor'
  http_conn_id: 'test-http'
  endpoint: ''
  method: 'GET'
  headers:
    header1: 'value1'
  response_check_name: check_sensor
  response_check_file: /opt/airflow/dags/repo/dags/example1/http_conn.py
  poke_interval: 5
  dependencies: [task_1]
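A sketch of what a response_check_name / response_check_file pair could resolve to: import the file as a module and fetch the named function. load_callable_from_file is a hypothetical helper built on importlib.util, not dag-factory's own code; the temporary file stands in for http_conn.py.

```python
import importlib.util
import tempfile
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Callable


def load_callable_from_file(name: str, file_path: str) -> Callable[..., Any]:
    """Import the module at file_path and return its attribute `name`."""
    path = Path(file_path)
    spec = importlib.util.spec_from_file_location(path.stem, str(path))
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load a module from {file_path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the file's top-level code
    func = getattr(module, name)
    if not callable(func):
        raise TypeError(f"{name} in {file_path} is not callable")
    return func


# Demo: write a tiny http_conn.py and load check_sensor from it.
with tempfile.TemporaryDirectory() as tmp:
    demo_file = Path(tmp) / "http_conn.py"
    demo_file.write_text("def check_sensor(response):\n    return 'ok' in response.text\n")
    check_sensor = load_callable_from_file("check_sensor", str(demo_file))

print(check_sensor(SimpleNamespace(text="ok")))  # True
```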
sla_miss_callback should be supported in the same way as the existing on_success_callback handling:
if utils.check_dict_key(
    dag_params, "on_success_callback_name"
) and utils.check_dict_key(dag_params, "on_success_callback_file"):
    dag_params["on_success_callback"]: Callable = utils.get_python_callable(
        dag_params["on_success_callback_name"],
        dag_params["on_success_callback_file"],
    )
    dag_params["default_args"]["on_success_callback"]: Callable = utils.get_python_callable(
        dag_params["on_success_callback_name"],
        dag_params["on_success_callback_file"],
    )
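A generalised sketch that resolves any `<key>_name` / `<key>_file` pair from a list of callback keys, including sla_miss_callback. In Airflow, sla_miss_callback is a DAG argument with no task-level equivalent, so this sketch only sets callbacks at DAG level and does not copy them into default_args. attach_callbacks and fake_loader are hypothetical; the loader is injected so the example runs without Airflow or dag-factory.

```python
from typing import Any, Callable, Dict

# Illustrative list of DAG-level callback keys to resolve.
CALLBACK_KEYS = ("on_success_callback", "on_failure_callback", "sla_miss_callback")


def attach_callbacks(
    dag_params: Dict[str, Any],
    loader: Callable[[str, str], Any],
) -> Dict[str, Any]:
    """Replace <key>_name/<key>_file pairs with loader(name, file), in place."""
    for key in CALLBACK_KEYS:
        name_key, file_key = f"{key}_name", f"{key}_file"
        if name_key in dag_params and file_key in dag_params:
            name = dag_params.pop(name_key)
            file_path = dag_params.pop(file_key)
            # DAG level only; task-level callbacks would belong in default_args.
            dag_params[key] = loader(name, file_path)
    return dag_params


def fake_loader(name: str, file_path: str) -> str:
    """Stand-in for a real loader such as utils.get_python_callable."""
    return f"{file_path}:{name}"


params = {
    "dag_id": "example",
    "sla_miss_callback_name": "notify_sla_miss",
    "sla_miss_callback_file": "/opt/airflow/dags/callbacks.py",
}
attach_callbacks(params, fake_loader)
```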
How do I pass trigger_rule in the YAML file?
Hi. I need to use the execution_timeout parameter in default_args, but the task fails because dag-factory doesn't convert it to a timedelta. I checked your code to see how you handle retry_delay and found the code below. The same conversion should be done for execution_timeout. A better solution might be to convert every parameter ending in '_sec' to a timedelta.
if utils.check_dict_key(dag_params["default_args"], "retry_delay_sec"):
    dag_params["default_args"]["retry_delay"]: timedelta = timedelta(
        seconds=dag_params["default_args"]["retry_delay_sec"]
    )
    del dag_params["default_args"]["retry_delay_sec"]
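A sketch of the suggested generalisation: every numeric `<name>_sec` entry becomes `<name>` holding a timedelta, so retry_delay_sec and execution_timeout_sec are treated the same way. convert_sec_params is a hypothetical helper, not dag-factory's current behaviour.

```python
from datetime import timedelta
from typing import Any, Dict

SUFFIX = "_sec"


def convert_sec_params(params: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy where numeric `<name>_sec` keys become `<name>` timedeltas."""
    converted: Dict[str, Any] = {}
    for key, value in params.items():
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        if key.endswith(SUFFIX) and is_number:
            converted[key[: -len(SUFFIX)]] = timedelta(seconds=value)
        else:
            converted[key] = value
    return converted


default_args = {"retries": 1, "retry_delay_sec": 300, "execution_timeout_sec": 3600}
out = convert_sec_params(default_args)
```

Working on a copy keeps the original config untouched; the same function could be applied to DAG-level keys such as dagrun_timeout_sec.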
Hello,
This is not really an issue but more a question. I am wondering whether you would recommend the following: instead of having one Python file that reads one YAML file and constructs a DAG, constructing multiple DAGs in the same Python file by reading several YAML files.
From what I saw, the bottleneck would be at the worker level, because the worker also has to parse the DAG file to understand what it needs to do, so in this setup it would parse all the YAML files while it only needed one. I did not find a way to tell the Airflow worker which file it needs so that it doesn't parse anything else.
Regards
Ferdinand
A DAG created through this tool cannot be deleted (because of the file path), even if I comment out the part of the config that is used by the DAG.
Do you have any plans to include that ?
Thanks!
Hi @ajbosco
Have you had a chance to try dagfactory on Airflow 2.0? I wonder whether I will have problems if I upgrade to 2.0. If it is not compatible, are you planning to release a version for Airflow 2.0?
I have a method calldag(payload, dagname) and need to pass it the arguments payload and dagname. How can I pass them using the YAML file?
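Assuming dag-factory forwards extra task keys to the operator as keyword arguments (which is how the other task options in these configs reach the operator), PythonOperator's op_kwargs is the natural place for payload and dagname. In the sketch below a plain dict stands in for the task's YAML entry, and run_python_task is a hypothetical, simplified stand-in for how PythonOperator calls the function with op_args and op_kwargs; the values are made up.

```python
from typing import Any, Callable, Dict


def calldag(payload: Dict[str, Any], dagname: str) -> str:
    # Placeholder body for the method described above.
    return f"triggering {dagname} with {payload}"


# Equivalent of a task entry in the YAML config (hypothetical values).
task_config: Dict[str, Any] = {
    "operator": "airflow.operators.python.PythonOperator",
    "op_kwargs": {"payload": {"id": 42}, "dagname": "target_dag"},
}


def run_python_task(func: Callable[..., Any], config: Dict[str, Any]) -> Any:
    """Simplified stand-in: call func(*op_args, **op_kwargs) from the config."""
    return func(*config.get("op_args", []), **config.get("op_kwargs", {}))


print(run_python_task(calldag, task_config))
```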
I want to set different DAG dependencies. How should I write them in YAML?
1. Could we use DagFactory in AWS MWAA?
2. Could you please provide an Athena operator for querying? Please reply.
When trying the example in this repo as-is on GCP's Cloud Composer with Airflow 1.10.6, it runs into this error when trying to parse the DAG:
verify config is correct. err:__init__() got an unexpected keyword argument 'tags'
When I downgrade dag-factory to 0.4.2, it works as advertised.
I tried to upgrade Airflow to 2.2.0 and got the error below.
Broken DAG: [/opt/airflow/dags/****.py] Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py", line 373, in get
return self._get_option_from_default_config(section, key, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/configuration.py", line 383, in _get_option_from_default_config
raise AirflowConfigException(f"section/key [{section}/{key}] not found in config")
airflow.exceptions.AirflowConfigException: section/key [core/dag_concurrency] not found in config
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/dagfactory/dagfactory.py", line 137, in clean_dags
dags: Dict[str, Any] = self.build_dags()
File "/home/airflow/.local/lib/python3.8/site-packages/dagfactory/dagfactory.py", line 102, in build_dags
raise Exception(
Exception: Failed to generate dag ****. verify config is correct
I guess it is because of this change.
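Airflow 2.2 renamed `[core] dag_concurrency` to `[core] max_active_tasks_per_dag`, which matches the key missing from the traceback. A version-tolerant lookup tries the new name first and falls back to the old one. In this sketch a nested dict stands in for Airflow's config object and get_first_option is a hypothetical helper; with the real config, a has_option(section, key) check would play the role of the `in` test.

```python
from typing import Any, Dict, Iterable

# Stand-in for airflow.configuration.conf: section -> {key: value}.
ConfigDict = Dict[str, Dict[str, Any]]

# New name first, legacy name second.
CONCURRENCY_KEYS = ("max_active_tasks_per_dag", "dag_concurrency")


def get_first_option(
    config: ConfigDict, section: str, keys: Iterable[str], default: Any = None
) -> Any:
    """Return the value of the first key that exists in `section`."""
    options = config.get(section, {})
    for key in keys:
        if key in options:
            return options[key]
    return default


airflow_22_config = {"core": {"max_active_tasks_per_dag": 16}}
legacy_config = {"core": {"dag_concurrency": 8}}
print(get_first_option(airflow_22_config, "core", CONCURRENCY_KEYS))  # 16
print(get_first_option(legacy_config, "core", CONCURRENCY_KEYS))  # 8
```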
Hi there 👋
When I write code I always use a linter; my IDE tells me when I'm about to make a mess by not importing a certain library or not using the right type.
This does not apply to config files. They're super nice because config "always compiles" 😉, but the code running those config files doesn't always like them. So, to have a bit more control, I try to implement "schemas" for those config files. They're looser than classes, language independent, and easier to maintain and run.
It would be nice to have something like
import yaml
from jsonschema import validate

if __name__ == '__main__':
    schema = """
type: object
properties:
  default_args:
    type: object
    properties:
      start_date:
        type: string
  schedule_interval:
    type: string
  catchup:
    type: boolean
  tasks:
    type: array
    items:
      type: object
      properties:
        operator:
          type: string
        dependencies:
          type: array
          items:
            type: string
"""
    with open('path/to/my_dag.yml') as fh:
        validate(
            instance=yaml.safe_load(fh),
            schema=yaml.safe_load(schema)
        )
But this schema is one I generated myself: it doesn't distinguish between Airflow versions and it isn't kept up to date with the library. It would be nice to have an official check like this. PyCharm picks up such schemas to validate files like .gitlab-ci.yaml, so we could also be writing DAGs the "safest" way, with free linting.
Also, a thought: this way the codebase could ease up a little on YAML validation. Reducing the core codebase makes it easier to read and maintain.
Regardless of what you think of this proposal, thanks for your work and for open sourcing this library. It rocks 🚀!
Does DAG Factory follow the Common Workflow Language (CWL) in terms of its YAML syntax?
I have a use case where I access Airflow Variables using Variable.get(). How can I achieve this in the YAML file?
Also, how can I access parameters (params.app_name, params.namespace, etc.) that I pass via the REST API as follows:
{ "conf": { "app_name": "app name", "region": "", "namespace": "", "default_path": "" } }
from airflow.models import Variable
from airflow.operators.bash import BashOperator

env_variables = Variable.get("env_variables", deserialize_json=True)

fetch_yaml = BashOperator(
    task_id="fetch_yaml",
    bash_command="export {{ var.json.env_variables['kube_config'] }} && kubectl get sparkapplications {{ params.app_name }} -n {{ params.namespace }}",
)
I'm not sure why config param parsing in the DAG builder is somewhat manual. Would it be possible to read the Airflow DAG's attributes and fill them in when specified?
My specific use case is that I want to use the access_control field of the DAG object.
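A sketch of the idea using inspect.signature: keep only the config keys that the DAG constructor actually accepts, so new fields like access_control pass through without a hand-written mapping. FakeDAG is a stand-in for airflow.models.DAG and split_known_kwargs is hypothetical. Two caveats: a constructor that takes **kwargs would defeat this filter, and leftovers should probably be logged or rejected rather than silently dropped, since a key the installed Airflow version does not accept (like tags in the Cloud Composer report) would otherwise vanish unnoticed.

```python
import inspect
from typing import Any, Dict, Optional, Tuple


def split_known_kwargs(
    cls: type, params: Dict[str, Any]
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split params into constructor keyword arguments of cls and leftovers."""
    accepted = {
        name
        for name, param in inspect.signature(cls).parameters.items()
        if param.kind in (param.POSITIONAL_OR_KEYWORD, param.KEYWORD_ONLY)
    }
    known = {k: v for k, v in params.items() if k in accepted}
    unknown = {k: v for k, v in params.items() if k not in accepted}
    return known, unknown


class FakeDAG:
    """Stand-in for airflow.models.DAG with a much smaller signature."""

    def __init__(self, dag_id: str, description: str = "",
                 access_control: Optional[Dict[str, Any]] = None) -> None:
        self.dag_id = dag_id
        self.description = description
        self.access_control = access_control


config = {
    "dag_id": "example",
    "access_control": {"Viewer": {"can_read"}},
    "tasks": {},  # consumed by the builder, not by DAG()
}
known, unknown = split_known_kwargs(FakeDAG, config)
dag = FakeDAG(**known)
```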
Here, generating the DAG and setting it into Airflow's globals are tied together in a single method.
dag-factory/dagfactory/dagfactory.py
Lines 66 to 84 in bce607f
If I want to save the generated DAG to a DB for versioning, or serialise it and send it over the network, that is currently not possible with this method.
I suggest that we return the generated DAG and create another method, set_dag(), which sets the DAG in Airflow's global namespace.
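A sketch of the proposed split: one function only builds and returns the DAGs, and a separate set_dag() publishes them into a namespace such as globals(). The signatures are hypothetical, and make_dag is injected so the example runs without Airflow; a real builder would construct DAG objects.

```python
from typing import Any, Callable, Dict, MutableMapping

DagConfig = Dict[str, Dict[str, Any]]


def build_dags(
    config: DagConfig, make_dag: Callable[[str, Dict[str, Any]], Any]
) -> Dict[str, Any]:
    """Build every DAG in config and return them keyed by dag_id.

    Nothing is written to any namespace, so the result can be versioned,
    serialised or sent elsewhere first.
    """
    return {dag_id: make_dag(dag_id, dag_conf) for dag_id, dag_conf in config.items()}


def set_dag(dags: Dict[str, Any], namespace: MutableMapping[str, Any]) -> None:
    """Expose already-built DAGs in a module namespace, e.g. globals()."""
    for dag_id, dag in dags.items():
        namespace[dag_id] = dag


# Usage with a stand-in builder that returns plain dicts.
config = {"example_dag": {"schedule_interval": "@daily"}}
dags = build_dags(config, lambda dag_id, conf: {"dag_id": dag_id, **conf})
namespace: Dict[str, Any] = {}
set_dag(dags, namespace)
```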
While using dag-factory to create DAGs on Apache Airflow, I noticed some very strange behaviour. Suppose that, when creating a DAG, the input start_date is "2020-04-14T16:20:30.000Z". In the Airflow UI it is shown as "2020-04-14T00:00:00.000Z" (notice how the time component is removed).
I did a little searching and found that this issue is caused by this piece of code in utils.py:
if isinstance(date_value, date):
    return datetime.combine(date=date_value, time=datetime.min.time()).replace(
        tzinfo=local_tz
    )
if isinstance(date_value, datetime):
    return date_value.replace(tzinfo=local_tz)
If I pass a datetime.datetime object as date_value, it matches the first if (because datetime is a subclass of date), and that removes the time component of my input.
Can you suggest a workaround, or am I doing something wrong?
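Because datetime is a subclass of date, the datetime check has to come first. A minimal sketch of the reordered logic follows; normalize_start_date is a hypothetical name, and timezone.utc stands in for dag-factory's local_tz.

```python
from datetime import date, datetime, timezone, tzinfo
from typing import Union


def normalize_start_date(date_value: Union[date, datetime], local_tz: tzinfo) -> datetime:
    """Attach local_tz, keeping the time part of full datetimes."""
    # datetime must be tested before date: isinstance(a_datetime, date) is True.
    if isinstance(date_value, datetime):
        return date_value.replace(tzinfo=local_tz)
    if isinstance(date_value, date):
        return datetime.combine(date_value, datetime.min.time()).replace(tzinfo=local_tz)
    raise TypeError(f"unsupported start_date type: {type(date_value)!r}")


stamp = datetime(2020, 4, 14, 16, 20, 30)
print(normalize_start_date(stamp, timezone.utc).isoformat())  # 2020-04-14T16:20:30+00:00
print(normalize_start_date(date(2020, 4, 14), timezone.utc).isoformat())  # 2020-04-14T00:00:00+00:00
```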
Most of the params that the DAG, and the operators and sensors underneath it, take are basic/raw types (string, int, bool, float) and basic containers (lists, dicts). But from time to time we need a Python object. For example, for the ExternalTaskSensor:
example_external_task_sensor:
  default_args:
    start_date: 2021-09-01
  schedule_interval: "@daily"
  tasks:
    external_task_sensor:
      operator: airflow.sensors.external_task.ExternalTaskSensor
      external_dag_id: example_upstream_dependency_dag
      external_task_id: example_upstream_dependency_task
      execution_delta: !!python/object/apply:datetime.timedelta [0, 300]
And also an example of deserialising a reference to a callable straight from the YAML. Let's suppose we are instantiating this YAML from a file main_dag_file.py that also contains a Python function we want to call (so that it is on the PYTHONPATH):
# main_dag_file.py
def add_two_numbers(first_operator: int, second_operator: int) -> int:
    return first_operator + second_operator
example_call_python_code:
  default_args:
    start_date: 2021-09-01
  schedule_interval: "@daily"
  tasks:
    example_python_task:
      operator: airflow.operators.python.PythonOperator
      python_callable: !!python/name:main_dag_file.add_two_numbers
      op_kwargs:
        first_operator: 3
        second_operator: 2
In addition, sometimes it's useful to do both:
# main_dag_file.py
import os


def decide_executor() -> str:
    environment = os.getenv('EXECUTION_ENVIRONMENT')
    if environment == 'prod':
        executor = 'kubernetes.KubernetesPodOperator'
    else:
        executor = 'bash.BashOperator'
    return 'airflow.operators.' + executor

# pass environment variables to the function
example_call_python_code:
  default_args:
    start_date: 2021-09-01
  schedule_interval: "@daily"
  tasks:
    example_python_task:
      operator: !!python/object/main_dag_file.decide_executor()
      some_param: "{{ var.value.my_variable }}"
This could also be solved by allowing Python elements in the YAML file. This is a fine line: config files should be language agnostic, I know, which is why I'm opening this issue so the community can weigh in too.
To implement this we would use yaml.load instead of yaml.safe_load. As we are the owners of the code, there's no risk in doing so; safe_load is in place to prevent injection attacks.
Also, this feature would make the current way of attaching a callable to a PythonOperator unnecessary, and it matches the operators' exact params.
What are your thoughts?
Namaste @ajbosco, thank you for this great library.
I have a question.
I am wondering whether the time dag-factory takes to create a DAG in Airflow is configurable. I am seeing a gap of about 3 minutes between placing a YAML file of tasks (and the Python configuration) and seeing the DAG in the Airflow UI.
Thanks again.
Does DAG Factory support TaskGroups?
If yes, could you please add documentation on how to specify TaskGroups?
Hi Adam,
I'm trying to use the PythonOperator with dag-factory. I defined a function within the Python file and passed it as python_callable, but I always get the following error message:
make sure config is properly populated. err:Failed to create <class 'airflow.operators.python_operator.PythonOperator'> task. err:`python_callable` param must be callable
As long as I write the task directly in the Python file, calling the function is no problem.
Am I doing something wrong, or is it not possible to use the PythonOperator this way?
DAG file:
from pprint import pprint

from airflow import DAG
import dagfactory

dag_factory = dagfactory.DagFactory("/path/to/dags/config_file.yml")
dag_factory.generate_dags(globals())


# [START howto_operator_python]
def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'
YAML config:
Python_Test:
  default_args:
    owner: 'user'
    start_date: 2019-05-28
  description: 'this is an sample dag which runs every day'
  tasks:
    test_1:
      operator: airflow.operators.python_operator.PythonOperator
      task_id: 'pythonoperator_Testtask'
      python_callable: print_context
~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:26 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1Volume`.
~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:27 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1VolumeMount`.
File "~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/dagfactory/__init__.py", line 2, in <module>
from .dagfactory import DagFactory
File "~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/dagfactory/dagfactory.py", line 8, in <module>
from dagfactory.dagbuilder import DagBuilder
File "~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/dagfactory/dagbuilder.py", line 10, in <module>
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
File "~/.pyenv/versions/3.9.6/lib/python3.9/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 25, in <module>
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator # noqa
Hey there,
Thank you for the library, it is awesome. I'm trying to use it, though, and haven't managed to. My case is the following:
I want to offer users a way to create their own DAGs using a small UI builder (a no-code solution). I already have that working; now I'm trying to configure dag-factory to create those DAGs. Therefore, I can't add the script to the dags folder as described in the README. Instead, I created the following DAG:
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from dagfactory import DagFactory

default_args = {
    'owner': 'querylayer',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=1)
}


def create_dag(**kwargs):
    dag_factory = DagFactory(config=kwargs['dag_run'].conf)
    dag_factory.clean_dags(globals())
    dag_factory.generate_dags(globals())
    print(globals().get('dag_name'))


with DAG('create_workflow', default_args=default_args, schedule_interval=None, start_date=days_ago(0), catchup=False) as dag:
    create_workflow = PythonOperator(
        task_id='create_workflow',
        provide_context=True,
        python_callable=create_dag,
        dag=dag
    )

    create_workflow
The idea is that we pass the config as JSON to this DAG and it creates the DAG for you. So far so good: I ran this DAG with the config below and it runs successfully:
{
  "dag_name": {
    "dag_id": "dag_name",
    "schedule_interval": "None",
    "description": "some description",
    "doc_md": "# Title",
    "catchup": false,
    "concurrence": 2,
    "max_active_runs": 2,
    "task_groups": {},
    "default_args": {
      "start_date": "2021-09-28"
    },
    "tasks": {
      "task_name": {
        "operator": "airflow.operators.bash_operator.BashOperator",
        "bash_command": "echo 1"
      },
      "task_name2": {
        "operator": "airflow.operators.bash_operator.BashOperator",
        "bash_command": "echo 2",
        "dependencies": ["task_name"]
      }
    }
  }
}
Below is the output of the DAG execution; you can see that the line print(globals().get('dag_name')) actually prints the created DAG:
*** Reading local file: /opt/airflow/logs/create_workflow/create_workflow/2021-09-28T18:46:06.674663+00:00/3.log
[2021-09-28 18:51:49,399] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: create_workflow.create_workflow 2021-09-28T18:46:06.674663+00:00 [queued]>
[2021-09-28 18:51:49,422] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: create_workflow.create_workflow 2021-09-28T18:46:06.674663+00:00 [queued]>
[2021-09-28 18:51:49,422] {taskinstance.py:1067} INFO -
--------------------------------------------------------------------------------
[2021-09-28 18:51:49,422] {taskinstance.py:1068} INFO - Starting attempt 3 of 5
[2021-09-28 18:51:49,424] {taskinstance.py:1069} INFO -
--------------------------------------------------------------------------------
[2021-09-28 18:51:49,450] {taskinstance.py:1087} INFO - Executing <Task(PythonOperator): create_workflow> on 2021-09-28T18:46:06.674663+00:00
[2021-09-28 18:51:49,460] {standard_task_runner.py:52} INFO - Started process 58 to run task
[2021-09-28 18:51:49,461] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'create_workflow', 'create_workflow', '2021-09-28T18:46:06.674663+00:00', '--job-id', '225', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/create_workflow.py', '--cfg-path', '/tmp/tmp1z29zp0e', '--error-file', '/tmp/tmp29nl9faf']
[2021-09-28 18:51:49,461] {standard_task_runner.py:77} INFO - Job 225: Subtask create_workflow
[2021-09-28 18:51:49,496] {logging_mixin.py:104} INFO - Running <TaskInstance: create_workflow.create_workflow 2021-09-28T18:46:06.674663+00:00 [running]> on host c1e986c90b97
[2021-09-28 18:51:49,538] {taskinstance.py:1282} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=querylayer
AIRFLOW_CTX_DAG_ID=create_workflow
AIRFLOW_CTX_TASK_ID=create_workflow
AIRFLOW_CTX_EXECUTION_DATE=2021-09-28T18:46:06.674663+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-09-28T18:46:06.674663+00:00
[2021-09-28 18:51:49,540] {logging_mixin.py:104} INFO - <DAG: dag_name>
[2021-09-28 18:51:49,540] {python.py:151} INFO - Done. Returned value was: None
[2021-09-28 18:51:49,558] {taskinstance.py:1191} INFO - Marking task as SUCCESS. dag_id=create_workflow, task_id=create_workflow, execution_date=20210928T184606, start_date=20210928T185149, end_date=20210928T185149
[2021-09-28 18:51:49,599] {taskinstance.py:1245} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-09-28 18:51:49,633] {local_task_job.py:151} INFO - Task exited with return code 0
But even though everything runs successfully, the DAG isn't displayed in the DAG list, as we can see by listing all DAGs from the command line:
default@c1e986c90b97:/opt/airflow$ airflow dags list
/home/airflow/.local/lib/python3.6/site-packages/airflow/configuration.py:346 DeprecationWarning: The hide_sensitive_variable_fields option in [admin] has been moved to the hide_sensitive_var_conn_fields option in [core] - the old setting has been used, but please update your config.
/home/airflow/.local/lib/python3.6/site-packages/airflow/configuration.py:346 DeprecationWarning: The default_queue option in [celery] has been moved to the default_queue option in [operators] - the old setting has been used, but please update your config.
/home/airflow/.local/lib/python3.6/site-packages/airflow/configuration.py:346 DeprecationWarning: The default_queue option in [celery] has been moved to the default_queue option in [operators] - the old setting has been used, but please update your config.
dag_id | filepath | owner | paused
=========================+=============================+============+=======
create_workflow | create_workflow.py | querylayer | False
Am I missing something? Is this approach viable, or is there a better way to do it?
Another question: are those DAGs stored in the DB somehow, or are they created only in memory? I saw that you register them in the globals variable, but that would mean that if Airflow restarts, it would not start with the previously created DAGs, am I right?
TimeSensor expects target_time to be a datetime.time.
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/time_sensor/index.html
How can I set that in the YAML?
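A sketch of a pre-processing step that turns a YAML value into the datetime.time TimeSensor expects. parse_target_time is hypothetical, not part of dag-factory. One gotcha it covers: PyYAML follows YAML 1.1, which may read an unquoted 16:30:00 as a base-60 integer (59400), so quoting the value in the YAML is the safer choice.

```python
from datetime import time
from typing import Union


def parse_target_time(value: Union[time, str, int]) -> time:
    """Coerce a YAML value into a datetime.time for TimeSensor's target_time."""
    if isinstance(value, time):
        return value
    if isinstance(value, str):
        # Quoted "16:30" or "16:30:00" in the YAML.
        return time.fromisoformat(value)
    if isinstance(value, int) and not isinstance(value, bool):
        # Unquoted 16:30:00 may arrive as 59400 seconds since midnight.
        if not 0 <= value < 24 * 3600:
            raise ValueError(f"{value} seconds is outside a single day")
        hours, remainder = divmod(value, 3600)
        minutes, seconds = divmod(remainder, 60)
        return time(hours, minutes, seconds)
    raise TypeError(f"cannot convert {value!r} to datetime.time")
```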
Could we get rid of this long import?
airflow.operators.bash_operator.BashOperator
It would be great to have just
BashOperator
instead.
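One possible design is an alias table that expands short operator names to full dotted paths before import, leaving anything that is already a dotted path unchanged. The table and expand_operator below are hypothetical. The paths are the Airflow 1.10-style ones used elsewhere in this thread; in Airflow 2.x the preferred locations moved (for example airflow.operators.bash.BashOperator), so a real table would need to match the installed version.

```python
from typing import Dict

# Hypothetical alias table mapping short names to full dotted paths.
OPERATOR_ALIASES: Dict[str, str] = {
    "BashOperator": "airflow.operators.bash_operator.BashOperator",
    "DummyOperator": "airflow.operators.dummy_operator.DummyOperator",
    "PythonOperator": "airflow.operators.python_operator.PythonOperator",
}


def expand_operator(name: str) -> str:
    """Return the full dotted path for a short alias, or the name unchanged."""
    return OPERATOR_ALIASES.get(name, name)
```

Keeping full paths valid means custom operators such as operators.DockerRunOperator keep working alongside the aliases.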
Hi,
First, amazing project;
I hope it's still maintained :)
I'm facing an issue importing the dag-factory package;
see the details below.
$ airflow version
2.1.2
$ pip list|grep dag
dag-factory 0.9.1
$ python -V
Python 3.6.13
$ python
Python 3.6.13 (default, May 12 2021, 16:48:24)
[GCC 8.3.0] on linux
>>> from airflow import DAG
>>> import dagfactory
/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:26 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1Volume`.
/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:27 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1VolumeMount`.
>>>
Adam,
Great-looking library. How do you deploy the YAML to Airflow with the DAG factory?
I'm just diving into Airflow now, so any guidance is helpful. Airflow distributes the DAG itself, but every node will need to rebuild the DAG with the factory, so does the YAML have to be available in the Airflow image?
Dependabot couldn't authenticate with https://pypi.python.org/simple/.
You can provide authentication details in your Dependabot dashboard by clicking into the account menu (in the top right) and selecting 'Config variables'.
Hey!
I was testing some use cases for adopting it at our company and was wondering whether there is any support for the SubDagOperator.
I played around a bit and couldn't make it work.
Congrats on the great lib.
Cheers
Hi, first of all, nice job on this project.
I'm trying to configure a DAG that points to scripts in a path different from the DAG's path, so I'm setting template_searchpath in the YAML config to point to this location, but I'm receiving the error jinja2.exceptions.TemplateNotFound: /opt/airflow/scripts/query.sql, so I guess template_searchpath is not working.
If I place query.sql in the DAG's path, it works.
Here is my YAML
default:
  default_args:
    owner: "dag-factory"
    start_date: 2 days
    retries: 1
    retry_delay_sec: 300
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "graph"
  orientation: "LR"
  schedule_interval: "0 1 * * *"
  catchup: False
  tags: ['dag-factory']
  template_searchpath: ["/opt/airflow/scripts"]

example_sql_exec_query:
  description: "DAG to run SQL query"
  tasks:
    begin:
      operator: airflow.operators.dummy_operator.DummyOperator
    exec_sql:
      operator: airflow.operators.postgres_operator.PostgresOperator
      postgres_conn_id: "postgres"
      sql: /opt/airflow/scripts/query.sql # it doesn't work.
      params: {"columns": "dag_id, owners", "table": "public.dag", "limit": 10}
      dependencies: [begin]
Does anybody else have the same issue?
relevant packages:
apache-airflow[crypto]==1.10.2
dag-factory==0.4.0
dag-factory yaml:
registry_of_metrics:
  default_args:
    provide_context: True
    depends_on_past: True
    retries: 15
    start_date: 2018-10-01
  schedule_interval: '0 9 1 * *'
  description: 'registry of metric dag'
  tasks:
    # Paying dealers metric
    staging_paying_dealers:
      operator: operators.DockerRunOperator
      entrypoint: 'bin/generate_report.py'
      task_id: 'paying_dealers'
      arguments:
        date_name: 'as_of_date'
        sql_class: lib.models.reports.staging_paying_dealers.StagingPayingDealersReport
        schema: 'SCORECARD_STAGING'
        table: 'PAYING_DEALERS'
        schedule: 'monthly'
      template_arguments:
        date: 'next_execution_date'
    paying_dealers:
      operator: operators.DockerRunOperator
      entrypoint: 'bin/generate_report.py'
      task_id: 'paying_dealers'
      dependencies: [staging_paying_dealers]
      arguments:
        date_name: 'as_of_date'
        sql_class: lib.models.reports.paying_dealers.PayingDealersReport
        schema: 'SCORECARD'
        table: 'METRIC_MONTH_COUNTRY_DATA'
        schedule: 'monthly'
        metric_id: 3
      template_arguments:
        date: 'next_execution_date'
    # Session uniques metric
    session_uniques:
      operator: operators.DockerRunOperator
      entrypoint: 'bin/generate_report.py'
      task_id: 'session_uniques'
      arguments:
        date_name: 'as_of_date'
        sql_class: lib.models.reports.session_uniques.SessionUniquesReport
        schema: 'SCORECARD'
        table: 'METRIC_MONTH_COUNTRY_DATA'
        schedule: 'monthly'
        metric_id: 9
      template_arguments:
        date: 'next_execution_date'
Traceback (most recent call last):
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/app.py", line 1982, in wsgi_app
response = self.full_dispatch_request()
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/app.py", line 1614, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/app.py", line 1517, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/_compat.py", line 33, in reraise
raise value
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/app.py", line 1612, in full_dispatch_request
rv = self.dispatch_request()
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/app.py", line 1598, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs)
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask_admin/base.py", line 368, in _run_view
return fn(self, *args, **kwargs)
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask_login/utils.py", line 258, in decorated_view
return func(*args, **kwargs)
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
return func(*args, **kwargs)
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/views.py", line 2224, in index
auto_complete_data=auto_complete_data)
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask_admin/base.py", line 308, in render
return render_template(template, **kwargs)
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/templating.py", line 134, in render_template
context, ctx.app)
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask/templating.py", line 116, in _render
rv = template.render(context)
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/jinja2/asyncsupport.py", line 76, in render
return original_render(self, *args, **kwargs)
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/jinja2/environment.py", line 1008, in render
return self.environment.handle_exception(exc_info, True)
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/jinja2/environment.py", line 780, in handle_exception
reraise(exc_type, exc_value, tb)
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/jinja2/_compat.py", line 37, in reraise
raise value.with_traceback(tb)
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/templates/airflow/dags.html", line 18, in top-level template code
{% extends "airflow/master.html" %}
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/templates/airflow/master.html", line 18, in top-level template code
{% extends "admin/master.html" %}
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/templates/admin/master.html", line 18, in top-level template code
{% extends 'admin/base.html' %}
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/flask_admin/templates/bootstrap3/admin/base.html", line 37, in top-level template code
{% block page_body %}
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/templates/admin/master.html", line 107, in block "page_body"
{% block body %}
File "/home/seth/code/windflood/airflow/venv/lib/python3.7/site-packages/airflow/www/templates/airflow/dags.html", line 88, in block "body"
<a href="{{ url_for('airflow.'+dag.default_view, dag_id=dag.dag_id) }}" title="{{ dag.description }}">
TypeError: can only concatenate str (not "NoneType") to str
Hi @ajbosco,
We have defined some Python functions that return TaskGroups, and we call these functions to create DAGs.
In Python, a function like make_task_group_X could be used like this:
with DAG(...) as dag:
    g1 = make_task_group_a(args...)
    g2 = make_task_group_b(args...)
    g3 = make_task_group_c(args...)
    start >> g1 >> g2 >> g3 >> end
With dag-factory, is it possible to call user defined functions to build up a DAG?
Dependabot can't resolve your Python dependency files.
As a result, Dependabot couldn't update your dependencies.
The error Dependabot encountered was:
ERROR: ERROR: Could not find a version that matches werkzeug<0.15.0,>=0.12,>=0.14.1,>=0.15
Tried: 0.1, 0.2, 0.3, 0.3.1, 0.4, 0.4.1, 0.5, 0.5.1, 0.6, 0.6.1, 0.6.2, 0.7, 0.7.1, 0.7.2, 0.8, 0.8.1, 0.8.2, 0.8.3, 0.9, 0.9.1, 0.9.2, 0.9.3, 0.9.4, 0.9.5, 0.9.6, 0.10, 0.10.1, 0.10.2, 0.10.2, 0.10.4, 0.10.4, 0.11, 0.11, 0.11.1, 0.11.1, 0.11.2, 0.11.2, 0.11.3, 0.11.3, 0.11.4, 0.11.4, 0.11.5, 0.11.5, 0.11.6, 0.11.6, 0.11.7, 0.11.7, 0.11.8, 0.11.8, 0.11.9, 0.11.9, 0.11.10, 0.11.10, 0.11.11, 0.11.11, 0.11.12, 0.11.12, 0.11.13, 0.11.13, 0.11.14, 0.11.14, 0.11.15, 0.11.15, 0.12, 0.12, 0.12.1, 0.12.1, 0.12.2, 0.12.2, 0.13, 0.13, 0.14, 0.14, 0.14.1, 0.14.1, 0.15.0, 0.15.0, 0.15.1, 0.15.1, 0.15.2, 0.15.2, 0.15.3, 0.15.3, 0.15.4, 0.15.4
There are incompatible versions in the resolved dependencies.
[pipenv.exceptions.ResolutionFailure]: req_dir=requirements_dir
[pipenv.exceptions.ResolutionFailure]: File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 726, in resolve_deps
[pipenv.exceptions.ResolutionFailure]: req_dir=req_dir,
[pipenv.exceptions.ResolutionFailure]: File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 480, in actually_resolve_deps
[pipenv.exceptions.ResolutionFailure]: resolved_tree = resolver.resolve()
[pipenv.exceptions.ResolutionFailure]: File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 395, in resolve
[pipenv.exceptions.ResolutionFailure]: raise ResolutionFailure(message=str(e))
[pipenv.exceptions.ResolutionFailure]: pipenv.exceptions.ResolutionFailure: ERROR: ERROR: Could not find a version that matches werkzeug<0.15.0,>=0.12,>=0.14.1,>=0.15
[pipenv.exceptions.ResolutionFailure]: Tried: 0.1, 0.2, 0.3, 0.3.1, 0.4, 0.4.1, 0.5, 0.5.1, 0.6, 0.6.1, 0.6.2, 0.7, 0.7.1, 0.7.2, 0.8, 0.8.1, 0.8.2, 0.8.3, 0.9, 0.9.1, 0.9.2, 0.9.3, 0.9.4, 0.9.5, 0.9.6, 0.10, 0.10.1, 0.10.2, 0.10.2, 0.10.4, 0.10.4, 0.11, 0.11, 0.11.1, 0.11.1, 0.11.2, 0.11.2, 0.11.3, 0.11.3, 0.11.4, 0.11.4, 0.11.5, 0.11.5, 0.11.6, 0.11.6, 0.11.7, 0.11.7, 0.11.8, 0.11.8, 0.11.9, 0.11.9, 0.11.10, 0.11.10, 0.11.11, 0.11.11, 0.11.12, 0.11.12, 0.11.13, 0.11.13, 0.11.14, 0.11.14, 0.11.15, 0.11.15, 0.12, 0.12, 0.12.1, 0.12.1, 0.12.2, 0.12.2, 0.13, 0.13, 0.14, 0.14, 0.14.1, 0.14.1, 0.15.0, 0.15.0, 0.15.1, 0.15.1, 0.15.2, 0.15.2, 0.15.3, 0.15.3, 0.15.4, 0.15.4
[pipenv.exceptions.ResolutionFailure]: Warning: Your dependencies could not be resolved. You likely have a mismatch in your sub-dependencies.
First try clearing your dependency cache with $ pipenv lock --clear, then try the original command again.
Alternatively, you can use $ pipenv install --skip-lock to bypass this mechanism, then run $ pipenv graph to inspect the situation.
Hint: try $ pipenv lock --pre if it is a pre-release dependency.
Traceback (most recent call last):
  File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 501, in create_spinner
    yield sp
  File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 649, in venv_resolve_deps
    c = resolve(cmd, sp)
  File "/usr/local/.pyenv/versions/3.6.8/lib/python3.6/site-packages/pipenv/utils.py", line 539, in resolve
    sys.exit(c.return_code)
SystemExit: 1
If you think the above is an error on Dependabot's side please don't hesitate to get in touch - we'll do whatever we can to fix it.
You can mention @dependabot in the comments below to contact the Dependabot team.
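The resolver error boils down to an empty intersection: one dependency pins werkzeug<0.15.0 while another requires werkzeug>=0.15. The small checker below is a hedged sketch that only understands plain numeric versions and the <, <=, >, >=, == operators (real resolvers follow PEP 440 and handle much more); it confirms that none of the 0.12 to 0.15.4 releases from the log satisfies all four clauses at once.

```python
import operator

# Comparison functions for the simple version clauses that appear in the log.
OPS = {"<": operator.lt, "<=": operator.le, ">": operator.gt,
       ">=": operator.ge, "==": operator.eq}


def parse(version):
    """Turn '0.14.1' into (0, 14, 1) and pad '0.15' to (0, 15, 0)."""
    parts = [int(p) for p in version.split(".")]
    return tuple(parts + [0] * (3 - len(parts)))


def satisfies(version, spec):
    """True if version meets every comma-separated clause in spec."""
    for clause in (c.strip() for c in spec.split(",")):
        # Check two-character operators first so ">=" is not read as ">".
        for sym in ("<=", ">=", "==", "<", ">"):
            if clause.startswith(sym):
                if not OPS[sym](parse(version), parse(clause[len(sym):])):
                    return False
                break
        else:
            raise ValueError(f"unsupported clause: {clause!r}")
    return True


SPEC = "<0.15.0,>=0.12,>=0.14.1,>=0.15"
CANDIDATES = ["0.12", "0.12.1", "0.12.2", "0.13", "0.14", "0.14.1",
              "0.15.0", "0.15.1", "0.15.2", "0.15.3", "0.15.4"]

print([v for v in CANDIDATES if satisfies(v, SPEC)])  # []
# Dropping the "<0.15.0" pin leaves the 0.15.x releases as valid choices.
print([v for v in CANDIDATES if satisfies(v, ">=0.12,>=0.14.1,>=0.15")])
# ['0.15.0', '0.15.1', '0.15.2', '0.15.3', '0.15.4']
```

So the fix has to come from relaxing one of the conflicting pins, not from clearing a cache.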
Hi,
It seems that SparkSubmitOperator, the HDFS sensor and many other operators are not included in dag-factory.
Could you please add them as well?
It would also be great to have a sample YAML file for SparkSubmitOperator.
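Assuming, as in dag-factory's examples, that each task's operator key holds a full import path (for instance airflow.operators.bash_operator.BashOperator), any operator importable in your Airflow environment should be usable without dedicated support. The stdlib sketch below shows the general mechanism of turning such a dotted path into an object; resolve_dotted_path is a hypothetical name, not dag-factory's actual function. On Airflow 1.10, SparkSubmitOperator lives at airflow.contrib.operators.spark_submit_operator.SparkSubmitOperator.

```python
import importlib


def resolve_dotted_path(path):
    """Import 'package.module.Name' and return the attribute Name."""
    module_path, _, attr_name = path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected a dotted path, got {path!r}")
    module = importlib.import_module(module_path)  # raises ImportError if missing
    return getattr(module, attr_name)              # raises AttributeError if missing


# Stdlib demo; with Airflow 1.10 installed the same call could resolve
# "airflow.contrib.operators.spark_submit_operator.SparkSubmitOperator".
print(resolve_dotted_path("importlib.import_module") is importlib.import_module)  # True
```

If the import fails, the operator's package (or provider) is simply not installed in the environment, which is a separate problem from dag-factory support.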
Hi,
Thanks a lot for this repo.
I'm on macOS Catalina, Version 10.15.7
Docker version 20.10.6, build 370c289
While at the root of the repo, I ran:
make docker-run
=> http://localhost:8080/ says "This site can’t be reached"
While debugging, I ran airflow db init, which
outputs
Any guidance is appreciated!
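A quick first check when the browser reports "This site can’t be reached" is whether anything is listening on the port at all. The helper below is a small stdlib sketch (is_port_open is a hypothetical name); if it returns False for localhost:8080, the webserver probably never started or the port is not published from the container, suggesting the problem lies in container startup (such as the airflow db init step) rather than in the browser.

```python
import socket


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable or timed out
        return False


if __name__ == "__main__":
    # 8080 is the port from the report above; adjust if the container maps it elsewhere.
    print("webserver reachable:", is_port_open("localhost", 8080))
```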
Hey there,
this is an incredible extension to Airflow. I'm very excited about it, but I'm asking myself how to deploy DAGs properly (I'm quite new to Airflow). In my tests, changing the YAML file and pushing it to Airflow changes running jobs instantly (and executes the new tasks, which I expected to start only in the next run).
Could you share best practices for deploying the YAML files and perhaps for versioning the DAGs?
Thanks in advance.
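One common convention, offered here as an assumption rather than a dag-factory feature, is to bake a version into the DAG id so that an edited definition shows up as a new DAG instead of changing one that is already running. The sketch derives that version from a short SHA-256 hash of the YAML text; versioned_dag_id is a hypothetical helper name.

```python
import hashlib


def versioned_dag_id(base_id, config_text, length=8):
    """Append a short content hash of the config text to the DAG id."""
    digest = hashlib.sha256(config_text.encode("utf-8")).hexdigest()[:length]
    return f"{base_id}_{digest}"


v1 = versioned_dag_id("my_dag", "schedule_interval: '@daily'\n")
v2 = versioned_dag_id("my_dag", "schedule_interval: '@hourly'\n")
print(v1, v2)  # two different ids: the edit yields a new DAG
```

The trade-off is that every change starts a fresh run history, and the old DAG has to be paused and removed separately; a manually bumped suffix such as my_dag_v2 gives the same isolation with more readable ids.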
Just wondering: is this library compatible with Airflow 2.0?
I followed the instructions in the README file to install dag-factory in a local Airflow container running Airflow 1.10.12 and Python 3.8.9.
I copied the files example_dag_factory.yml, example_dag_factory.py and print_hello.py from https://github.com/ajbosco/dag-factory/blob/master/examples into the folder /usr/local/airflow/dags in the container, expecting the DAGs defined in the YAML file to be recognized. However, I see the following error in the Airflow UI.
I'm not sure what I missed. Any help is appreciated.
Does DAG Factory support Airflow variables?
If so, could you please provide guidance or add documentation?
Also, where can I find the full list of allowed default_args?
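Airflow applies default_args by passing matching keys as keyword arguments to each operator's constructor, so a practical way to list the allowed keys is to read the constructor parameters of BaseOperator plus those of the specific operator. The sketch below walks a class hierarchy with inspect.signature; accepted_kwargs and the two Fake classes are hypothetical stand-ins so it runs without Airflow, and on a real install the output is a starting point rather than an authoritative list.

```python
import inspect


def accepted_kwargs(cls):
    """Collect named __init__ parameters along cls's MRO (subclass first)."""
    names = []
    for klass in inspect.getmro(cls):
        init = klass.__dict__.get("__init__")
        if klass is object or init is None:
            continue  # skip classes that don't define their own __init__
        for name, param in inspect.signature(init).parameters.items():
            # Ignore self, *args and **kwargs; keep real keyword names.
            if name == "self" or param.kind in (inspect.Parameter.VAR_POSITIONAL,
                                                inspect.Parameter.VAR_KEYWORD):
                continue
            if name not in names:
                names.append(name)
    return names


# Hypothetical stand-ins; with Airflow installed you could instead call
# accepted_kwargs(BaseOperator) after "from airflow.models import BaseOperator".
class FakeBaseOperator:
    def __init__(self, task_id, owner="airflow", retries=0, **kwargs):
        self.task_id, self.owner, self.retries = task_id, owner, retries


class FakeSparkOperator(FakeBaseOperator):
    def __init__(self, application, conn_id="spark_default", **kwargs):
        super().__init__(**kwargs)
        self.application, self.conn_id = application, conn_id


print(accepted_kwargs(FakeSparkOperator))
# ['application', 'conn_id', 'task_id', 'owner', 'retries']
```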
My DAG looks like this:
my_dag:
  schedule_interval: '@once'
  on_failure_callback_name: failure_callback
  on_failure_callback_file: /usr/local/airflow/dags/helper.py
  on_success_callback_name: success_callback
  on_success_callback_file: /usr/local/airflow/dags/helper.py
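The *_callback_file and *_callback_name pairs above describe a callable by file path plus function name. Assuming dag-factory resolves them roughly like importing a module from a file (an assumption, not something this thread confirms), the stdlib sketch below shows that mechanism with importlib.util; load_callable is a hypothetical name. Airflow passes a single context dict to on_success_callback and on_failure_callback functions.

```python
import importlib.util
from pathlib import Path


def load_callable(file_path, name):
    """Import the module at file_path and return its callable attribute name."""
    path = Path(file_path)
    spec = importlib.util.spec_from_file_location(path.stem, str(path))
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load a module from {file_path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs helper.py's top-level code
    func = getattr(module, name)
    if not callable(func):
        raise TypeError(f"{name!r} in {file_path} is not callable")
    return func


# e.g. load_callable("/usr/local/airflow/dags/helper.py", "failure_callback")
```

A missing file or a misspelled function name surfaces here as a FileNotFoundError or AttributeError, which is usually quicker to diagnose than a broken-DAG banner.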