Skip to content

Commit e001b88

Browse files
Lee-W authored and ephraimbuddy committed
fix DagPriorityParsingRequest unique constraint error when dataset aliases are resolved into new datasets (#41398)
* fix(datasets/manager): fix DagPriorityParsingRequest unique constraint error when dataset aliases are resolved into new datasets

  this happens when dynamic task mapping is used

* refactor(dataset/manager): reword debug log

  Co-authored-by: Ephraim Anierobi <[email protected]>

* refactor(dataset/manager): remove unnecessary logging

  Co-authored-by: Ephraim Anierobi <[email protected]>

---------

Co-authored-by: Ephraim Anierobi <[email protected]>
(cherry picked from commit bf64cb6)
1 parent 8ea4eb1 commit e001b88

File tree

1 file changed

+31
-4
lines changed

1 file changed

+31
-4
lines changed

airflow/datasets/manager.py

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,8 @@ def register_dataset_change(
140140

141141
dags_to_reparse = dags_to_queue_from_dataset_alias - dags_to_queue_from_dataset
142142
if dags_to_reparse:
143-
session.add_all(
144-
DagPriorityParsingRequest(fileloc=fileloc)
145-
for fileloc in {dag.fileloc for dag in dags_to_reparse}
146-
)
143+
file_locs = {dag.fileloc for dag in dags_to_reparse}
144+
cls._send_dag_priority_parsing_request(file_locs, session)
147145
session.flush()
148146

149147
cls.notify_dataset_changed(dataset=dataset)
@@ -208,6 +206,35 @@ def _postgres_queue_dagruns(cls, dataset_id: int, dags_to_queue: set[DagModel],
208206
stmt = insert(DatasetDagRunQueue).values(dataset_id=dataset_id).on_conflict_do_nothing()
209207
session.execute(stmt, values)
210208

209+
@classmethod
def _send_dag_priority_parsing_request(cls, file_locs: Iterable[str], session: Session) -> None:
    """
    Forward ``file_locs`` to the helper matching the session's database dialect.

    When the dialect name of ``session.bind`` is ``"postgresql"`` the
    PostgreSQL-specific helper is used; every other dialect goes through the
    slow-path helper.

    :param file_locs: DAG file locations to request priority parsing for.
    :param session: session whose bind determines which helper runs.
    """
    is_postgres = session.bind.dialect.name == "postgresql"
    # Only the selected helper is looked up on ``cls``; the other branch is never touched.
    handler = (
        cls._postgres_send_dag_priority_parsing_request
        if is_postgres
        else cls._slow_path_send_dag_priority_parsing_request
    )
    return handler(file_locs, session)
214+
215+
@classmethod
def _slow_path_send_dag_priority_parsing_request(cls, file_locs: Iterable[str], session: Session) -> None:
    """
    Merge one DagPriorityParsingRequest per file location, each inside its own SAVEPOINT.

    A request that raises ``IntegrityError`` (for example because it is already
    present) is logged at debug level and skipped, so a single conflict does not
    abort the surrounding transaction.

    :param file_locs: DAG file locations to request priority parsing for.
    :param session: session used to open the nested transactions and merge the requests.
    """

    def _send_dag_priority_parsing_request_if_needed(fileloc: str) -> str | None:
        # Don't error whole transaction when a single DagPriorityParsingRequest item conflicts.
        # https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#using-savepoint
        req = DagPriorityParsingRequest(fileloc=fileloc)
        try:
            with session.begin_nested():
                session.merge(req)
        except exc.IntegrityError:
            cls.logger().debug("Skipping request %s, already present", req, exc_info=True)
            return None
        return req.fileloc

    # Iterate explicitly: a bare generator expression is lazy, so wrapping the
    # helper call in one (and discarding it) would never execute the helper.
    for fileloc in file_locs:
        _send_dag_priority_parsing_request_if_needed(fileloc)
230+
231+
@classmethod
def _postgres_send_dag_priority_parsing_request(cls, file_locs: Iterable[str], session: Session) -> None:
    """
    Insert one DagPriorityParsingRequest row per file location using ``ON CONFLICT DO NOTHING``.

    :param file_locs: DAG file locations to request priority parsing for.
    :param session: session on which the insert statement is executed.
    """
    from sqlalchemy.dialects.postgresql import insert

    # One parameter dict per row. A single dict keyed on the constant "fileloc"
    # would collapse to one entry and insert only the last file location.
    rows = [{"fileloc": fileloc} for fileloc in file_locs]
    if not rows:
        # Nothing to request; avoid executing an INSERT with no parameter sets.
        return

    stmt = insert(DagPriorityParsingRequest).on_conflict_do_nothing()
    session.execute(stmt, rows)
237+
211238

212239
def resolve_dataset_manager() -> DatasetManager:
213240
"""Retrieve the dataset manager."""

0 commit comments

Comments
 (0)