Skip to content

Commit 392453e

Browse files
authored
Frozen run fixes (#1286)
* When resuming a batch, only do marqs operations once * Made TaskRunDependency clearer in the Prisma schema * New ResumeDependentParentsService service, use it from checkpoints * WIP on making resuming more robust * Turn the declarative schedules off because they make debugging other runs painful * Resuming batches when there’s an attempt is working * If there’s no attempt then create one * Added a log if there are no span events to complete * If Graphile addJob doesn’t return a row, log and return undefined. No throw * Pass prisma into the ResumeDependentParentsService * Removed the todos * Pass Prisma through to the checkpoint service * Fix for not checking the batch item correctly * Fix for when a log flush times out and the process is checkpointed * Fix for when a log flush times out and the process is checkpointed * Another test run that does batches with failed subtasks * Don’t call ResumeTaskRunDependenciesService anymore (we have a new service) * Only resume if the run is in a final state * If an attempt doesn’t exist, fix for creating queue with sanitized name * If DEV then don’t resume using marqs/batches. The CLI manages it * We don’t need to check the run status again, it’s in the main function now * Added TaskRunAttempt taskRunId index * Only allow calling ResumeDependentParentsService with a run ID * Put the flushing back to what it was
1 parent e5f0aaf commit 392453e

File tree

22 files changed

+624
-231
lines changed

22 files changed

+624
-231
lines changed

.changeset/tidy-pets-smell.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"trigger.dev": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Fix for when a log flush times out and the process is checkpointed

apps/webapp/app/platform/zodWorker.server.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
318318
identifier: K,
319319
payload: z.infer<TMessageCatalog[K]>,
320320
options?: ZodWorkerEnqueueOptions
321-
): Promise<GraphileJob> {
321+
): Promise<GraphileJob | undefined> {
322322
const task = this.#tasks[identifier];
323323

324324
const optionsWithoutTx = removeUndefinedKeys(omit(options ?? {}, ["tx"]));
@@ -439,11 +439,9 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
439439
identifier,
440440
payload,
441441
spec,
442+
error: JSON.stringify(rows.error),
442443
});
443-
444-
throw new Error(
445-
`Failed to add job to queue, zod parsing error: ${JSON.stringify(rows.error)}`
446-
);
444+
return { job: undefined, durationInMs: Math.floor(durationInMs) };
447445
}
448446

449447
const job = rows.data[0];

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,12 @@ function RunTimelineLine({ title, state }: RunTimelineLineProps) {
796796
function RunError({ error }: { error: TaskRunError }) {
797797
switch (error.type) {
798798
case "STRING_ERROR":
799+
return (
800+
<div className="flex flex-col gap-2 rounded-sm border border-rose-500/50 px-3 pb-3 pt-2">
801+
<Header3 className="text-rose-500">Error</Header3>
802+
<Callout variant="error">{error.raw}</Callout>
803+
</div>
804+
);
799805
case "CUSTOM_ERROR": {
800806
return (
801807
<div className="flex flex-col gap-2 rounded-sm border border-rose-500/50 px-3 pb-3 pt-2">

apps/webapp/app/services/schedules/deliverScheduledEvent.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ export class DeliverScheduledEventService {
134134
id,
135135
},
136136
data: {
137-
workerJobId: workerJob.id,
137+
workerJobId: workerJob?.id,
138138
nextEventTimestamp: runAt,
139139
},
140140
});

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ export class EventRepository {
226226
const events = await this.queryIncompleteEvents({ spanId });
227227

228228
if (events.length === 0) {
229+
logger.warn("No incomplete events found for spanId", { spanId, options });
229230
return;
230231
}
231232

apps/webapp/app/v3/failedTaskRun.server.ts

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,26 +42,10 @@ export class FailedTaskRunService extends BaseService {
4242
id: taskRun.id,
4343
status: "SYSTEM_FAILURE",
4444
completedAt: new Date(),
45+
attemptStatus: "FAILED",
46+
error: sanitizeError(completion.error),
4547
});
4648

47-
// Get the final attempt and add the error to it, if it's not already set
48-
const finalAttempt = await this._prisma.taskRunAttempt.findFirst({
49-
where: {
50-
taskRunId: taskRun.id,
51-
},
52-
orderBy: { id: "desc" },
53-
});
54-
55-
if (finalAttempt && !finalAttempt.error) {
56-
// Haven't set the status because the attempt might still be running
57-
await this._prisma.taskRunAttempt.update({
58-
where: { id: finalAttempt.id },
59-
data: {
60-
error: sanitizeError(completion.error),
61-
},
62-
});
63-
}
64-
6549
// Now we need to "complete" the task run event/span
6650
await eventRepository.completeEvent(taskRun.spanId, {
6751
endTime: new Date(),

apps/webapp/app/v3/services/cancelAttempt.server.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import { eventRepository } from "../eventRepository.server";
55
import { isCancellableRunStatus } from "../taskStatus";
66
import { BaseService } from "./baseService.server";
77
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
8-
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";
98

109
export class CancelAttemptService extends BaseService {
1110
public async call(
@@ -61,13 +60,15 @@ export class CancelAttemptService extends BaseService {
6160
},
6261
});
6362

63+
const isCancellable = isCancellableRunStatus(taskRunAttempt.taskRun.status);
64+
6465
const finalizeService = new FinalizeTaskRunService(tx);
6566
await finalizeService.call({
6667
id: taskRunId,
67-
status: isCancellableRunStatus(taskRunAttempt.taskRun.status) ? "INTERRUPTED" : undefined,
68-
completedAt: isCancellableRunStatus(taskRunAttempt.taskRun.status)
69-
? cancelledAt
70-
: undefined,
68+
status: isCancellable ? "INTERRUPTED" : undefined,
69+
completedAt: isCancellable ? cancelledAt : undefined,
70+
attemptStatus: isCancellable ? "CANCELED" : undefined,
71+
error: isCancellable ? { type: "STRING_ERROR", raw: reason } : undefined,
7172
});
7273
});
7374

@@ -84,10 +85,6 @@ export class CancelAttemptService extends BaseService {
8485
return eventRepository.cancelEvent(event, cancelledAt, reason);
8586
})
8687
);
87-
88-
if (environment?.type !== "DEVELOPMENT") {
89-
await ResumeTaskRunDependenciesService.enqueue(taskRunAttempt.id, this._prisma);
90-
}
9188
});
9289
}
9390
}

apps/webapp/app/v3/services/cancelTaskRun.server.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@ export class CancelTaskRunService extends BaseService {
7676
runtimeEnvironment: true,
7777
lockedToVersion: true,
7878
},
79+
attemptStatus: "CANCELED",
80+
error: {
81+
type: "STRING_ERROR",
82+
raw: opts.reason,
83+
},
7984
});
8085

8186
const inProgressEvents = await eventRepository.queryIncompleteEvents({

apps/webapp/app/v3/services/completeAttempt.server.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import { createExceptionPropertiesFromError, eventRepository } from "../eventRep
1717
import { marqs } from "~/v3/marqs/index.server";
1818
import { BaseService } from "./baseService.server";
1919
import { CancelAttemptService } from "./cancelAttempt.server";
20-
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";
2120
import { MAX_TASK_RUN_ATTEMPTS } from "~/consts";
2221
import { CreateCheckpointService } from "./createCheckpoint.server";
2322
import { TaskRun } from "@trigger.dev/database";
@@ -76,6 +75,12 @@ export class CompleteAttemptService extends BaseService {
7675
id: run.id,
7776
status: "SYSTEM_FAILURE",
7877
completedAt: new Date(),
78+
attemptStatus: "FAILED",
79+
error: {
80+
type: "INTERNAL_ERROR",
81+
code: "TASK_EXECUTION_FAILED",
82+
message: "Tried to complete attempt but it doesn't exist",
83+
},
7984
});
8085

8186
// No attempt, so there's no message to ACK
@@ -149,10 +154,6 @@ export class CompleteAttemptService extends BaseService {
149154
},
150155
});
151156

152-
if (!env || env.type !== "DEVELOPMENT") {
153-
await ResumeTaskRunDependenciesService.enqueue(taskRunAttempt.id, this._prisma);
154-
}
155-
156157
return "COMPLETED";
157158
}
158159

@@ -355,10 +356,6 @@ export class CompleteAttemptService extends BaseService {
355356
});
356357
}
357358

358-
if (!env || env.type !== "DEVELOPMENT") {
359-
await ResumeTaskRunDependenciesService.enqueue(taskRunAttempt.id, this._prisma);
360-
}
361-
362359
return "COMPLETED";
363360
}
364361
}

apps/webapp/app/v3/services/crashTaskRun.server.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import { marqs } from "~/v3/marqs/index.server";
44
import { BaseService } from "./baseService.server";
55
import { logger } from "~/services/logger.server";
66
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
7-
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";
87
import { CRASHABLE_ATTEMPT_STATUSES, isCrashableRunStatus } from "../taskStatus";
98
import { sanitizeError } from "@trigger.dev/core/v3";
109
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
@@ -69,6 +68,13 @@ export class CrashTaskRunService extends BaseService {
6968
},
7069
},
7170
},
71+
attemptStatus: "FAILED",
72+
error: {
73+
type: "INTERNAL_ERROR",
74+
code: "TASK_RUN_CRASHED",
75+
message: opts.reason,
76+
stackTrace: opts.logs,
77+
},
7278
});
7379

7480
const inProgressEvents = await eventRepository.queryIncompleteEvents(
@@ -146,12 +152,6 @@ export class CrashTaskRunService extends BaseService {
146152
}),
147153
},
148154
});
149-
150-
if (environment.type === "DEVELOPMENT") {
151-
return;
152-
}
153-
154-
await ResumeTaskRunDependenciesService.enqueue(attempt.id, this._prisma);
155155
});
156156
}
157157
}

apps/webapp/app/v3/services/createCheckpoint.server.ts

Lines changed: 8 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,11 @@ import type { Checkpoint, CheckpointRestoreEvent } from "@trigger.dev/database";
44
import { logger } from "~/services/logger.server";
55
import { marqs } from "~/v3/marqs/index.server";
66
import { generateFriendlyId } from "../friendlyIdentifiers";
7-
import {
8-
isFinalAttemptStatus,
9-
isFinalRunStatus,
10-
isFreezableAttemptStatus,
11-
isFreezableRunStatus,
12-
} from "../taskStatus";
7+
import { isFreezableAttemptStatus, isFreezableRunStatus } from "../taskStatus";
138
import { BaseService } from "./baseService.server";
149
import { CreateCheckpointRestoreEventService } from "./createCheckpointRestoreEvent.server";
1510
import { ResumeBatchRunService } from "./resumeBatchRun.server";
16-
import { ResumeTaskDependencyService } from "./resumeTaskDependency.server";
11+
import { ResumeDependentParentsService } from "./resumeDependentParents.server";
1712

1813
export class CreateCheckpointService extends BaseService {
1914
public async call(
@@ -177,127 +172,15 @@ export class CreateCheckpointService extends BaseService {
177172
});
178173
await marqs?.cancelHeartbeat(attempt.taskRunId);
179174

180-
const dependency = await this._prisma.taskRunDependency.findFirst({
181-
select: {
182-
id: true,
183-
taskRunId: true,
184-
},
185-
where: {
186-
taskRun: {
187-
friendlyId: reason.friendlyId,
188-
},
189-
},
190-
});
191-
192-
logger.log("CreateCheckpointService: Created checkpoint WAIT_FOR_TASK", {
193-
checkpointId: checkpoint.id,
194-
runFriendlyId: reason.friendlyId,
195-
dependencyId: dependency?.id,
196-
});
197-
198-
if (!dependency) {
199-
logger.error("CreateCheckpointService: Dependency not found", {
200-
friendlyId: reason.friendlyId,
201-
});
202-
203-
return {
204-
success: true,
205-
checkpoint,
206-
event: checkpointEvent,
207-
keepRunAlive: false,
208-
};
209-
}
210-
211-
const childRun = await this._prisma.taskRun.findFirst({
212-
select: {
213-
id: true,
214-
status: true,
215-
},
216-
where: {
217-
id: dependency.taskRunId,
218-
},
219-
});
220-
221-
if (!childRun) {
222-
logger.error("CreateCheckpointService: Dependency child run not found", {
223-
taskRunId: dependency.taskRunId,
224-
runFriendlyId: reason.friendlyId,
225-
dependencyId: dependency.id,
226-
});
227-
228-
return {
229-
success: true,
230-
checkpoint,
231-
event: checkpointEvent,
232-
keepRunAlive: false,
233-
};
234-
}
175+
const resumeService = new ResumeDependentParentsService(this._prisma);
176+
const result = await resumeService.call({ id: attempt.taskRunId });
235177

236-
const isFinished = isFinalRunStatus(childRun.status);
237-
if (!isFinished) {
238-
logger.debug("CreateCheckpointService: Dependency child run not finished", {
239-
taskRunId: dependency.taskRunId,
240-
runFriendlyId: reason.friendlyId,
241-
dependencyId: dependency.id,
242-
childRunStatus: childRun.status,
243-
childRunId: childRun.id,
244-
});
245-
246-
return {
247-
success: true,
248-
checkpoint,
249-
event: checkpointEvent,
250-
keepRunAlive: false,
251-
};
252-
}
253-
254-
const lastAttempt = await this._prisma.taskRunAttempt.findFirst({
255-
select: {
256-
id: true,
257-
status: true,
258-
},
259-
where: {
260-
taskRunId: dependency.taskRunId,
261-
},
262-
orderBy: {
263-
createdAt: "desc",
264-
},
265-
});
266-
267-
if (!lastAttempt) {
268-
logger.debug("CreateCheckpointService: Dependency child attempt not found", {
269-
taskRunId: dependency.taskRunId,
270-
runFriendlyId: reason.friendlyId,
271-
dependencyId: dependency?.id,
272-
});
273-
return {
274-
success: true,
275-
checkpoint,
276-
event: checkpointEvent,
277-
keepRunAlive: false,
278-
};
279-
}
280-
281-
if (!isFinalAttemptStatus(lastAttempt.status)) {
282-
logger.debug("CreateCheckpointService: Dependency child attempt not final", {
283-
taskRunId: dependency.taskRunId,
284-
runFriendlyId: reason.friendlyId,
285-
dependencyId: dependency.id,
286-
lastAttemptId: lastAttempt.id,
287-
lastAttemptStatus: lastAttempt.status,
288-
});
289-
290-
return {
291-
success: true,
292-
checkpoint,
293-
event: checkpointEvent,
294-
keepRunAlive: false,
295-
};
178+
if (result.success) {
179+
logger.log("CreateCheckpointService: Resumed dependent parents", result);
180+
} else {
181+
logger.error("CreateCheckpointService: Failed to resume dependent parents", result);
296182
}
297183

298-
//resume the dependent task
299-
await ResumeTaskDependencyService.enqueue(dependency.id, lastAttempt.id, this._prisma);
300-
301184
return {
302185
success: true,
303186
checkpoint,

apps/webapp/app/v3/services/expireEnqueuedRun.server.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ export class ExpireEnqueuedRunService extends BaseService {
4545
status: "EXPIRED",
4646
expiredAt: new Date(),
4747
completedAt: new Date(),
48+
attemptStatus: "FAILED",
49+
error: {
50+
type: "STRING_ERROR",
51+
raw: `Run expired because the TTL (${run.ttl}) was reached`,
52+
},
4853
});
4954

5055
await eventRepository.completeEvent(run.spanId, {

0 commit comments

Comments
 (0)