
airflow-maintenance-dags's People

Contributors

dud225, eepstein, flolas, grahovam, jkornblum, lx950627, mek97, mg-aik, nikhilmn, noelmcloughlin, prakshalj0512, rssanders3, sudhakar-nallam, vadipo, wolli2710, xyu

airflow-maintenance-dags's Issues

Shouldn't the log-cleanup tasks run in sequence?

Hi, thanks for this great project!

I've just installed log-cleanup and set airflow_log_cleanup__enable_delete_child_log to True.

After a DAG run I can see that two log_cleanup_worker tasks have run successfully.

But the task log tells me that one of the tasks was skipped; in this particular case, the one for logs/scheduler/.

[2020-08-04 12:57:47,017] {bash_operator.py:126} INFO - Another task is already deleting logs on this worker node.     Skipping it!
[2020-08-04 12:57:47,017] {bash_operator.py:126} INFO - If you believe you're receiving this message in error, kindly check     if /tmp/airflow_log_cleanup_worker.lock exists and delete it.
[2020-08-04 12:57:47,017] {bash_operator.py:130} INFO - Command exited with return code 0

I'm new to Airflow, but I thought the log_cleanup_worker tasks were intended to run in sequence.
Am I seeing this wrong?

Query error in airflow db cleanup (DagRunModel)

Error in cleanup_DagRun Task of DAG airflow-db-cleanup
Airflow Version: 1.10.9 (Kubernetes Executor)
`LINE 6: WHERE dag_run.external_trigger = 0 GROUP BY dag_run.dag_id) ...
^
HINT: No operator matches the given name and argument type(s). You might need to add explicit type casts.

[SQL: SELECT dag_run.id AS dag_run_id, dag_run.execution_date AS dag_run_execution_date
FROM dag_run
WHERE dag_run.execution_date NOT IN (SELECT anon_1.max_1 AS anon_1_max_1
FROM (SELECT max(dag_run.execution_date) AS max_1
FROM dag_run
WHERE dag_run.external_trigger = %(external_trigger_1)s GROUP BY dag_run.dag_id) AS anon_1) AND dag_run.execution_date <= %(execution_date_1)s]`
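On Postgres this usually means a boolean column is being compared to an integer literal (dag_run.external_trigger = 0). A minimal sketch of the kind of filter that avoids the type mismatch, assuming the stock DagRun model; this is illustrative, not the DAG's actual code:

from airflow import settings
from airflow.models import DagRun

session = settings.Session()
# Compare against a real boolean so SQLAlchemy emits a boolean parameter,
# instead of the integer 0 that Postgres rejects for a boolean column.
scheduled_runs = session.query(DagRun).filter(DagRun.external_trigger.is_(False)).all()
session.close()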

Configuration variable empty

I followed the README, created the Variables, and adjusted a few values in the code.

At the end I got:

[2020-08-17 15:56:56,257] {bash_operator.py:126} INFO - Configurations:
[2020-08-17 15:56:56,257] {bash_operator.py:126} INFO - BASE_LOG_FOLDER:      ''
[2020-08-17 15:56:56,257] {bash_operator.py:126} INFO - MAX_LOG_AGE_IN_DAYS:  ''
[2020-08-17 15:56:56,257] {bash_operator.py:126} INFO - ENABLE_DELETE:        ''

Where I expected:

[2020-08-17 15:56:56,257] {bash_operator.py:126} INFO - Configurations:
[2020-08-17 15:56:56,257] {bash_operator.py:126} INFO - BASE_LOG_FOLDER:      '/data/airflow/logs'
[2020-08-17 15:56:56,257] {bash_operator.py:126} INFO - MAX_LOG_AGE_IN_DAYS:  '30'
[2020-08-17 15:56:56,257] {bash_operator.py:126} INFO - ENABLE_DELETE:        'true'

The script also doesn't run, since I get the "error":

INFO - Another task is already deleting logs on this worker node.     Skipping it!
INFO - If you believe you're receiving this message in error, kindly check     if /tmp/airflow_log_cleanup_worker.lock exists and delete it.
INFO - Command exited with return code 0
INFO - Marking task as SUCCESS.dag_id=clean_airflow_logs, task_id=log_cleanup_worker_num_1_dir_0, execution_date=20200817T130009, start_date=20200817T130027, end_date=20200817T130027

Also, in this case the script should exit 1 since there was an issue; as it is, my DAG and task say that everything ran fine.
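For reference, a minimal sketch of creating and reading the Variables the DAG expects (key names taken from the README; adjust if your copy of the DAG differs). Empty values in the rendered configuration usually mean the key was never created or is misspelled:

from airflow.models import Variable

# Create the Variables the log-cleanup DAG reads (values are stored as strings).
Variable.set("airflow_log_cleanup__max_log_age_in_days", "30")
Variable.set("airflow_log_cleanup__enable_delete_child_log", "True")

# The DAG falls back to a default when the key is missing; an empty string in the
# rendered bash usually points at a missing or misspelled key instead.
print(Variable.get("airflow_log_cleanup__max_log_age_in_days", default_var=30))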

ERROR - pickle data was truncated on TaskInstance: airflow-db-cleanup.cleanup_BaseXCom

When Running %s on host %s <TaskInstance: airflow-db-cleanup.cleanup_BaseXCom 2020-09-09T00:00:00+00:00
It runs for 1 minute and then fails with:
[2020-09-10 00:04:11,000] {taskinstance.py:1150} ERROR - pickle data was truncated
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 984, in run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 113, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/dags/maintenance/airflow-db-cleanup.py", line 289, in cleanup_function
entries_to_delete = query.all()
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3319, in all
return list(self)
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 101, in instances
cursor.close()
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 69, in exit
exc_value, with_traceback=exc_tb,
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 178, in raise

raise exception
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 81, in instances
rows = [proc(row) for row in fetch]
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 81, in
rows = [proc(row) for row in fetch]
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 602, in _instance
state.manager.dispatch.load(state, context)
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/event/attr.py", line 322, in call
fn(*args, **kw)
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/mapper.py", line 3378, in _event_on_load
instrumenting_mapper._reconstructor(state.obj())
File "/usr/local/lib/python3.6/site-packages/airflow/models/xcom.py", line 72, in init_on_load
self.value = pickle.loads(self.value)
_pickle.UnpicklingError: pickle data was truncated
[2020-09-10 00:04:11,005] {taskinstance.py:1194} INFO - Marking task as FAILED. dag_id=airflow-db-cleanup, task_id=cleanup_BaseXCom, execution_date=20200909T000000, start_date=20200910T000320, end_date=20200910T000411
[2020-09-10 00:04:15,716] {local_task_job.py:102} INFO - Task exited with return code 1

ERROR - naive datetime is disallowed

After running the DAG, I faced the error below on all set_downstream instances. print_configuration was unaffected, though, and ran successfully. After tracking the error through the Python libs, I found it is being raised here: https://github.com/apache/incubator-airflow/blob/4083a8f5217e9ca7a5c83a3eaaaf403dd367a90c/airflow/utils/sqlalchemy.py#L153

It looks like naive timestamps are not allowed from Airflow 1.10 onward; I'm not sure about previous versions.

Environment :

  • AF - v1.10.0

[2018-11-28 17:54:15,124] {models.py:1736} ERROR - (builtins.ValueError) naive datetime is disallowed [SQL: 'SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config \nFROM task_instance \nWHERE task_instance.execution_date <= %s'] [parameters: [{}]] Traceback (most recent call last): File "/home/daemonsl/airflow/venv/lib/python3.5/site-packages/sqlalchemy/engine/base.py", line 1116, in _execute_context context = constructor(dialect, self, conn, *args) File "/home/daemonsl/airflow/venv/lib/python3.5/site-packages/sqlalchemy/engine/default.py", line 623, in _init_compiled param.append(processors[key](compiled_params[key])) File "/home/daemonsl/airflow/venv/lib/python3.5/site-packages/sqlalchemy/sql/type_api.py", line 1078, in process return process_param(value, dialect) File "/home/daemonsl/airflow/venv/lib/python3.5/site-packages/airflow/utils/sqlalchemy.py", line 156, in process_bind_param raise ValueError('naive datetime is disallowed') ValueError: naive datetime is disallowed
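Since Airflow 1.10 stores timezone-aware timestamps, any cutoff compared against execution_date has to be timezone-aware as well. A minimal sketch of building such a cutoff (illustrative, not the DAG's exact code):

from datetime import timedelta
from airflow.utils import timezone

# timezone.utcnow() returns an aware UTC datetime, so the comparison against
# task_instance.execution_date no longer trips the naive-datetime check.
max_date = timezone.utcnow() - timedelta(days=30)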

Airflow runs out of memory on large MySQL databases

When running the cleanup scripts for the first time on large DBs, the script will consume tens or hundreds of gigabytes of memory. This is caused by this line:

entries_to_delete = query.all()

Python's MySQL connection buffers everything in memory by default. On large DBs, this can be hundreds of gigabytes. SSCursor should be used for MySQL:

https://pymysql.readthedocs.io/en/latest/modules/cursors.html#pymysql.cursors.SSCursor

However, setting the SSCursor at the Airflow connection level is pretty invasive, as it affects all of Airflow. It would be nice to be able to set this configuration for just the script itself.
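One possible way to scope the server-side cursor to the cleanup script alone is to build a dedicated engine instead of reusing Airflow's session. A hedged sketch, assuming a pymysql driver and an illustrative connection string:

import pymysql.cursors
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from airflow.models import XCom

# Dedicated engine with an unbuffered (server-side) cursor; Airflow's own
# sql_alchemy_conn is left untouched.
engine = create_engine(
    "mysql+pymysql://airflow:password@mysql-host/airflow",  # illustrative URL
    connect_args={"cursorclass": pymysql.cursors.SSCursor},
)
session = sessionmaker(bind=engine)()

# Stream rows in batches instead of buffering the whole result set client-side.
for entry in session.query(XCom).yield_per(1000):
    pass  # inspect / delete as needed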

Had to cast execution_date as Text to get the LIKE clause working

In order to get kill-halted-tasks to work, I had to cast execution_date as text.

ex: CAST(DagRun.execution_date as TEXT)

Otherwise, I get the following error:

sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) operator does not exist: timestamp without time zone ~~ unknown
Subtask: LINE 3: ...n.dag_id = 'example-dag' AND dag_run.execution_date LIKE '2018...

Happy to submit a PR. Just wanted to give a heads-up first.
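For reference, a minimal SQLAlchemy sketch of the cast described above (Postgres has no LIKE operator for timestamp columns, so the column is cast to text first); illustrative only, not the DAG's current code:

from sqlalchemy import Text, cast

from airflow import settings
from airflow.models import DagRun

session = settings.Session()
# Renders as: CAST(dag_run.execution_date AS TEXT) LIKE '2018-08-01%'
matching_runs = (
    session.query(DagRun)
    .filter(DagRun.dag_id == "example-dag")
    .filter(cast(DagRun.execution_date, Text).like("2018-08-01%"))
    .all()
)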

.lock file created but no permission

I'm trying to run your maintenance DAG, and the first time I tried it I got the following in the log:

Another task is already deleting logs on this worker node.     Skipping it!
If you believe you're receiving this message in error, kindly check     if /tmp/airflow_log_cleanup_worker.lock exists and delete it.

I removed the file, retried, and got:

Lock file not found on this node!     Creating it to prevent collisions...
Error creating the lock file.         Check if the airflow user can create files under tmp directory.         Exiting...

The third time, I once again got the .lock file issue.

I have granted my airflow user rights on the /tmp folder with sudo setfacl -m u:airflow:rwx /tmp. How can my user create the .lock file the first time but then not have the right to do it again?
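For what it's worth, an atomic create-or-fail lock makes the two failure modes easier to tell apart. This is only an illustrative Python sketch of the same lock-file idea the bash script uses, not the repo's code:

import errno
import os

LOCK_FILE = "/tmp/airflow_log_cleanup_worker.lock"

try:
    # O_EXCL makes creation atomic: it fails if the file already exists.
    fd = os.open(LOCK_FILE, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    os.close(fd)
    print("Lock acquired, proceeding with cleanup")
except OSError as err:
    if err.errno == errno.EEXIST:
        print("Another task is already deleting logs on this worker node")
    else:
        # e.g. EACCES: the airflow user cannot write to /tmp
        print("Cannot create lock file: %s" % err)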

Cannot import settings

So, I believe this is only in the newest version of Airflow

from airflow.models import settings for the cleanup DAG

Throws this ImportError: cannot import name 'settings' from 'airflow.models' (/usr/local/lib/python3.7/site-packages/airflow/models/__init__.py)

So, apparently, there is no settings there? Not sure yet if this is truly an issue with this cleanup DAG or something else in my configuration.
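The settings module lives at the package top level rather than under airflow.models, so a hedged fix is simply:

from airflow import settings  # not: from airflow.models import settings

session = settings.Session()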

Kill Halted Dags - current transaction is aborted

Awesome DAGs - these are very useful. However, I cannot get the 'airflow-kill-halted-tasks' DAG to run. I always get the below error.

Airflow==1.9.0
Celery==4.1.0
psycopg2==2.7.4

sql_alchemy_conn = postgresql+psycopg2://airflow:...

[2018-08-01 13:01:28,394] {base_task_runner.py:98} INFO - Subtask:     cursor.execute(statement, parameters)
[2018-08-01 13:01:28,396] {base_task_runner.py:98} INFO - Subtask: sqlalchemy.exc.InternalError: (psycopg2.InternalError) current transaction is aborted, commands ignored until end of transaction block
[2018-08-01 13:01:28,400] {base_task_runner.py:98} INFO - Subtask:  [SQL: 'SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid \nFROM task_instance \nWHERE task_instance.task_id = %(param_1)s AND task_instance.dag_id = %(param_2)s AND task_instance.execution_date = %(param_3)s'] [parameters: {'param_1': 'kill_halted_tasks', 'param_3': datetime.datetime(2018, 8, 1, 12, 0), 'param_2': 'airflow-kill-halted-tasks'}] (Background on this error at: http://sqlalche.me/e/2j85)
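"current transaction is aborted" means an earlier statement in the same session failed and was never rolled back, so every subsequent query fails with this message until the session is reset. A minimal sketch of the usual guard (illustrative, not the DAG's actual code):

from airflow import settings

session = settings.Session()
try:
    # ... queries and deletes against task_instance, dag_run, etc. ...
    session.commit()
except Exception:
    # Clears the aborted transaction so the session can be reused or closed cleanly.
    session.rollback()
    raise
finally:
    session.close()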

celery_taskmeta and celery_tasksetmeta tables

Does the current cleanup deal with celery_taskmeta and celery_tasksetmeta tables?
If not, how can I add them to the DATABASE_OBJECTS array? There isn't a corresponding airflow.models class for them.
Thanks!
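There is indeed no airflow.models class for Celery's result tables, so they cannot be listed in DATABASE_OBJECTS directly. A hedged sketch of reaching them through table reflection instead, assuming the Celery result backend shares the Airflow metadata database and uses Celery's default column names:

from datetime import timedelta

from sqlalchemy import MetaData, Table

from airflow import settings
from airflow.utils import timezone

session = settings.Session()
metadata = MetaData()

# Reflect Celery's result table; celery_tasksetmeta can be handled the same way.
celery_taskmeta = Table(
    "celery_taskmeta", metadata, autoload=True, autoload_with=session.get_bind()
)

max_date = timezone.utcnow() - timedelta(days=30)  # illustrative cutoff
session.execute(
    celery_taskmeta.delete().where(celery_taskmeta.c.date_done <= max_date)
)
session.commit()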

missing class BaseJob in Airflow 2

Hi

I am using Airflow 2. When I run airflow-db-cleanup.py, it fails with an import error:
missing BaseJob in the airflow.jobs module
===> from airflow.jobs import BaseJob

I removed that import and the DAG runs fine. But nothing happens: DagRun, TaskInstance, etc. stay the same, and the Airflow UI still shows a lot of rows.
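In Airflow 2 the class moved to a submodule, so a hedged compatibility import keeps the DAG working on both major versions:

try:
    from airflow.jobs.base_job import BaseJob  # Airflow 2.x
except ImportError:
    from airflow.jobs import BaseJob  # Airflow 1.10.x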

Will log cleanup dag work with kubernetes?

I saw there are some configs, such as the number of workers. If you run the KubernetesExecutor on Kubernetes, there isn't really a set number of workers; instead, each task gets a new pod. So are these DAGs runnable on Kubernetes, or must they be modified?

Log cleanup missing dag_processor_manager.log

The log cleanup DAG works great, but it misses the dag_processor_manager.log file. I'm not sure whether this file is new in Airflow or not; the configuration for it was added in 1.10.2, but the file might have existed earlier. Unlike the other log files, this one is just a standard, sequential log. It lives in the dag_processor_manager directory and is not nested within subdirectories named after the run date, like the scheduler and individual DAG logs are. It's possible to use logrotate to manage this file, but it would be nice if everything were handled in one place.

clean-db broken unless you install celery

There are many different Airflow deployments. The strongest trend now is the Kubernetes executor, and there are also simpler deployments for small environments where the LocalExecutor is convenient.
Neither deployment can take advantage of your otherwise magnificent db-cleanup script, because it requires the celery module.

directories won't be cleaned up

log_cleanup_file_op.set_downstream(log_cleanup_dir_op)
The files are cleaned up first, which updates the directory modification time to now, so
find /home/xiaoju/airflow/logs/*/* -type d -empty -mtime +30 will return nothing.
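Since deleting files refreshes the parent directory's mtime, an age test on the directory itself will never match right after the file cleanup. A hedged sketch of a workaround that prunes empty directories without the -mtime check (illustrative Python, not the repo's bash):

import os

BASE_LOG_FOLDER = "/home/xiaoju/airflow/logs"  # path taken from the report above

# Walk bottom-up and remove any directory that is empty once old files are gone,
# regardless of its (just-refreshed) modification time.
for dirpath, dirnames, filenames in os.walk(BASE_LOG_FOLDER, topdown=False):
    if dirpath != BASE_LOG_FOLDER and not os.listdir(dirpath):
        os.rmdir(dirpath)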

cleanup-logs.py - 'dict object' has no attribute 'maxLogAgeInDays'

When running with a scheduler this part of the code results in failed tasks for me with the following error:

MAX_LOG_AGE_IN_DAYS="{{dag_run.conf.maxLogAgeInDays}}"
if [ "${MAX_LOG_AGE_IN_DAYS}" == "" ]; then
    echo "maxLogAgeInDays conf variable isn't included. Using Default '""" + str(DEFAULT_MAX_LOG_AGE_IN_DAYS) + """'."
    MAX_LOG_AGE_IN_DAYS='""" + str(DEFAULT_MAX_LOG_AGE_IN_DAYS) + """'
fi

Error message:
'dict object' has no attribute 'maxLogAgeInDays'
It seems that the DAG does not find the entry maxLogAgeInDays.

I wanted to ask where this value should be set. Following the README, I only created the two Variables airflow_log_cleanup__max_log_age_in_days and airflow_log_cleanup__enable_delete_child_log, which are used to set DEFAULT_MAX_LOG_AGE_IN_DAYS and ENABLE_DELETE_CHILD_LOG.

I managed to get it to work by replacing the whole block above with:

MAX_LOG_AGE_IN_DAYS=""" + str(DEFAULT_MAX_LOG_AGE_IN_DAYS) + """

Since the DEFAULT_MAX_LOG_AGE_IN_DAYS already reads out the variable set in the UI, I do not need dag_run.conf.maxLogAgeInDays, right?

In general, I would suggest renaming DEFAULT_MAX_LOG_AGE_IN_DAYS to MAX_LOG_AGE_IN_DAYS, because the variable is not the default; the 30 in the .get() call is:

DEFAULT_MAX_LOG_AGE_IN_DAYS = Variable.get(
    "airflow_log_cleanup__max_log_age_in_days", 30
)

One more thing:
Following best practices, it would be better to combine the two variables into a single Variable in JSON format to reduce DB requests:

airflow_log_cleanup = Variable.get("airflow_log_cleanup", deserialize_json=True)
DEFAULT_MAX_LOG_AGE_IN_DAYS = airflow_log_cleanup.get("max_log_age_in_days", 30)
ENABLE_DELETE_CHILD_LOG = airflow_log_cleanup.get("enable_delete_child_log", "False")

I am using Airflow 2.0.1.
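As a companion to the suggestion above, the combined Variable could be created once like this; a sketch under the assumption that the DAG is changed to read the JSON key as shown:

from airflow.models import Variable

# One Variable, one DB lookup, instead of two separate keys.
Variable.set(
    "airflow_log_cleanup",
    {"max_log_age_in_days": 30, "enable_delete_child_log": "False"},
    serialize_json=True,
)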

Usage?

Hey @rssanders3 - I'm running a large number of Airflow instances in a cluster environment and would love to get more info around how you're using these scripts.

For db-cleanup, do you really delete rows out of task_instance, dag_run, etc in a prod database? I've always thought of that as a source of truth to keep forever. That aside, my Airflow DBs are growing. For example, ~5M rows in task_instance. I haven't really taken the time to profile Airflow's queries to determine if/when this becomes significant.

For log-cleanup, are you running something similar on scheduler logs or the logs from the DAG file processor? I've definitely seen those get big. I've been wondering about the best way to deal with this inside/outside Airflow. Wondering if configuring log rotation is the best solution.

Happy to chat over email / gitter / slack / etc if there's a better channel for discussion.

airflow-db-cleanup should be retrieving only the primary key column to avoid out of memory

When running the airflow-db-cleanup DAG, it loads the full rows into memory, as per

entries_to_delete = session.query(airflow_db_model).filter(
age_check_column <= max_date,
).all()

which translates into this (Postgres) query, e.g.:

SELECT xcom.id AS xcom_id, xcom.key AS xcom_key, xcom.value AS xcom_value, xcom.timestamp AS xcom_timestamp, xcom.execution_date AS xcom_execution_date, xcom.task_id AS xcom_task_id, xcom.dag_id AS xcom_dag_id 
	FROM xcom 
	WHERE xcom.execution_date <= '2018-11-24T16:48:30.861484+00:00'::timestamptz

For xcom tables with large values being passed around, this quickly leads to running out of memory.

As the purpose of the session.query is to only read rows to be deleted, it would be better to only load enough information to subsequently delete the rows (and any other essential info, e.g. timestamp).

I'm not a python / sqlalchemy expert, but I can see that something like load_only is available for this
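A minimal sketch of that idea: fetch only the columns needed to identify the rows, so large pickled XCom values never leave the database (selecting plain columns also avoids XCom's unpickling-on-load step; load_only, as mentioned above, is the ORM-level alternative). Illustrative only, not the DAG's current code:

from datetime import timedelta

from airflow import settings
from airflow.models import XCom
from airflow.utils import timezone

session = settings.Session()
max_date = timezone.utcnow() - timedelta(days=30)  # illustrative cutoff

# Only identifying columns are selected; xcom.value is never fetched.
keys_to_delete = (
    session.query(XCom.dag_id, XCom.task_id, XCom.execution_date)
    .filter(XCom.execution_date <= max_date)
    .all()
)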

MySQL error when deleting from dag_run table

Hi, I'm getting the following error when running this cleanup dag in my dev-airflow environment.

[2019-04-17 18:23:43,965] {models.py:1788} ERROR - (_mysql_exceptions.OperationalError) (1093, "You can't specify target table 'dag_run' for update in FROM clause") [SQL: 'DELETE FROM dag_run WHERE dag_run.execution_date NOT IN (SELECT max(dag_run.execution_date) AS max_1 \nFROM dag_run GROUP BY dag_run.dag_id) AND dag_run.execution_date <= %s'] [parameters: (datetime.datetime(2019, 3, 18, 18, 17, 57, 359710, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),)]

My understanding is that MySQL won't let you delete from the dag_run table if dag_run is used in the subquery.

Have you encountered this issue? How did you get around it?

Thanks
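One workaround for MySQL error 1093 is to materialise the keep-list in a first query, so dag_run no longer appears in both the DELETE and its subquery. A hedged sketch, not the DAG's actual code:

from datetime import timedelta

from sqlalchemy import func

from airflow import settings
from airflow.models import DagRun
from airflow.utils import timezone

session = settings.Session()
max_date = timezone.utcnow() - timedelta(days=30)  # illustrative cutoff

# Step 1: pull the latest execution_date per dag_id into Python.
keep_last = [
    row[0]
    for row in session.query(func.max(DagRun.execution_date)).group_by(DagRun.dag_id)
]

# Step 2: delete by explicit values, so no subquery references dag_run.
session.query(DagRun).filter(
    DagRun.execution_date <= max_date,
    ~DagRun.execution_date.in_(keep_last),
).delete(synchronize_session=False)
session.commit()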

cleanup_DagRun crash

The SQL query throws an exception when trying to delete dag runs:

[2019-09-13 11:58:27,276] {logging_mixin.py:95} INFO - [2019-09-13 11:58:27,276] {airflow-db-cleanup.py:134} INFO - Process will be Deleting 143 DagRun(s)
[2019-09-13 11:58:27,276] {logging_mixin.py:95} INFO - [2019-09-13 11:58:27,276] {airflow-db-cleanup.py:137} INFO - Performing Delete...
[2019-09-13 11:58:27,277] {taskinstance.py:1047} ERROR - (_mysql_exceptions.OperationalError) (1093, "You can't specify target table 'dag_run' for update in FROM clause")
[SQL: DELETE FROM dag_run WHERE dag_run.execution_date NOT IN (SELECT max(dag_run.execution_date) AS max_1
FROM dag_run GROUP BY dag_run.dag_id) AND dag_run.execution_date <= %s]
[parameters: (datetime.datetime(2019, 8, 14, 8, 56, 48, 619622, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),)]
(Background on this error at: http://sqlalche.me/e/e3q8)
Traceback (most recent call last):
File "/home/airflow/venv37/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1244, in _execute_context
cursor, statement, parameters, context
File "/home/airflow/venv37/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
cursor.execute(statement, parameters)
File "/home/airflow/venv37/lib/python3.7/site-packages/MySQLdb/cursors.py", line 255, in execute
self.errorhandler(self, exc, value)
File "/home/airflow/venv37/lib/python3.7/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
raise errorvalue
File "/home/airflow/venv37/lib/python3.7/site-packages/MySQLdb/cursors.py", line 252, in execute
res = self._query(query)
File "/home/airflow/venv37/lib/python3.7/site-packages/MySQLdb/cursors.py", line 378, in _query
db.query(q)
File "/home/airflow/venv37/lib/python3.7/site-packages/MySQLdb/connections.py", line 280, in query
_mysql.connection.query(self, query)
_mysql_exceptions.OperationalError: (1093, "You can't specify target table 'dag_run' for update in FROM clause")

Airflow 1.10.4

Bulk deletes

The delete operation happens inside a for loop, once for every object returned by the select query here.

SELECT log.id AS log_id, log.dttm AS log_dttm FROM log WHERE log.dttm <= %s

I don't know SQLAlchemy well, but from the code I can infer that we delete item by item, each followed by a commit call.

for entry in entries_to_delete:
        session.delete(entry)
        session.commit()

But why don't we do a bulk delete?

A SQL version would look like this:
DELETE FROM log WHERE log.dttm < 'max-time-to-retain-we-get-from-select-query'

If someone can help me understand the feasibility of this method, I would like to do a PR for this.

Why can't we do a query.delete() ?
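A bulk delete is indeed possible with SQLAlchemy's query-level delete. A hedged sketch for the Log model, issuing one DELETE statement instead of a per-row delete and commit:

from datetime import timedelta

from airflow import settings
from airflow.models import Log
from airflow.utils import timezone

session = settings.Session()
max_date = timezone.utcnow() - timedelta(days=30)  # illustrative cutoff

# synchronize_session=False skips reconciling in-memory objects, which is fine
# here because nothing else in the task uses the deleted rows afterwards.
session.query(Log).filter(Log.dttm <= max_date).delete(synchronize_session=False)
session.commit()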

Not deleting airflow logs for master node

This utility DAG is not deleting Airflow logs on the master node, only on the worker nodes. Please advise how to delete logs on the master node as well; this is an Airflow cluster with the Celery executor. Thanks.

db-cleanup also removing dag_run logs (log table)

It would be better to have some logic to exclude failed tasks.
I checked the way you delete data, and you rely only on the execution date. I know I can add a success filter as a keep_filter on dag_run, but the log cleanup should also only remove log entries for deleted dag_runs; otherwise, we face an issue in the UI.

Import error while execution of script

Hi,

I am getting the below import error / syntax error while executing the script in the Airflow container:

[2019-10-31 09:47:52,516] {models.py:377} ERROR - Failed to import: /usr/local/airflow/dags/airflow_clear_missing_dags.py
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 374, in process_file
m = imp.load_source(mod_name, filepath)
File "/usr/local/lib/python3.6/imp.py", line 172, in load_source
module = _load(spec)
File "", line 684, in _load
File "", line 665, in _load_unlocked
File "", line 674, in exec_module
File "", line 781, in get_code
File "", line 741, in source_to_code
File "", line 219, in _call_with_frames_removed
File "/usr/local/airflow/dags/airflow_clear_missing_dags.py", line 52
except Exception, e:
^
SyntaxError: invalid syntax

Could you please help me if you have a solution / fix for this?

thanks
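For reference, the failing line uses Python 2-only exception syntax; under Python 3 it needs the "as" form. A minimal sketch, not the repo's full block:

try:
    result = 1 / 0  # placeholder operation that raises
except Exception as e:  # Python 3 syntax; "except Exception, e:" only parses on Python 2
    print("caught: %s" % e)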

Invalid syntax error

Hi teamclairvoyant,

I have tried using your maintenance DAGs and have spotted a syntax error. I've created a PR #53 with a fix.

Thanks

Keep at least one DAG Run for active DAG

We configure the maintenance DAG to keep a 90-day history, which works well. Recently we added a new DAG which has to backfill data from the beginning of the year. However, when the maintenance DAG runs, it removes all DAG Runs older than 90 days, including those of the new DAG. This caused the new DAG to rerun from the beginning again. It seems the maintenance DAG should keep at least one DAG Run if a DAG is still active.

Multiple Worker Machine Log Cleanup

I believe tasks in a DAG are sent to workers randomly by the scheduler. How do you make sure that every worker machine gets its logs cleaned?

In the code I just see a loop over NUMBER_OF_WORKERS, but this doesn't guarantee that each worker will run a task and get its logs cleaned up.

airflow-db-cleanup - RenderedTaskInstanceFields could not be found

Airflow had trouble parsing this airflow-db-cleanup dag. RenderedTaskInstanceFields could not be found.

I don't know what it is, so I just removed it from the airflow.models import and from DATABASE_OBJECTS.

Airflow v1.10.4 using the SequentialExecutor

'max_db_entry_age_in_days' from Variable causes a TypeError: bad operand type for unary -: 'str'

When the max_db_entry_age_in_days is retrieved from Variable.get

DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = Variable.get("max_db_entry_age_in_days", 30) # Length to retain the log files if not already provided in the conf. If this is set to 30, the job will remove those files that are 30 days old or older.
is then returned as a string.

This leads to the following error:

  File "/usr/local/airflow/dags/maintenance/airflow-db-cleanup.py", line 66, in print_configuration_function
    max_date = datetime.now(tz=tzinfo) + timedelta(-max_db_entry_age_in_days)
TypeError: bad operand type for unary -: 'str'

Wrapping it with an int() seems to solve the problem:

DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(Variable.get("max_db_entry_age_in_days", 30))

add session.commit() after delete for airflow 1.9

Hi,
Nice job, I appreciate these DAGs!

With Airflow 1.9, the SQLAlchemy session is instantiated with autocommit=False. In settings.py:
Session = scoped_session(
    sessionmaker(autocommit=False, autoflush=False, bind=engine))

I added a session.commit() after the session.delete(entry) loop in airflow-db-cleanup.py, and it works!

Check file hashes

DAG files which change the name of the DAG but do not use a new filename are not cleaned up.

This should be possible by checking against the fileloc_hash stored in dag_code and the dag_hash in serialized_dag (when DAG serialization is enabled).
