Skip to content

Commit 8cc207e

Browse files
committed
fix dependency resumes, properly this time
1 parent 99008b7 commit 8cc207e

File tree

4 files changed

+128
-121
lines changed

4 files changed

+128
-121
lines changed

apps/webapp/app/v3/services/resumeAttempt.server.ts

Lines changed: 119 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ import {
44
TaskRunExecution,
55
TaskRunExecutionResult,
66
} from "@trigger.dev/core/v3";
7-
import { $transaction } from "~/db.server";
7+
import { $transaction, PrismaClientOrTransaction } from "~/db.server";
88
import { logger } from "~/services/logger.server";
99
import { marqs } from "~/v3/marqs/index.server";
1010
import { socketIo } from "../handleSocketIo.server";
1111
import { sharedQueueTasks } from "../marqs/sharedQueueConsumer.server";
1212
import { BaseService } from "./baseService.server";
13+
import { TaskRunAttempt } from "@trigger.dev/database";
1314

1415
export class ResumeAttemptService extends BaseService {
1516
public async call(
@@ -24,7 +25,7 @@ export class ResumeAttemptService extends BaseService {
2425
},
2526
include: {
2627
taskRun: true,
27-
taskRunDependency: {
28+
dependencies: {
2829
include: {
2930
taskRun: {
3031
include: {
@@ -40,8 +41,12 @@ export class ResumeAttemptService extends BaseService {
4041
},
4142
},
4243
},
44+
orderBy: {
45+
createdAt: "desc",
46+
},
47+
take: 1,
4348
},
44-
batchTaskRunDependency: {
49+
batchDependencies: {
4550
include: {
4651
items: {
4752
include: {
@@ -61,6 +66,10 @@ export class ResumeAttemptService extends BaseService {
6166
},
6267
},
6368
},
69+
orderBy: {
70+
createdAt: "desc",
71+
},
72+
take: 1,
6473
},
6574
},
6675
});
@@ -78,6 +87,8 @@ export class ResumeAttemptService extends BaseService {
7887
return;
7988
}
8089

90+
let completedAttemptIds: string[] = [];
91+
8192
switch (params.type) {
8293
case "WAIT_FOR_DURATION": {
8394
logger.error(
@@ -93,148 +104,140 @@ export class ResumeAttemptService extends BaseService {
93104
});
94105
break;
95106
}
96-
case "WAIT_FOR_TASK":
97-
case "WAIT_FOR_BATCH": {
98-
let completedAttemptIds: string[] = [];
99-
100-
if (attempt.taskRunDependency) {
101-
const dependentAttempt = attempt.taskRunDependency.taskRun.attempts[0];
107+
case "WAIT_FOR_TASK": {
108+
if (attempt.dependencies.length) {
109+
// We only care about the latest dependency
110+
const dependentAttempt = attempt.dependencies[0].taskRun.attempts[0];
102111

103112
if (!dependentAttempt) {
104113
logger.error("No dependent attempt", { attemptId: attempt.id });
105114
return;
106115
}
107116

108117
completedAttemptIds = [dependentAttempt.id];
109-
110-
await tx.taskRunAttempt.update({
111-
where: {
112-
id: attempt.id,
113-
},
114-
data: {
115-
taskRunDependency: {
116-
disconnect: true,
117-
},
118-
},
119-
});
120-
} else if (attempt.batchTaskRunDependency) {
121-
const dependentBatchItems = attempt.batchTaskRunDependency.items;
118+
} else {
119+
logger.error("No task dependency", { attemptId: attempt.id });
120+
return;
121+
}
122+
break;
123+
}
124+
case "WAIT_FOR_BATCH": {
125+
if (attempt.batchDependencies.length) { // list relation: an empty array is truthy, so check length (as WAIT_FOR_TASK does) before reading [0]
126+
// We only care about the latest batch dependency
127+
const dependentBatchItems = attempt.batchDependencies[0].items;
122128

123129
if (!dependentBatchItems) {
124130
logger.error("No dependent batch items", { attemptId: attempt.id });
125131
return;
126132
}
127133

128134
completedAttemptIds = dependentBatchItems.map((item) => item.taskRun.attempts[0]?.id);
129-
130-
await tx.taskRunAttempt.update({
131-
where: {
132-
id: attempt.id,
133-
},
134-
data: {
135-
batchTaskRunDependency: {
136-
disconnect: true,
137-
},
138-
},
139-
});
140135
} else {
141-
logger.error("No dependencies", { attemptId: attempt.id });
142-
return;
143-
}
144-
145-
if (completedAttemptIds.length === 0) {
146-
logger.error("No completed attempt IDs", { attemptId: attempt.id });
136+
logger.error("No batch dependency", { attemptId: attempt.id });
147137
return;
148138
}
139+
break;
140+
}
141+
default: {
142+
break;
143+
}
144+
}
149145

150-
const completions: TaskRunExecutionResult[] = [];
151-
const executions: TaskRunExecution[] = [];
146+
await this.#handleDependencyResume(attempt, completedAttemptIds, tx);
147+
});
148+
}
152149

153-
for (const completedAttemptId of completedAttemptIds) {
154-
const completedAttempt = await tx.taskRunAttempt.findUnique({
155-
where: {
156-
id: completedAttemptId,
157-
taskRun: {
158-
lockedAt: {
159-
not: null,
160-
},
161-
lockedById: {
162-
not: null,
163-
},
164-
},
165-
},
166-
});
167-
168-
if (!completedAttempt) {
169-
logger.error("Completed attempt not found", {
170-
attemptId: attempt.id,
171-
completedAttemptId,
172-
});
173-
await marqs?.acknowledgeMessage(attempt.taskRunId);
174-
return;
175-
}
150+
async #handleDependencyResume(
151+
attempt: TaskRunAttempt,
152+
completedAttemptIds: string[],
153+
tx: PrismaClientOrTransaction
154+
) {
155+
if (completedAttemptIds.length === 0) {
156+
logger.error("No completed attempt IDs", { attemptId: attempt.id });
157+
return;
158+
}
159+
160+
const completions: TaskRunExecutionResult[] = [];
161+
const executions: TaskRunExecution[] = [];
162+
163+
for (const completedAttemptId of completedAttemptIds) {
164+
const completedAttempt = await tx.taskRunAttempt.findUnique({
165+
where: {
166+
id: completedAttemptId,
167+
taskRun: {
168+
lockedAt: {
169+
not: null,
170+
},
171+
lockedById: {
172+
not: null,
173+
},
174+
},
175+
},
176+
});
176177

177-
const completion = await sharedQueueTasks.getCompletionPayloadFromAttempt(
178-
completedAttempt.id
179-
);
178+
if (!completedAttempt) {
179+
logger.error("Completed attempt not found", {
180+
attemptId: attempt.id,
181+
completedAttemptId,
182+
});
183+
await marqs?.acknowledgeMessage(attempt.taskRunId);
184+
return;
185+
}
180186

181-
if (!completion) {
182-
logger.error("Failed to get completion payload", {
183-
attemptId: attempt.id,
184-
completedAttemptId,
185-
});
186-
await marqs?.acknowledgeMessage(attempt.taskRunId);
187-
return;
188-
}
187+
const completion = await sharedQueueTasks.getCompletionPayloadFromAttempt(
188+
completedAttempt.id
189+
);
189190

190-
completions.push(completion);
191+
if (!completion) {
192+
logger.error("Failed to get completion payload", {
193+
attemptId: attempt.id,
194+
completedAttemptId,
195+
});
196+
await marqs?.acknowledgeMessage(attempt.taskRunId);
197+
return;
198+
}
191199

192-
const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt(
193-
completedAttempt.id
194-
);
200+
completions.push(completion);
195201

196-
if (!executionPayload) {
197-
logger.error("Failed to get execution payload", {
198-
attemptId: attempt.id,
199-
completedAttemptId,
200-
});
201-
await marqs?.acknowledgeMessage(attempt.taskRunId);
202-
return;
203-
}
202+
const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt(
203+
completedAttempt.id
204+
);
204205

205-
executions.push(executionPayload.execution);
206-
}
206+
if (!executionPayload) {
207+
logger.error("Failed to get execution payload", {
208+
attemptId: attempt.id,
209+
completedAttemptId,
210+
});
211+
await marqs?.acknowledgeMessage(attempt.taskRunId);
212+
return;
213+
}
207214

208-
const updated = await tx.taskRunAttempt.update({
209-
where: {
210-
id: attempt.id,
211-
},
215+
executions.push(executionPayload.execution);
216+
}
217+
218+
const updated = await tx.taskRunAttempt.update({
219+
where: {
220+
id: attempt.id,
221+
},
222+
data: {
223+
status: "EXECUTING",
224+
taskRun: {
225+
update: {
212226
data: {
213-
status: "EXECUTING",
214-
taskRun: {
215-
update: {
216-
data: {
217-
status: attempt.number > 1 ? "RETRYING_AFTER_FAILURE" : "EXECUTING",
218-
},
219-
},
220-
},
227+
status: attempt.number > 1 ? "RETRYING_AFTER_FAILURE" : "EXECUTING",
221228
},
222-
});
229+
},
230+
},
231+
},
232+
});
223233

224-
socketIo.coordinatorNamespace.emit("RESUME_AFTER_DEPENDENCY", {
225-
version: "v1",
226-
runId: attempt.taskRunId,
227-
attemptId: attempt.id,
228-
attemptFriendlyId: attempt.friendlyId,
229-
completions,
230-
executions,
231-
});
232-
break;
233-
}
234-
default: {
235-
break;
236-
}
237-
}
234+
socketIo.coordinatorNamespace.emit("RESUME_AFTER_DEPENDENCY", {
235+
version: "v1",
236+
runId: attempt.taskRunId,
237+
attemptId: attempt.id,
238+
attemptFriendlyId: attempt.friendlyId,
239+
completions,
240+
executions,
238241
});
239242
}
240243
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- DropIndex
2+
DROP INDEX "TaskRunDependency_dependentAttemptId_key";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- DropIndex
2+
DROP INDEX "BatchTaskRun_dependentTaskAttemptId_key";

packages/database/prisma/schema.prisma

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1667,8 +1667,8 @@ model TaskRunDependency {
16671667
checkpointEventId String? @unique
16681668
16691669
/// An attempt that is dependent on this task run.
1670-
dependentAttempt TaskRunAttempt? @relation("dependentAttempt", fields: [dependentAttemptId], references: [id])
1671-
dependentAttemptId String? @unique
1670+
dependentAttempt TaskRunAttempt? @relation(fields: [dependentAttemptId], references: [id])
1671+
dependentAttemptId String?
16721672
16731673
/// A batch run that is dependent on this task run
16741674
dependentBatchRun BatchTaskRun? @relation("dependentBatchRun", fields: [dependentBatchRunId], references: [id])
@@ -1732,8 +1732,8 @@ model TaskRunAttempt {
17321732
output String?
17331733
outputType String @default("application/json")
17341734
1735-
taskRunDependency TaskRunDependency? @relation("dependentAttempt")
1736-
batchTaskRunDependency BatchTaskRun?
1735+
dependencies TaskRunDependency[]
1736+
batchDependencies BatchTaskRun[]
17371737
17381738
checkpoints Checkpoint[]
17391739
batchTaskRunItems BatchTaskRunItem[]
@@ -1904,7 +1904,7 @@ model BatchTaskRun {
19041904
runtimeEnvironmentId String
19051905
19061906
dependentTaskAttempt TaskRunAttempt? @relation(fields: [dependentTaskAttemptId], references: [id], onDelete: Cascade, onUpdate: Cascade)
1907-
dependentTaskAttemptId String? @unique
1907+
dependentTaskAttemptId String?
19081908
19091909
items BatchTaskRunItem[]
19101910
runDependencies TaskRunDependency[] @relation("dependentBatchRun")

0 commit comments

Comments
 (0)