teamclairvoyant / airflow-maintenance-dags
A series of DAGs/Workflows to help maintain the operation of Airflow
License: Apache License 2.0
If BASE_LOG_FOLDER is the parent directory of CHILD_PROCESS_LOG_DIRECTORY, https://github.com/teamclairvoyant/airflow-maintenance-dags/blob/master/log-cleanup/airflow-log-cleanup.py#L76-L84 can fail because both tasks will try to delete the same file.
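One way to avoid the double delete is to collapse the directory list before the tasks are generated, so a directory nested under BASE_LOG_FOLDER is not scheduled a second time. A minimal sketch, assuming the directories are available as plain path strings; `dedupe_log_dirs` is a hypothetical helper, not part of the repository:

```python
import os


def dedupe_log_dirs(dirs):
    """Return the directories in `dirs`, dropping any nested inside another entry."""
    # abspath also normalizes, so "/logs/" and "/logs" compare equal.
    normalized = sorted({os.path.abspath(d) for d in dirs})
    kept = []
    for d in normalized:
        # A parent path is a prefix of its children, so it sorts first;
        # only the already-kept entries need checking.
        if not any(os.path.commonpath([d, parent]) == parent for parent in kept):
            kept.append(d)
    return kept


# e.g. BASE_LOG_FOLDER plus a CHILD_PROCESS_LOG_DIRECTORY inside it
print(dedupe_log_dirs(["/data/airflow/logs", "/data/airflow/logs/scheduler"]))
# ['/data/airflow/logs']
```

`commonpath` compares whole path components, so a sibling such as `/data/logs-child` is not mistaken for a child of `/data/logs`.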
Hi, thanks for this great project!
I've just installed log-cleanup and set airflow_log_cleanup__enable_delete_child_log to True.
After a DAG run I can see that two log_cleanup_worker tasks have run successfully.
But the task log tells me that one of the tasks was skipped. In this particular case the logs/scheduler/
.
[2020-08-04 12:57:47,017] {bash_operator.py:126} INFO - Another task is already deleting logs on this worker node. Skipping it!
[2020-08-04 12:57:47,017] {bash_operator.py:126} INFO - If you believe you're receiving this message in error, kindly check if /tmp/airflow_log_cleanup_worker.lock exists and delete it.
[2020-08-04 12:57:47,017] {bash_operator.py:130} INFO - Command exited with return code 0
I'm new to Airflow, but I think log_cleanup_worker was intended to run in sequence.
Am I seeing this wrong?
Error in cleanup_DagRun Task of DAG airflow-db-cleanup
Airflow Version: 1.10.9 (Kubernetes Executor)
`LINE 6: WHERE dag_run.external_trigger = 0 GROUP BY dag_run.dag_id) ...
^
HINT: No operator matches the given name and argument type(s). You might need to add explicit type casts.
[SQL: SELECT dag_run.id AS dag_run_id, dag_run.execution_date AS dag_run_execution_date
FROM dag_run
WHERE dag_run.execution_date NOT IN (SELECT anon_1.max_1 AS anon_1_max_1
FROM (SELECT max(dag_run.execution_date) AS max_1
FROM dag_run
WHERE dag_run.external_trigger = %(external_trigger_1)s GROUP BY dag_run.dag_id) AS anon_1) AND dag_run.execution_date <= %(execution_date_1)s]`
I followed the README, created the variables and adjusted some of the code.
In the end I got:
[2020-08-17 15:56:56,257] {bash_operator.py:126} INFO - Configurations:
[2020-08-17 15:56:56,257] {bash_operator.py:126} INFO - BASE_LOG_FOLDER: ''
[2020-08-17 15:56:56,257] {bash_operator.py:126} INFO - MAX_LOG_AGE_IN_DAYS: ''
[2020-08-17 15:56:56,257] {bash_operator.py:126} INFO - ENABLE_DELETE: ''
Whereas I expected:
[2020-08-17 15:56:56,257] {bash_operator.py:126} INFO - Configurations:
[2020-08-17 15:56:56,257] {bash_operator.py:126} INFO - BASE_LOG_FOLDER: '/data/airflow/logs'
[2020-08-17 15:56:56,257] {bash_operator.py:126} INFO - MAX_LOG_AGE_IN_DAYS: '30'
[2020-08-17 15:56:56,257] {bash_operator.py:126} INFO - ENABLE_DELETE: 'true'
Also, the script doesn't run, since I got this "error":
INFO - Another task is already deleting logs on this worker node. Skipping it!
INFO - If you believe you're receiving this message in error, kindly check if /tmp/airflow_log_cleanup_worker.lock exists and delete it.
INFO - Command exited with return code 0
INFO - Marking task as SUCCESS.dag_id=clean_airflow_logs, task_id=log_cleanup_worker_num_1_dir_0, execution_date=20200817T130009, start_date=20200817T130027, end_date=20200817T130027
Also, in this case the script should exit 1, since there was an issue. Here my DAG and task report that everything ran fine.
When Running %s on host %s <TaskInstance: airflow-db-cleanup.cleanup_BaseXCom 2020-09-09T00:00:00+00:00
It runs for 1 minute and then fails with:
[2020-09-10 00:04:11,000] {taskinstance.py:1150} ERROR - pickle data was truncated
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 113, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/dags/maintenance/airflow-db-cleanup.py", line 289, in cleanup_function
entries_to_delete = query.all()
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3319, in all
return list(self)
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 101, in instances
cursor.close()
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 69, in __exit__
exc_value, with_traceback=exc_tb,
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
raise exception
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 81, in instances
rows = [proc(row) for row in fetch]
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 81, in <listcomp>
rows = [proc(row) for row in fetch]
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/loading.py", line 602, in _instance
state.manager.dispatch.load(state, context)
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/event/attr.py", line 322, in __call__
fn(*args, **kw)
File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/mapper.py", line 3378, in _event_on_load
instrumenting_mapper._reconstructor(state.obj())
File "/usr/local/lib/python3.6/site-packages/airflow/models/xcom.py", line 72, in init_on_load
self.value = pickle.loads(self.value)
_pickle.UnpicklingError: pickle data was truncated
[2020-09-10 00:04:11,005] {taskinstance.py:1194} INFO - Marking task as FAILED. dag_id=airflow-db-cleanup, task_id=cleanup_BaseXCom, execution_date=20200909T000000, start_date=20200910T000320, end_date=20200910T000411
[2020-09-10 00:04:15,716] {local_task_job.py:102} INFO - Task exited with return code 1
A couple of the dags are currently missing the creation of the now variable and this causes them to fail.
After running the DAG, I got the error below in all of the tasks chained with set_downstream. However, print_configuration was unaffected and ran successfully. Tracking the error through the Python libraries, I found that it is raised here: https://github.com/apache/incubator-airflow/blob/4083a8f5217e9ca7a5c83a3eaaaf403dd367a90c/airflow/utils/sqlalchemy.py#L153
It looks like naive timestamps are not allowed from Airflow 1.10 onward; I'm not sure about previous versions.
Environment :
[2018-11-28 17:54:15,124] {models.py:1736} ERROR - (builtins.ValueError) naive datetime is disallowed [SQL: 'SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config \nFROM task_instance \nWHERE task_instance.execution_date <= %s'] [parameters: [{}]]
Traceback (most recent call last):
File "/home/daemonsl/airflow/venv/lib/python3.5/site-packages/sqlalchemy/engine/base.py", line 1116, in _execute_context
context = constructor(dialect, self, conn, *args)
File "/home/daemonsl/airflow/venv/lib/python3.5/site-packages/sqlalchemy/engine/default.py", line 623, in _init_compiled
param.append(processors[key](compiled_params[key]))
File "/home/daemonsl/airflow/venv/lib/python3.5/site-packages/sqlalchemy/sql/type_api.py", line 1078, in process
return process_param(value, dialect)
File "/home/daemonsl/airflow/venv/lib/python3.5/site-packages/airflow/utils/sqlalchemy.py", line 156, in process_bind_param
raise ValueError('naive datetime is disallowed')
ValueError: naive datetime is disallowed
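The cutoff bound into the query has to carry a timezone. A minimal sketch in plain Python; `cleanup_cutoff` is a hypothetical helper, and inside a DAG Airflow's own `airflow.utils.timezone.utcnow()` could supply the aware `now` instead. The `int()` call also covers the case where the age arrives as a string from an Airflow Variable:

```python
from datetime import datetime, timedelta, timezone


def cleanup_cutoff(max_age_in_days, now=None):
    """Return a timezone-aware datetime `max_age_in_days` days before `now` (UTC)."""
    if now is None:
        now = datetime.now(timezone.utc)  # aware, unlike datetime.now() or utcnow()
    if now.tzinfo is None:
        # Fail loudly here instead of deep inside the ORM bind step.
        raise ValueError("now must be timezone-aware")
    # int() also accepts the string form an Airflow Variable returns.
    return now - timedelta(days=int(max_age_in_days))


print(cleanup_cutoff("30", now=datetime(2018, 11, 28, 17, 54, tzinfo=timezone.utc)))
# 2018-10-29 17:54:00+00:00
```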
Hey, any chance of updating these DAGs for Airflow 2.0 compatibility?
When running cleanup scripts for the first time on large DBs, the script will consume tens or hundreds of gigs of memory. This is because of this:
entries_to_delete = query.all()
Python MySQL drivers buffer the entire result set in memory by default. On large DBs, this can be hundreds of gigs. SSCursor should be used for MySQL:
https://pymysql.readthedocs.io/en/latest/modules/cursors.html#pymysql.cursors.SSCursor
However, setting the SSCursor at the Airflow connection level is pretty invasive, as it affects all of Airflow. It would be nice to be able to set this configuration for just the script, itself.
In order to get kill-halted-tasks to work, I had to cast execution_date as text,
e.g. CAST(DagRun.execution_date as TEXT)
Otherwise, I get the following error:
sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) operator does not exist: timestamp without time zone ~~ unknown
Subtask: LINE 3: ...n.dag_id = 'example-dag' AND dag_run.execution_date LIKE '2018...
Happy to submit a PR. Just wanted to give a heads-up first.
Any chance you could add the child_process_log_directory to the list of logs cleaned up?
I'm trying to run your maintenance DAG, and the first time I tried it I got this in the log:
Another task is already deleting logs on this worker node. Skipping it!
If you believe you're receiving this message in error, kindly check if /tmp/airflow_log_cleanup_worker.lock exists and delete it.
I removed the file, retried and got:
Lock file not found on this node! Creating it to prevent collisions...
Error creating the lock file. Check if the airflow user can create files under tmp directory. Exiting...
On the third run I got the .lock file issue once again.
I have given my airflow user rights to the /tmp folder with sudo setfacl -m u:airflow:rwx /tmp. How can my user create the .lock file but then not have the right to do it?
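A lock that cannot go stale avoids the delete-and-retry cycle described above. A minimal sketch, assuming a Linux or macOS worker and a hypothetical `worker_lock` helper standing in for the DAG's check-for-a-file logic: `fcntl.flock` ties the lock to an open descriptor, so the kernel releases it when the process exits, even after a crash, and a leftover file on disk means nothing.

```python
import fcntl
import os
from contextlib import contextmanager


@contextmanager
def worker_lock(path):
    """Yield True if this process holds an exclusive lock on `path`, else False."""
    # The file itself may persist between runs; only the flock matters.
    fd = os.open(path, os.O_CREAT | os.O_RDWR, 0o644)
    try:
        try:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            acquired = True
        except BlockingIOError:
            # Another open descriptor on this node holds the lock right now.
            acquired = False
        yield acquired
    finally:
        # Closing the descriptor releases the lock; process death does too.
        os.close(fd)
```

A caller that gets `False` could exit non-zero, so the run shows up as failed instead of being marked SUCCESS while doing nothing.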
So, I believe this only happens with the newest version of Airflow. In the cleanup DAG, this line:
from airflow.models import settings
throws this ImportError: cannot import name 'settings' from 'airflow.models' (/usr/local/lib/python3.7/site-packages/airflow/models/__init__.py)
So, apparently, there is no settings? I'm not sure yet whether this is truly an issue with this cleanup DAG or something else in the configuration.
Awesome DAGs - these are very useful. However, I cannot get the 'airflow-kill-halted-tasks' DAG to run. I always get the below error.
Airflow==1.9.0
Celery==4.1.0
psycopg2==2.7.4
sql_alchemy_conn = postgresql+psycopg2://airflow:...
[2018-08-01 13:01:28,394] {base_task_runner.py:98} INFO - Subtask: cursor.execute(statement, parameters)
[2018-08-01 13:01:28,396] {base_task_runner.py:98} INFO - Subtask: sqlalchemy.exc.InternalError: (psycopg2.InternalError) current transaction is aborted, commands ignored until end of transaction block
[2018-08-01 13:01:28,400] {base_task_runner.py:98} INFO - Subtask: [SQL: 'SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid \nFROM task_instance \nWHERE task_instance.task_id = %(param_1)s AND task_instance.dag_id = %(param_2)s AND task_instance.execution_date = %(param_3)s'] [parameters: {'param_1': 'kill_halted_tasks', 'param_3': datetime.datetime(2018, 8, 1, 12, 0), 'param_2': 'airflow-kill-halted-tasks'}] (Background on this error at: http://sqlalche.me/e/2j85)
Does this guarantee cleanup on all workers?
Consider a scenario where one worker has more than one slot in the pool config.
Does the current cleanup deal with the celery_taskmeta and celery_tasksetmeta tables?
If not, how can I add them to the DATABASE_OBJECTS array? There isn't a corresponding airflow.models class for them.
Thanks!
Hi
I am using Airflow 2, and airflow-db-cleanup.py fails when the file is parsed, because BaseJob is missing from airflow.jobs:
===> from airflow.jobs import BaseJob
I removed that import and the DAG ran, but nothing happened: DagRun, DagInstance ... are still the same, and the Airflow UI still shows a lot of rows.
I saw some configs there, such as the number of workers. If you run the KubernetesExecutor in Airflow on K8s, there isn't really a set number of workers; instead, each task gets a new pod. So are these DAGs runnable on K8s, or must they be modified?
The log cleanup DAG works great, but it misses the dag_processor_manager.log file. I'm not sure whether this file is new in Airflow. The configuration for it was added in 1.10.2, but the file might have existed earlier. Unlike the other log files, this one is just a standard, sequential log. It lives in the dag_processor_manager directory, but is not nested within any subdirectories named after the run date, as the scheduler and individual DAG logs are. It's possible to use logrotate to manage this log file, but it would be nice to have everything together in one place.
There are many different Airflow deployments.
The strongest trend now is toward the KubernetesExecutor. There are also simpler deployments for small environments, where the LocalExecutor is convenient.
Neither deployment can take advantage of your (previously magnificent) db cleanup script, because it requires the celery module.
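A minimal sketch of making the Celery tables optional, so one cleanup list works with the LocalExecutor or the KubernetesExecutor. The dictionary keys and the `date_done` column for the Celery result tables are assumptions for illustration; the real DAG registers SQLAlchemy model classes rather than table names, and a stricter check would also look at the configured Celery result backend:

```python
import importlib.util

# Hypothetical entries; the real DATABASE_OBJECTS holds model classes.
BASE_OBJECTS = [
    {"table": "dag_run", "age_check_column": "execution_date"},
    {"table": "task_instance", "age_check_column": "execution_date"},
    {"table": "log", "age_check_column": "dttm"},
]

# Assumed column name for Celery's database result backend tables.
CELERY_OBJECTS = [
    {"table": "celery_taskmeta", "age_check_column": "date_done"},
    {"table": "celery_tasksetmeta", "age_check_column": "date_done"},
]


def build_database_objects():
    """Return cleanup targets, adding the Celery tables only when celery is installed."""
    objects = list(BASE_OBJECTS)
    # find_spec checks availability without importing celery itself.
    if importlib.util.find_spec("celery") is not None:
        objects.extend(CELERY_OBJECTS)
    return objects
```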
log_cleanup_file_op.set_downstream(log_cleanup_dir_op)
Because the file cleanup runs first, deleting the files changes each directory's modification time to now, so
find /home/xiaoju/airflow/logs/*/* -type d -empty -mtime +30
returns nothing.
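One way around this is to record each directory's mtime before the file cleanup and judge the emptied directories against that snapshot. A minimal sketch in Python instead of `find`; both helper names are hypothetical. Since the snapshot has to survive the file deletion, both steps would need to run in the same task rather than as the two chained operators above:

```python
import os
import time


def snapshot_dir_mtimes(base_dir):
    """Record the mtime of every subdirectory of base_dir before files are deleted."""
    return {
        root: os.stat(root).st_mtime
        for root, _dirs, _files in os.walk(base_dir)
        if root != base_dir
    }


def remove_stale_empty_dirs(base_dir, mtimes_before, max_age_days, now=None):
    """Remove empty subdirectories whose pre-cleanup mtime is older than max_age_days.

    Directories missing from the snapshot (created after it) are always kept.
    """
    cutoff = (time.time() if now is None else now) - max_age_days * 86400
    removed = []
    # Bottom-up walk: children are handled before their parents, so a parent
    # emptied by this loop is removed in the same pass.
    for root, _dirs, _files in os.walk(base_dir, topdown=False):
        if root == base_dir or os.listdir(root):
            continue
        if mtimes_before.get(root, float("inf")) < cutoff:
            os.rmdir(root)
            removed.append(root)
    return removed
```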
When running with a scheduler, this part of the code results in failed tasks for me:
MAX_LOG_AGE_IN_DAYS="{{dag_run.conf.maxLogAgeInDays}}"
if [ "${MAX_LOG_AGE_IN_DAYS}" == "" ]; then
echo "maxLogAgeInDays conf variable isn't included. Using Default '""" + str(DEFAULT_MAX_LOG_AGE_IN_DAYS) + """'."
MAX_LOG_AGE_IN_DAYS='""" + str(DEFAULT_MAX_LOG_AGE_IN_DAYS) + """'
fi
Error message:
'dict object' has no attribute 'maxLogAgeInDays'
It seems that the DAG does not find the entry maxLogAgeInDays.
Where should this value be set? Following the README, I only created two variables, airflow_log_cleanup__max_log_age_in_days and airflow_log_cleanup__enable_delete_child_log, which are used to set DEFAULT_MAX_LOG_AGE_IN_DAYS and ENABLE_DELETE_CHILD_LOG.
I managed to get it to work by replacing the whole block above with:
MAX_LOG_AGE_IN_DAYS=""" + str(DEFAULT_MAX_LOG_AGE_IN_DAYS) + """
Since DEFAULT_MAX_LOG_AGE_IN_DAYS already reads the Variable set in the UI, I don't need dag_run.conf.maxLogAgeInDays, right?
In general, I would suggest renaming DEFAULT_MAX_LOG_AGE_IN_DAYS to MAX_LOG_AGE_IN_DAYS, because that variable is not the default; the default is the 30 in the .get():
DEFAULT_MAX_LOG_AGE_IN_DAYS = Variable.get(
"airflow_log_cleanup__max_log_age_in_days", 30
)
One more thing:
Following best practices, it would be better to combine the two variables into a single JSON Variable to reduce DB requests:
airflow_log_cleanup = Variable.get("airflow_log_cleanup", deserialize_json=True)
DEFAULT_MAX_LOG_AGE_IN_DAYS = airflow_log_cleanup.get("max_log_age_in_days", 30)
ENABLE_DELETE_CHILD_LOG = airflow_log_cleanup.get("enable_delete_child_log", "False")
I am using Airflow 2.0.1.
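A minimal sketch of the lookup order this thread is after: a value passed in `dag_run.conf` wins, then the Airflow Variable, then a hard-coded default. `resolve_max_log_age` is hypothetical; it uses `.get()` rather than attribute access, so a missing key (or a run with no conf at all) falls through instead of raising:

```python
DEFAULT_MAX_LOG_AGE_IN_DAYS = 30


def resolve_max_log_age(dag_run_conf, variable_value=None,
                        default=DEFAULT_MAX_LOG_AGE_IN_DAYS):
    """Return the max log age in days from dag_run.conf, a Variable value, or `default`."""
    # `or {}` covers runs started without any conf.
    conf_value = (dag_run_conf or {}).get("maxLogAgeInDays")
    for candidate in (conf_value, variable_value):
        if candidate not in (None, ""):
            # Variables and conf values may arrive as strings.
            return int(candidate)
    return int(default)
```

In the DAG, `variable_value` would be whatever `Variable.get("airflow_log_cleanup__max_log_age_in_days", None)` returns, so the Variable is read once and the shell command receives a plain number.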
Hey @rssanders3 - I'm running a large number of Airflow instances in a cluster environment and would love to get more info around how you're using these scripts.
For db-cleanup, do you really delete rows out of task_instance, dag_run, etc in a prod database? I've always thought of that as a source of truth to keep forever. That aside, my Airflow DBs are growing. For example, ~5M rows in task_instance. I haven't really taken the time to profile Airflow's queries to determine if/when this becomes significant.
For log-cleanup, are you running something similar on scheduler logs or the logs from the DAG file processor? I've definitely seen those get big. I've been wondering about the best way to deal with this inside/outside Airflow. Wondering if configuring log rotation is the best solution.
Happy to chat over email / gitter / slack / etc if there's a better channel for discussion.
When running the airflow db cleanup dag, it loads the full rows in memory as per
airflow-maintenance-dags/db-cleanup/airflow-db-cleanup.py
Lines 101 to 103 in 5f9ab63
Translating into this (postgres) query, e.g.
SELECT xcom.id AS xcom_id, xcom.key AS xcom_key, xcom.value AS xcom_value, xcom.timestamp AS xcom_timestamp, xcom.execution_date AS xcom_execution_date, xcom.task_id AS xcom_task_id, xcom.dag_id AS xcom_dag_id
FROM xcom
WHERE xcom.execution_date <= '2018-11-24T16:48:30.861484+00:00'::timestamptz
For xcom tables with large values being passed around, this quickly leads to out-of-memory errors.
Since the purpose of the session.query is only to read the rows to be deleted, it would be better to load just enough information to delete them afterwards (plus any other essential info, e.g. the timestamp).
I'm not a Python / SQLAlchemy expert, but I can see that something like load_only is available for this.
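A minimal sketch of the id-only approach, using the standard-library `sqlite3` module as a stand-in for the Airflow metadata database. Only primary keys are fetched, so large `xcom.value` payloads never reach Python memory, and rows are deleted in batches. In SQLAlchemy the comparable query would select just the id column (for example `session.query(XCom.id)`) or use `load_only`; the helper and its parameters are hypothetical:

```python
import sqlite3


def delete_old_rows(conn, table, ts_column, cutoff, batch_size=500):
    """Delete rows with ts_column <= cutoff, loading only their ids; return the count."""
    # `table` and `ts_column` must be trusted constants, never user input.
    ids = [row[0] for row in conn.execute(
        f"SELECT id FROM {table} WHERE {ts_column} <= ?", (cutoff,))]
    for start in range(0, len(ids), batch_size):
        batch = ids[start:start + batch_size]
        placeholders = ",".join("?" * len(batch))
        conn.execute(f"DELETE FROM {table} WHERE id IN ({placeholders})", batch)
        conn.commit()  # one short transaction per batch
    return len(ids)
```

Batching also keeps each `IN (...)` list well below the bound-parameter limits that databases impose.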
Hi, I'm getting the following error when running this cleanup dag in my dev-airflow environment.
[2019-04-17 18:23:43,965] {models.py:1788} ERROR - (_mysql_exceptions.OperationalError) (1093, "You can't specify target table 'dag_run' for update in FROM clause") [SQL: 'DELETE FROM dag_run WHERE dag_run.execution_date NOT IN (SELECT max(dag_run.execution_date) AS max_1 \nFROM dag_run GROUP BY dag_run.dag_id) AND dag_run.execution_date <= %s'] [parameters: (datetime.datetime(2019, 3, 18, 18, 17, 57, 359710, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),)]
My understanding is that MySQL won't let you delete from the dag_run table if dag_run is used in the subquery.
Have you encountered this issue? How did you get around it?
Thanks
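A commonly used workaround for MySQL error 1093 is to read the table through a derived table instead of directly in the subquery; because this one aggregates with `GROUP BY`, MySQL should materialize it rather than merge it into the outer DELETE. A minimal sketch of the rewritten statement, exercised here with `sqlite3` only to confirm that it keeps each DAG's latest run (it is not a MySQL test, and MySQL drivers use `%s` placeholders instead of `?`):

```python
import sqlite3

# dag_run is read through the "latest" derived table, not directly, which is
# what avoids MySQL error 1093 (assuming the derived table is materialized).
DELETE_OLD_DAG_RUNS = """
DELETE FROM dag_run
WHERE execution_date <= ?
  AND execution_date NOT IN (
      SELECT max_date FROM (
          SELECT MAX(execution_date) AS max_date
          FROM dag_run
          GROUP BY dag_id
      ) AS latest
  )
"""


def delete_old_dag_runs(conn, cutoff):
    """Delete DagRuns up to `cutoff`, keeping the newest run of every DAG."""
    with conn:  # commit on success, roll back on error
        return conn.execute(DELETE_OLD_DAG_RUNS, (cutoff,)).rowcount
```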
The SQL query throws an exception when trying to delete dag runs:
[2019-09-13 11:58:27,276] {logging_mixin.py:95} INFO - [2019-09-13 11:58:27,276] {airflow-db-cleanup.py:134} INFO - Process will be Deleting 143 DagRun(s)
[2019-09-13 11:58:27,276] {logging_mixin.py:95} INFO - [2019-09-13 11:58:27,276] {airflow-db-cleanup.py:137} INFO - Performing Delete...
[2019-09-13 11:58:27,277] {taskinstance.py:1047} ERROR - (_mysql_exceptions.OperationalError) (1093, "You can't specify target table 'dag_run' for update in FROM clause")
[SQL: DELETE FROM dag_run WHERE dag_run.execution_date NOT IN (SELECT max(dag_run.execution_date) AS max_1
FROM dag_run GROUP BY dag_run.dag_id) AND dag_run.execution_date <= %s]
[parameters: (datetime.datetime(2019, 8, 14, 8, 56, 48, 619622, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),)]
(Background on this error at: http://sqlalche.me/e/e3q8)
Traceback (most recent call last):
File "/home/airflow/venv37/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1244, in _execute_context
cursor, statement, parameters, context
File "/home/airflow/venv37/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
cursor.execute(statement, parameters)
File "/home/airflow/venv37/lib/python3.7/site-packages/MySQLdb/cursors.py", line 255, in execute
self.errorhandler(self, exc, value)
File "/home/airflow/venv37/lib/python3.7/site-packages/MySQLdb/connections.py", line 50, in defaulterrorhandler
raise errorvalue
File "/home/airflow/venv37/lib/python3.7/site-packages/MySQLdb/cursors.py", line 252, in execute
res = self._query(query)
File "/home/airflow/venv37/lib/python3.7/site-packages/MySQLdb/cursors.py", line 378, in _query
db.query(q)
File "/home/airflow/venv37/lib/python3.7/site-packages/MySQLdb/connections.py", line 280, in query
_mysql.connection.query(self, query)
_mysql_exceptions.OperationalError: (1093, "You can't specify target table 'dag_run' for update in FROM clause")
Airflow 1.10.4
Also, will this be merged into Airflow master?
Hi,
In the airflow_log_cleanup DAG, I made some changes and deployed them in my environment, but duplicate tasks are created in the DAG with the suffixes _0 and _1.
How can we avoid these duplicate tasks? I tried executing the airflow_db_clean DAG, but it still doesn't work.
Thanks
The delete operation happens inside a for loop, once for every object returned by this select query:
SELECT log.id AS log_id, log.dttm AS log_dttm FROM log WHERE log.dttm <= %s
I have no idea how SQLAlchemy works internally, but from the code I can infer that we delete item by item, followed by a commit call:
for entry in entries_to_delete:
    session.delete(entry)
session.commit()
But why don't we do a bulk delete?
A SQL version would look like this:
DELETE from log where log.dttm < "max-time-to-retain-we-get-from-select-query"
If someone can help me understand the feasibility of this approach, I would like to open a PR for it.
Why can't we do a query.delete()?
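A minimal sketch of the single-statement delete, again with `sqlite3` as a stand-in; the table layout mirrors the `log.id`/`log.dttm` columns in the query above, and the helper name is hypothetical. In SQLAlchemy, `Query.delete(synchronize_session=False)` issues a comparable bulk `DELETE`; the trade-off is that it skips per-object ORM events and cascades, which the loop version honors:

```python
import sqlite3


def bulk_delete_older_than(conn, cutoff):
    """Delete every log row with dttm <= cutoff in one statement; return the row count."""
    with conn:  # commits on success, rolls back if the DELETE raises
        cur = conn.execute("DELETE FROM log WHERE dttm <= ?", (cutoff,))
    return cur.rowcount
```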
This utility DAG does not delete the Airflow logs on the master node, only on the worker nodes. Please advise how to also delete them on the master node in an Airflow cluster with the Celery Executor. Thanks
I'm trying to run two DAGs using Dask and getting: Only works with the CeleryExecutor, sorry
Any tips?
Thanks
Joao
It would be better to have some logic to exclude failed tasks.
I checked how you delete data, and it relies only on the execution date. I know I can add success as a keep_filter in dag_run, but removing logs should also only remove the logs of deleted dag_runs; otherwise we run into an issue in the UI.
Hi,
I am getting the import error / syntax error below while executing the script in the Airflow container:
[2019-10-31 09:47:52,516] {models.py:377} ERROR - Failed to import: /usr/local/airflow/dags/airflow_clear_missing_dags.py
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 374, in process_file
m = imp.load_source(mod_name, filepath)
File "/usr/local/lib/python3.6/imp.py", line 172, in load_source
module = _load(spec)
File "", line 684, in _load
File "", line 665, in _load_unlocked
File "", line 674, in exec_module
File "", line 781, in get_code
File "", line 741, in source_to_code
File "", line 219, in _call_with_frames_removed
File "/usr/local/airflow/dags/airflow_clear_missing_dags.py", line 52
except Exception, e:
^
SyntaxError: invalid syntax
Could you please help me if you have a solution or fix for this?
thanks
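Line 52 of airflow_clear_missing_dags.py uses Python 2 exception syntax, which Python 3 rejects when the file is parsed. The Python 3 form binds the exception with `as`; a minimal sketch with a hypothetical function:

```python
def parse_dag_count(raw):
    """Hypothetical helper showing the Python 3 exception syntax."""
    try:
        return int(raw)
    except Exception as e:  # Python 2 spelled this: except Exception, e:
        print("Could not parse {!r}: {}".format(raw, e))
        return 0
```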
Hi, sorry if this sounds stupid... after running this dag as is, I have not noticed any changes in the disk space in the airflow server. Am I doing anything wrong?
Hi teamclairvoyant,
I have tried using your maintenance DAGs and have spotted a syntax error. I've created a PR #53 with a fix.
Thanks
We configure the maintenance DAG to keep a 90-day history, which works well. Recently we added a new DAG that has to backfill data from the beginning of the year. However, when the maintenance DAG runs, it removes all DAG Runs older than 90 days, including those of the new DAG. This caused the new DAG to rerun from the beginning again. It seems the maintenance DAG should keep at least one DAG Run for a DAG that is still active.
How does running multiple instances of the task with different IDs guarantee that the maintenance runs on every worker node?
I believe the scheduler sends each task in a DAG to a worker at random. How do you make sure that every worker machine gets its logs cleaned?
In the code I just see a loop over NUMBER_OF_WORKERS, but this doesn't guarantee that each worker will run the task and get its logs cleaned up.
Airflow had trouble parsing this airflow-db-cleanup DAG: RenderedTaskInstanceFields could not be found.
I don't know what it is, so I just removed it from the from airflow.models import and from DATABASE_OBJECTS.
Airflow v1.10.4 using the sequential operator
When max_db_entry_age_in_days is retrieved from Variable.get, the value comes back as a string.
This leads to the following error:
File "/usr/local/airflow/dags/maintenance/airflow-db-cleanup.py", line 66, in print_configuration_function
max_date = datetime.now(tz=tzinfo) + timedelta(-max_db_entry_age_in_days)
TypeError: bad operand type for unary -: 'str'
Wrapping it in int() seems to solve the problem:
DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(Variable.get("max_db_entry_age_in_days", 30))
Running: Apache Airflow version 1.10.12
The following code does not account for the valid queued state and wrongly kills queued tasks.
Hi,
Nice job, I appreciate these DAGs!
With Airflow 1.9, the SQLAlchemy session is created with autocommit=False.
In settings.py:
Session = scoped_session(
sessionmaker(autocommit=False, autoflush=False, bind=engine))
I added a session.commit() after the session.delete(entry) loop in airflow-db-cleanup.py, and it works!
Files don't get deleted when BASE_LOG_FOLDER ends with /
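The report does not say which step mishandles the trailing slash, so this is only a defensive sketch: normalize the configured folder once before using it, and reject an empty value, since `os.path.normpath("")` returns `"."`, which would point a cleanup at the current directory. `normalize_log_folder` is hypothetical:

```python
import os


def normalize_log_folder(raw):
    """Return a canonical BASE_LOG_FOLDER without trailing slashes; reject empty values."""
    if raw is None or not raw.strip():
        # normpath("") would return ".", i.e. the current working directory.
        raise ValueError("BASE_LOG_FOLDER is empty")
    return os.path.normpath(os.path.expanduser(raw.strip()))
```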
DAG files that change the name of the DAG but keep the same file name are not cleaned up.
This should be possible to detect by checking against fileloc_hash stored in dag_code and dag_hash in serialized_dag (when DAG serialization is enabled).