Commit 7ddc791

maciekgawron, leahecole, and dinagraves authored
Add DAG for airflow metadb cleanup for Composer. (#4993)
* Add DAG for airflow metadb cleanup for Composer.
* Moved the db-cleanup dag and added a test.
* Moved db cleanup and integrated the README into the DAG. Additionally, some import order changes to be compliant with the nox linter.
* Restored the original order of imports to pass CI.

Co-authored-by: Leah E. Cole <[email protected]>
Co-authored-by: Dina Graves Portman <[email protected]>
1 parent e1c7a74 commit 7ddc791

File tree: 2 files changed, +411 −0 lines
Lines changed: 385 additions & 0 deletions
@@ -0,0 +1,385 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
A maintenance workflow that you can deploy into Airflow to periodically clean
out the DagRun, TaskInstance, Log, XCom, Job DB and SlaMiss entries to avoid
having too much data in your Airflow MetaStore.

## Authors

The DAG is a fork of the [teamclairvoyant repository](https://github.com/teamclairvoyant/airflow-maintenance-dags/tree/master/db-cleanup).

## Usage

1. Update the global variables (SCHEDULE_INTERVAL, DAG_OWNER_NAME,
ALERT_EMAIL_ADDRESSES and ENABLE_DELETE) in the DAG with the desired values.

2. Modify the DATABASE_OBJECTS list to add/remove objects as needed. Each
dictionary in the list features the following parameters:
    - airflow_db_model: Model imported from airflow.models corresponding to
      a table in the airflow metadata database
    - age_check_column: Column in the model/table to use for calculating the max
      date of data deletion
    - keep_last: Boolean to specify whether to preserve the last run instance
    - keep_last_filters: List of filters to preserve data from deleting
      during clean-up, such as DAG runs where the external trigger is set to 0
    - keep_last_group_by: Option to specify a column by which to group the
      database entries and perform aggregate functions

3. Create and set the following Variable in the Airflow Web Server
   (Admin -> Variables):
    - airflow_db_cleanup__max_db_entry_age_in_days - integer - Length to retain
      the log files if not already provided in the conf. If this is set to 30,
      the job will remove those files that are 30 days old or older.

4. Put the DAG in your GCS bucket.
"""
from datetime import datetime, timedelta
import logging
import os

import airflow
from airflow import settings
from airflow.configuration import conf
from airflow.jobs import BaseJob
from airflow.models import DAG, DagModel, DagRun, Log, SlaMiss, \
    TaskInstance, Variable, XCom
from airflow.operators.python_operator import PythonOperator
import dateutil.parser
from sqlalchemy import and_, func
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.orm import load_only

try:
    # airflow.utils.timezone is available from v1.10 onwards
    from airflow.utils import timezone
    now = timezone.utcnow
except ImportError:
    now = datetime.utcnow
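    # Note: datetime.utcnow returns a naive timestamp, which matches how
    # pre-1.10 Airflow versions store dates in the metadata database.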

# airflow-db-cleanup
DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "")
START_DATE = airflow.utils.dates.days_ago(1)
# How often to Run. @daily - Once a day at Midnight (UTC)
SCHEDULE_INTERVAL = "@daily"
# Who is listed as the owner of this DAG in the Airflow Web Server
DAG_OWNER_NAME = "operations"
# List of email addresses to send email alerts to if this job fails
ALERT_EMAIL_ADDRESSES = []
# Length to retain the log files if not already provided in the conf. If this
# is set to 30, the job will remove those files that are 30 days old or older.

DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int(
    Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30))
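# Defaults to 30 days when the Airflow Variable has not been created yet.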
# Prints the database entries which will be getting deleted; set to False
# to avoid printing large lists and slowing down the process
PRINT_DELETES = False
# Whether the job should delete the db entries or not. Included if you want to
# temporarily avoid deleting the db entries.
ENABLE_DELETE = True
# List of all the objects that will be deleted. Comment out the DB objects you
# want to skip.
DATABASE_OBJECTS = [{
    "airflow_db_model": BaseJob,
    "age_check_column": BaseJob.latest_heartbeat,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}, {
    "airflow_db_model": DagRun,
    "age_check_column": DagRun.execution_date,
    "keep_last": True,
    "keep_last_filters": [DagRun.external_trigger.is_(False)],
    "keep_last_group_by": DagRun.dag_id
}, {
    "airflow_db_model": TaskInstance,
    "age_check_column": TaskInstance.execution_date,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}, {
    "airflow_db_model": Log,
    "age_check_column": Log.dttm,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}, {
    "airflow_db_model": XCom,
    "age_check_column": XCom.execution_date,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}, {
    "airflow_db_model": SlaMiss,
    "age_check_column": SlaMiss.execution_date,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}, {
    "airflow_db_model": DagModel,
    "age_check_column": DagModel.last_scheduler_run,
    "keep_last": False,
    "keep_last_filters": None,
    "keep_last_group_by": None
}]

# Check for TaskReschedule model
try:
    from airflow.models import TaskReschedule
    DATABASE_OBJECTS.append({
        "airflow_db_model": TaskReschedule,
        "age_check_column": TaskReschedule.execution_date,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None
    })

except Exception as e:
    logging.error(e)

# Check for TaskFail model
try:
    from airflow.models import TaskFail
    DATABASE_OBJECTS.append({
        "airflow_db_model": TaskFail,
        "age_check_column": TaskFail.execution_date,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None
    })

except Exception as e:
    logging.error(e)

# Check for RenderedTaskInstanceFields model
try:
    from airflow.models import RenderedTaskInstanceFields
    DATABASE_OBJECTS.append({
        "airflow_db_model": RenderedTaskInstanceFields,
        "age_check_column": RenderedTaskInstanceFields.execution_date,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None
    })

except Exception as e:
    logging.error(e)

# Check for ImportError model
try:
    from airflow.models import ImportError
    DATABASE_OBJECTS.append({
        "airflow_db_model": ImportError,
        "age_check_column": ImportError.timestamp,
        "keep_last": False,
        "keep_last_filters": None,
        "keep_last_group_by": None
    })

except Exception as e:
    logging.error(e)

# Check for celery executor
airflow_executor = str(conf.get("core", "executor"))
logging.info("Airflow Executor: " + str(airflow_executor))
if airflow_executor == "CeleryExecutor":
    logging.info("Including Celery Modules")
    try:
        from celery.backends.database.models import Task, TaskSet
        DATABASE_OBJECTS.extend(({
            "airflow_db_model": Task,
            "age_check_column": Task.date_done,
            "keep_last": False,
            "keep_last_filters": None,
            "keep_last_group_by": None
        }, {
            "airflow_db_model": TaskSet,
            "age_check_column": TaskSet.date_done,
            "keep_last": False,
            "keep_last_filters": None,
            "keep_last_group_by": None
        }))

    except Exception as e:
        logging.error(e)

session = settings.Session()

default_args = {
    "owner": DAG_OWNER_NAME,
    "depends_on_past": False,
    "email": ALERT_EMAIL_ADDRESSES,
    "email_on_failure": True,
    "email_on_retry": False,
    "start_date": START_DATE,
    "retries": 1,
    "retry_delay": timedelta(minutes=1)
}

dag = DAG(
    DAG_ID,
    default_args=default_args,
    schedule_interval=SCHEDULE_INTERVAL,
    start_date=START_DATE)
if hasattr(dag, "doc_md"):
    dag.doc_md = __doc__
if hasattr(dag, "catchup"):
    dag.catchup = False
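# The hasattr checks keep the DAG compatible with older Airflow versions in
# which doc_md / catchup do not exist; catchup=False stops the scheduler from
# backfilling runs for intervals that predate the DAG's deployment.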


def print_configuration_function(**context):
    logging.info("Loading Configurations...")
    dag_run_conf = context.get("dag_run").conf
    logging.info("dag_run.conf: " + str(dag_run_conf))
    max_db_entry_age_in_days = None
    if dag_run_conf:
        max_db_entry_age_in_days = dag_run_conf.get(
            "maxDBEntryAgeInDays", None)
        logging.info("maxDBEntryAgeInDays from dag_run.conf: " +
                     str(max_db_entry_age_in_days))
    if max_db_entry_age_in_days is None or max_db_entry_age_in_days < 1:
        logging.info(
            "maxDBEntryAgeInDays conf variable isn't included or Variable " +
            "value is less than 1. Using Default '" +
            str(DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS) + "'")
        max_db_entry_age_in_days = DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS
    max_date = now() + timedelta(-max_db_entry_age_in_days)
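    # max_date is the cutoff: entries whose age_check_column value is older
    # than this timestamp become candidates for deletion.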
    logging.info("Finished Loading Configurations")
    logging.info("")

    logging.info("Configurations:")
    logging.info("max_db_entry_age_in_days: " + str(max_db_entry_age_in_days))
    logging.info("max_date: " + str(max_date))
    logging.info("enable_delete: " + str(ENABLE_DELETE))
    logging.info("session: " + str(session))
    logging.info("")

    logging.info("Setting max_execution_date to XCom for Downstream Processes")
    context["ti"].xcom_push(key="max_date", value=max_date.isoformat())


print_configuration = PythonOperator(
    task_id="print_configuration",
    python_callable=print_configuration_function,
    provide_context=True,
    dag=dag)

def cleanup_function(**context):

    logging.info("Retrieving max_execution_date from XCom")
    max_date = context["ti"].xcom_pull(
        task_ids=print_configuration.task_id, key="max_date")
    max_date = dateutil.parser.parse(max_date)  # stored as iso8601 str in xcom

    airflow_db_model = context["params"].get("airflow_db_model")
    state = context["params"].get("state")
    age_check_column = context["params"].get("age_check_column")
    keep_last = context["params"].get("keep_last")
    keep_last_filters = context["params"].get("keep_last_filters")
    keep_last_group_by = context["params"].get("keep_last_group_by")

    logging.info("Configurations:")
    logging.info("max_date: " + str(max_date))
    logging.info("enable_delete: " + str(ENABLE_DELETE))
    logging.info("session: " + str(session))
    logging.info("airflow_db_model: " + str(airflow_db_model))
    logging.info("state: " + str(state))
    logging.info("age_check_column: " + str(age_check_column))
    logging.info("keep_last: " + str(keep_last))
    logging.info("keep_last_filters: " + str(keep_last_filters))
    logging.info("keep_last_group_by: " + str(keep_last_group_by))

    logging.info("")

    logging.info("Running Cleanup Process...")

    try:
        query = session.query(airflow_db_model).options(
            load_only(age_check_column))

        logging.info("INITIAL QUERY : " + str(query))

        if keep_last:

            subquery = session.query(func.max(DagRun.execution_date))
            # workaround for MySQL "table specified twice" issue
            # https://github.com/teamclairvoyant/airflow-maintenance-dags/issues/41
            if keep_last_filters is not None:
                for entry in keep_last_filters:
                    subquery = subquery.filter(entry)

                logging.info("SUB QUERY [keep_last_filters]: " + str(subquery))

            if keep_last_group_by is not None:
                subquery = subquery.group_by(keep_last_group_by)
                logging.info(
                    "SUB QUERY [keep_last_group_by]: " +
                    str(subquery))

            subquery = subquery.from_self()
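            # from_self() wraps the keep-last query in a subselect so the
            # delete below does not reference the target table twice (the
            # MySQL workaround referenced above).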

            query = query.filter(
                and_(age_check_column.notin_(subquery)),
                and_(age_check_column <= max_date))

        else:
            query = query.filter(age_check_column <= max_date,)

        if PRINT_DELETES:
            entries_to_delete = query.all()

            logging.info("Query: " + str(query))
            logging.info("Process will be Deleting the following " +
                         str(airflow_db_model.__name__) + "(s):")
            for entry in entries_to_delete:
                date = str(entry.__dict__[str(age_check_column).split(".")[1]])
                logging.info("\tEntry: " + str(entry) + ", Date: " + date)

            logging.info("Process will be Deleting "
                         + str(len(entries_to_delete)) + " "
                         + str(airflow_db_model.__name__) + "(s)")
        else:
            logging.warning(
                "You've opted to skip printing the db entries to be deleted. "
                "Set PRINT_DELETES to True to show entries!!!")

        if ENABLE_DELETE:
            logging.info("Performing Delete...")
            # using bulk delete
            query.delete(synchronize_session=False)
            session.commit()
            logging.info("Finished Performing Delete")
        else:
            logging.warning("You've opted to skip deleting the db entries. "
                            "Set ENABLE_DELETE to True to delete entries!!!")

        logging.info("Finished Running Cleanup Process")

    except ProgrammingError as e:
        logging.error(e)
        logging.error(
            str(airflow_db_model) + " is not present in the metadata. "
            "Skipping...")


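# Fan out: create one cleanup task per entry in DATABASE_OBJECTS, each running
# downstream of print_configuration.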
for db_object in DATABASE_OBJECTS:

    cleanup_op = PythonOperator(
        task_id="cleanup_" + str(db_object["airflow_db_model"].__name__),
        python_callable=cleanup_function,
        params=db_object,
        provide_context=True,
        dag=dag)

    print_configuration.set_downstream(cleanup_op)
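
The commit message above mentions an accompanying test (the second changed file, which is not shown on this page). As a rough sketch of what such an import test can look like — assuming the DAG file is saved as airflow_db_cleanup.py on the test's import path and that an initialized Airflow environment is available to the test run (both are assumptions, not details taken from this commit):

# test_airflow_db_cleanup.py -- illustrative sketch only; the module name is assumed.
import airflow_db_cleanup


def test_dag_is_registered():
    """Importing the module should build the cleanup DAG without raising."""
    assert airflow_db_cleanup.dag.dag_id == airflow_db_cleanup.DAG_ID
    # Expect one cleanup_* task per configured table, plus print_configuration.
    assert len(airflow_db_cleanup.dag.tasks) == len(
        airflow_db_cleanup.DATABASE_OBJECTS) + 1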
