
Commit f396dc1

Airflow db cleanup dag update (#8851)

* Fix and adjust the db cleanup DAG to Airflow 2.3
* Remove cleanup of the Task and TaskSet tables
* Remove the duplicated airflow_1 file + some minor fixes
* Fix lint errors
* Small fix
* Small fix in deletion
* Separate DAG logic between Airflow versions
* Move session closing to the finally block
* Lint fixes
* Import order fix

Co-authored-by: Leah E. Cole <[email protected]>
1 parent 66e069f commit f396dc1

2 files changed: +185 -221 lines changed

composer/airflow_1_samples/airflow_db_cleanup.py

Lines changed: 90 additions & 108 deletions
@@ -53,11 +53,12 @@
 
 import airflow
 from airflow import settings
-from airflow.configuration import conf
 from airflow.jobs import BaseJob
 from airflow.models import DAG, DagModel, DagRun, Log, SlaMiss, \
     TaskInstance, Variable, XCom
 from airflow.operators.python_operator import PythonOperator
+from airflow.version import version as airflow_version
+
 import dateutil.parser
 from sqlalchemy import and_, func
 from sqlalchemy.exc import ProgrammingError
@@ -66,6 +67,7 @@
 try:
     # airflow.utils.timezone is available from v1.10 onwards
     from airflow.utils import timezone
+
     now = timezone.utcnow
 except ImportError:
     now = datetime.utcnow
@@ -79,9 +81,11 @@
 DAG_OWNER_NAME = "operations"
 # List of email address to send email alerts to if this job fails
 ALERT_EMAIL_ADDRESSES = []
+# Airflow version used by the environment in list form, value stored in
+# airflow_version is in format e.g "1.10.15+composer"
+AIRFLOW_VERSION = airflow_version[:-len("+composer")].split(".")
 # Length to retain the log files if not already provided in the conf. If this
 # is set to 30, the job will remove those files that are 30 days old or older.
-
 DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(
     Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30))
 # Prints the database entries which will be getting deleted; set to False
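
Note on the new AIRFLOW_VERSION constant: stripping the "+composer" suffix and
splitting on "." turns the Composer-style version string into a list of
components, presumably so the DAG can branch on the environment's Airflow
version, as the commit message suggests. A minimal sketch of the parsing,
using the example value from the comment above:

# Example value from the comment above; the slice drops the "+composer"
# suffix, then split(".") yields the individual version components.
airflow_version = "1.10.15+composer"
AIRFLOW_VERSION = airflow_version[:-len("+composer")].split(".")
print(AIRFLOW_VERSION)  # ['1', '10', '15']
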
@@ -139,6 +143,7 @@
 # Check for TaskReschedule model
 try:
     from airflow.models import TaskReschedule
+
     DATABASE_OBJECTS.append({
         "airflow_db_model": TaskReschedule,
         "age_check_column": TaskReschedule.execution_date,
@@ -153,6 +158,7 @@
 # Check for TaskFail model
 try:
     from airflow.models import TaskFail
+
     DATABASE_OBJECTS.append({
         "airflow_db_model": TaskFail,
         "age_check_column": TaskFail.execution_date,
@@ -164,23 +170,10 @@
 except Exception as e:
     logging.error(e)
 
-# Check for RenderedTaskInstanceFields model
-try:
-    from airflow.models import RenderedTaskInstanceFields
-    DATABASE_OBJECTS.append({
-        "airflow_db_model": RenderedTaskInstanceFields,
-        "age_check_column": RenderedTaskInstanceFields.execution_date,
-        "keep_last": False,
-        "keep_last_filters": None,
-        "keep_last_group_by": None
-    })
-
-except Exception as e:
-    logging.error(e)
-
 # Check for ImportError model
 try:
     from airflow.models import ImportError
+
     DATABASE_OBJECTS.append({
         "airflow_db_model": ImportError,
         "age_check_column": ImportError.timestamp,
@@ -193,34 +186,6 @@
 except Exception as e:
     logging.error(e)
 
-# Check for celery executor
-airflow_executor = str(conf.get("core", "executor"))
-logging.info("Airflow Executor: " + str(airflow_executor))
-if (airflow_executor == "CeleryExecutor"):
-    logging.info("Including Celery Modules")
-    try:
-        from celery.backends.database.models import Task, TaskSet
-        DATABASE_OBJECTS.extend(({
-            "airflow_db_model": Task,
-            "age_check_column": Task.date_done,
-            "keep_last": False,
-            "keep_last_filters": None,
-            "keep_last_group_by": None,
-            "do_not_delete_by_dag_id": True
-        }, {
-            "airflow_db_model": TaskSet,
-            "age_check_column": TaskSet.date_done,
-            "keep_last": False,
-            "keep_last_filters": None,
-            "keep_last_group_by": None,
-            "do_not_delete_by_dag_id": True
-        }))
-
-    except Exception as e:
-        logging.error(e)
-
-session = settings.Session()
-
 default_args = {
     "owner": DAG_OWNER_NAME,
     "depends_on_past": False,
@@ -252,7 +217,7 @@ def print_configuration_function(**context):
     max_db_entry_age_in_days = dag_run_conf.get(
         "maxDBEntryAgeInDays", None)
     logging.info("maxDBEntryAgeInDays from dag_run.conf: " + str(dag_run_conf))
-    if (max_db_entry_age_in_days is None or max_db_entry_age_in_days < 1):
+    if max_db_entry_age_in_days is None or max_db_entry_age_in_days < 1:
         logging.info(
             "maxDBEntryAgeInDays conf variable isn't included or Variable " +
             "value is less than 1. Using Default '" +
@@ -266,7 +231,6 @@ def print_configuration_function(**context):
     logging.info("max_db_entry_age_in_days: " + str(max_db_entry_age_in_days))
     logging.info("max_date: " + str(max_date))
     logging.info("enable_delete: " + str(ENABLE_DELETE))
-    logging.info("session: " + str(session))
     logging.info("")
 
     logging.info("Setting max_execution_date to XCom for Downstream Processes")
@@ -280,7 +244,57 @@ def print_configuration_function(**context):
     dag=dag)
 
 
+def build_query(session, airflow_db_model, age_check_column, max_date,
+                keep_last, keep_last_filters=None, keep_last_group_by=None):
+    query = session.query(airflow_db_model).options(
+        load_only(age_check_column))
+
+    logging.info("INITIAL QUERY : " + str(query))
+
+    if not keep_last:
+        query = query.filter(age_check_column <= max_date,)
+    else:
+        subquery = session.query(func.max(DagRun.execution_date))
+        # workaround for MySQL "table specified twice" issue
+        # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
+        if keep_last_filters is not None:
+            for entry in keep_last_filters:
+                subquery = subquery.filter(entry)
+
+            logging.info("SUB QUERY [keep_last_filters]: " + str(subquery))
+
+        if keep_last_group_by is not None:
+            subquery = subquery.group_by(keep_last_group_by)
+            logging.info(
+                "SUB QUERY [keep_last_group_by]: " +
+                str(subquery))
+
+        subquery = subquery.from_self()
+
+        query = query.filter(
+            and_(age_check_column.notin_(subquery)),
+            and_(age_check_column <= max_date))
+
+    return query
+
+
+def print_query(query, airflow_db_model, age_check_column):
+    entries_to_delete = query.all()
+
+    logging.info("Query: " + str(query))
+    logging.info("Process will be Deleting the following " +
+                 str(airflow_db_model.__name__) + "(s):")
+    for entry in entries_to_delete:
+        date = str(entry.__dict__[str(age_check_column).split(".")[1]])
+        logging.info("\tEntry: " + str(entry) + ", Date: " + date)
+
+    logging.info("Process will be Deleting "
+                 + str(len(entries_to_delete)) + " "
+                 + str(airflow_db_model.__name__) + "(s)")
+
+
 def cleanup_function(**context):
+    session = settings.Session()
 
     logging.info("Retrieving max_execution_date from XCom")
     max_date = context["ti"].xcom_pull(
@@ -310,67 +324,34 @@ def cleanup_function(**context):
     logging.info("Running Cleanup Process...")
 
     try:
-        query = session.query(airflow_db_model).options(
-            load_only(age_check_column))
-
-        logging.info("INITIAL QUERY : " + str(query))
-
-        if keep_last:
-
-            subquery = session.query(func.max(DagRun.execution_date))
-            # workaround for MySQL "table specified twice" issue
-            # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
-            if keep_last_filters is not None:
-                for entry in keep_last_filters:
-                    subquery = subquery.filter(entry)
-
-                logging.info("SUB QUERY [keep_last_filters]: " + str(subquery))
-
-            if keep_last_group_by is not None:
-                subquery = subquery.group_by(keep_last_group_by)
-                logging.info(
-                    "SUB QUERY [keep_last_group_by]: " +
-                    str(subquery))
-
-            subquery = subquery.from_self()
-
-            query = query.filter(
-                and_(age_check_column.notin_(subquery)),
-                and_(age_check_column <= max_date))
-
-        else:
-            query = query.filter(age_check_column <= max_date,)
-
-        if PRINT_DELETES:
-            entries_to_delete = query.all()
-
-            logging.info("Query: " + str(query))
-            logging.info("Process will be Deleting the following " +
-                         str(airflow_db_model.__name__) + "(s):")
-            for entry in entries_to_delete:
-                date = str(entry.__dict__[str(age_check_column).split(".")[1]])
-                logging.info("\tEntry: " + str(entry) + ", Date: " + date)
-
-            logging.info("Process will be Deleting "
-                         + str(len(entries_to_delete)) + " "
-                         + str(airflow_db_model.__name__) + "(s)")
+        if context["params"].get("do_not_delete_by_dag_id"):
+            query = build_query(session, airflow_db_model, age_check_column,
+                                max_date, keep_last, keep_last_filters,
+                                keep_last_group_by)
+            if PRINT_DELETES:
+                print_query(query, airflow_db_model, age_check_column)
+            if ENABLE_DELETE:
+                logging.info("Performing Delete...")
+                query.delete(synchronize_session=False)
+            session.commit()
         else:
-            logging.warn(
-                "You've opted to skip printing the db entries to be deleted. "
-                "Set PRINT_DELETES to True to show entries!!!")
-
-        if ENABLE_DELETE:
-            logging.info("Performing Delete...")
-            if context["params"].get("do_not_delete_by_dag_id"):
-                query.filter(age_check_column <= max_date).delete(synchronize_session=False)
+            dags = session.query(airflow_db_model.dag_id).distinct()
+            session.commit()
+
+            list_dags = [str(list(dag)[0]) for dag in dags]
+            for dag in list_dags:
+                query = build_query(session, airflow_db_model, age_check_column,
+                                    max_date, keep_last, keep_last_filters,
+                                    keep_last_group_by)
+                query = query.filter(airflow_db_model.dag_id == dag)
+                if PRINT_DELETES:
+                    print_query(query, airflow_db_model, age_check_column)
+                if ENABLE_DELETE:
+                    logging.info("Performing Delete...")
+                    query.delete(synchronize_session=False)
             session.commit()
-        else:
-            dags = session.query(airflow_db_model.dag_id).distinct()
-            list_dags = [str(list(dag)[0]) for dag in dags]
-            for dag in list_dags:
-                query.filter(age_check_column <= max_date).filter(airflow_db_model.dag_id == dag).delete(synchronize_session=False)
-                session.commit()
-        else:
+
+        if not ENABLE_DELETE:
             logging.warn("You've opted to skip deleting the db entries. "
                          "Set ENABLE_DELETE to True to delete entries!!!")
 
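One design note on the rewritten delete path above: tables flagged
do_not_delete_by_dag_id are cleaned with a single bulk query, while dag-scoped
tables get one query and one commit per dag_id, keeping each transaction
small. A minimal sketch (names as above) of the bulk-delete call both
branches share:

# Query.delete returns the matched-row count; synchronize_session=False
# skips reconciling in-memory objects, which is safe here because the
# session is closed right afterwards in the finally block added below.
num_deleted = query.delete(synchronize_session=False)
session.commit()
logging.info("Deleted " + str(num_deleted) + " row(s)")
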
@@ -379,12 +360,13 @@ def cleanup_function(**context):
     except ProgrammingError as e:
         logging.error(e)
         logging.error(
-            str(airflow_db_model) + " is not present in the metadata."
-            "Skipping...")
+            str(airflow_db_model) + " is not present in the metadata. "
+            "Skipping...")
+    finally:
+        session.close()
 
 
 for db_object in DATABASE_OBJECTS:
-
     cleanup_op = PythonOperator(
         task_id="cleanup_" + str(db_object["airflow_db_model"].__name__),
         python_callable=cleanup_function,
