Skip to content

Frozen run fixes #1286

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
cf035a5
When resuming a batch, only do marqs operations once
matt-aitken Sep 6, 2024
1648642
Made TaskRunDependency clearer in the Prisma schema
matt-aitken Sep 6, 2024
9812cbc
New ResumeDependentParentsService service, use it from checkpoints
matt-aitken Sep 6, 2024
8d99466
WIP on making resuming more robust
matt-aitken Sep 6, 2024
002fd05
Turn the declarative schedules off because they make debugging other …
matt-aitken Sep 6, 2024
0cd686d
Resuming batches when there’s an attempt is working
matt-aitken Sep 6, 2024
09616ee
If there’s no attempt then create one
matt-aitken Sep 6, 2024
2662842
Added a log if there are no span events to complete
matt-aitken Sep 6, 2024
8ccbafc
If Graphile addJob doesn’t return a row, log and return undefined. No…
matt-aitken Sep 6, 2024
82a8da0
Pass prisma into the ResumeDependentParentsService
matt-aitken Sep 6, 2024
51d4218
Removed the todos
matt-aitken Sep 6, 2024
a4abf15
Pass Prisma through to the checkpoint service
matt-aitken Sep 6, 2024
84e3311
Fix for not checking the batch item correctly
matt-aitken Sep 9, 2024
5f7e645
Fix for when a log flush times out and the process is checkpointed
matt-aitken Sep 9, 2024
5f58ba0
Fix for when a log flush times out and the process is checkpointed
matt-aitken Sep 9, 2024
f206832
Another test run that does batches with failed subtasks
matt-aitken Sep 9, 2024
dece0cb
Don’t call ResumeTaskRunDependenciesService anymore (we have a new se…
matt-aitken Sep 9, 2024
9c99b99
Only resume if the run is in a final state
matt-aitken Sep 9, 2024
929c53e
If an attempt doesn’t exist, fix for creating queue with sanitized name
matt-aitken Sep 9, 2024
6d24001
If DEV then don’t resume using marqs/batches. The CLI manages it
matt-aitken Sep 9, 2024
8b4025d
We don’t need to check the run status again, it’s in the main functio…
matt-aitken Sep 9, 2024
5fd1462
Added TaskRunAttempt taskRunId index
matt-aitken Sep 9, 2024
12671fc
Only allow calling ResumeDependentParentsService with a run ID
matt-aitken Sep 9, 2024
9303d6a
Merge branch 'main' into frozen-runs
matt-aitken Sep 9, 2024
bc958d1
Put the flushing back to what it was
matt-aitken Sep 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/tidy-pets-smell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

Fix for when a log flush times out and the process is checkpointed
8 changes: 3 additions & 5 deletions apps/webapp/app/platform/zodWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
identifier: K,
payload: z.infer<TMessageCatalog[K]>,
options?: ZodWorkerEnqueueOptions
): Promise<GraphileJob> {
): Promise<GraphileJob | undefined> {
const task = this.#tasks[identifier];

const optionsWithoutTx = removeUndefinedKeys(omit(options ?? {}, ["tx"]));
Expand Down Expand Up @@ -439,11 +439,9 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
identifier,
payload,
spec,
error: JSON.stringify(rows.error),
});

throw new Error(
`Failed to add job to queue, zod parsing error: ${JSON.stringify(rows.error)}`
);
return { job: undefined, durationInMs: Math.floor(durationInMs) };
}

const job = rows.data[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,12 @@ function RunTimelineLine({ title, state }: RunTimelineLineProps) {
function RunError({ error }: { error: TaskRunError }) {
switch (error.type) {
case "STRING_ERROR":
return (
<div className="flex flex-col gap-2 rounded-sm border border-rose-500/50 px-3 pb-3 pt-2">
<Header3 className="text-rose-500">Error</Header3>
<Callout variant="error">{error.raw}</Callout>
</div>
);
case "CUSTOM_ERROR": {
return (
<div className="flex flex-col gap-2 rounded-sm border border-rose-500/50 px-3 pb-3 pt-2">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ export class DeliverScheduledEventService {
id,
},
data: {
workerJobId: workerJob.id,
workerJobId: workerJob?.id,
nextEventTimestamp: runAt,
},
});
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/eventRepository.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ export class EventRepository {
const events = await this.queryIncompleteEvents({ spanId });

if (events.length === 0) {
logger.warn("No incomplete events found for spanId", { spanId, options });
return;
}

Expand Down
20 changes: 2 additions & 18 deletions apps/webapp/app/v3/failedTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,10 @@ export class FailedTaskRunService extends BaseService {
id: taskRun.id,
status: "SYSTEM_FAILURE",
completedAt: new Date(),
attemptStatus: "FAILED",
error: sanitizeError(completion.error),
});

// Get the final attempt and add the error to it, if it's not already set
const finalAttempt = await this._prisma.taskRunAttempt.findFirst({
where: {
taskRunId: taskRun.id,
},
orderBy: { id: "desc" },
});

if (finalAttempt && !finalAttempt.error) {
// Haven't set the status because the attempt might still be running
await this._prisma.taskRunAttempt.update({
where: { id: finalAttempt.id },
data: {
error: sanitizeError(completion.error),
},
});
}

// Now we need to "complete" the task run event/span
await eventRepository.completeEvent(taskRun.spanId, {
endTime: new Date(),
Expand Down
15 changes: 6 additions & 9 deletions apps/webapp/app/v3/services/cancelAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { eventRepository } from "../eventRepository.server";
import { isCancellableRunStatus } from "../taskStatus";
import { BaseService } from "./baseService.server";
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";

export class CancelAttemptService extends BaseService {
public async call(
Expand Down Expand Up @@ -61,13 +60,15 @@ export class CancelAttemptService extends BaseService {
},
});

const isCancellable = isCancellableRunStatus(taskRunAttempt.taskRun.status);

const finalizeService = new FinalizeTaskRunService(tx);
await finalizeService.call({
id: taskRunId,
status: isCancellableRunStatus(taskRunAttempt.taskRun.status) ? "INTERRUPTED" : undefined,
completedAt: isCancellableRunStatus(taskRunAttempt.taskRun.status)
? cancelledAt
: undefined,
status: isCancellable ? "INTERRUPTED" : undefined,
completedAt: isCancellable ? cancelledAt : undefined,
attemptStatus: isCancellable ? "CANCELED" : undefined,
error: isCancellable ? { type: "STRING_ERROR", raw: reason } : undefined,
});
});

Expand All @@ -84,10 +85,6 @@ export class CancelAttemptService extends BaseService {
return eventRepository.cancelEvent(event, cancelledAt, reason);
})
);

if (environment?.type !== "DEVELOPMENT") {
await ResumeTaskRunDependenciesService.enqueue(taskRunAttempt.id, this._prisma);
}
});
}
}
Expand Down
5 changes: 5 additions & 0 deletions apps/webapp/app/v3/services/cancelTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ export class CancelTaskRunService extends BaseService {
runtimeEnvironment: true,
lockedToVersion: true,
},
attemptStatus: "CANCELED",
error: {
type: "STRING_ERROR",
raw: opts.reason,
},
});

const inProgressEvents = await eventRepository.queryIncompleteEvents({
Expand Down
15 changes: 6 additions & 9 deletions apps/webapp/app/v3/services/completeAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import { createExceptionPropertiesFromError, eventRepository } from "../eventRep
import { marqs } from "~/v3/marqs/index.server";
import { BaseService } from "./baseService.server";
import { CancelAttemptService } from "./cancelAttempt.server";
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";
import { MAX_TASK_RUN_ATTEMPTS } from "~/consts";
import { CreateCheckpointService } from "./createCheckpoint.server";
import { TaskRun } from "@trigger.dev/database";
Expand Down Expand Up @@ -76,6 +75,12 @@ export class CompleteAttemptService extends BaseService {
id: run.id,
status: "SYSTEM_FAILURE",
completedAt: new Date(),
attemptStatus: "FAILED",
error: {
type: "INTERNAL_ERROR",
code: "TASK_EXECUTION_FAILED",
message: "Tried to complete attempt but it doesn't exist",
},
});

// No attempt, so there's no message to ACK
Expand Down Expand Up @@ -149,10 +154,6 @@ export class CompleteAttemptService extends BaseService {
},
});

if (!env || env.type !== "DEVELOPMENT") {
await ResumeTaskRunDependenciesService.enqueue(taskRunAttempt.id, this._prisma);
}

return "COMPLETED";
}

Expand Down Expand Up @@ -355,10 +356,6 @@ export class CompleteAttemptService extends BaseService {
});
}

if (!env || env.type !== "DEVELOPMENT") {
await ResumeTaskRunDependenciesService.enqueue(taskRunAttempt.id, this._prisma);
}

return "COMPLETED";
}
}
Expand Down
14 changes: 7 additions & 7 deletions apps/webapp/app/v3/services/crashTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { marqs } from "~/v3/marqs/index.server";
import { BaseService } from "./baseService.server";
import { logger } from "~/services/logger.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";
import { CRASHABLE_ATTEMPT_STATUSES, isCrashableRunStatus } from "../taskStatus";
import { sanitizeError } from "@trigger.dev/core/v3";
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
Expand Down Expand Up @@ -69,6 +68,13 @@ export class CrashTaskRunService extends BaseService {
},
},
},
attemptStatus: "FAILED",
error: {
type: "INTERNAL_ERROR",
code: "TASK_RUN_CRASHED",
message: opts.reason,
stackTrace: opts.logs,
},
});

const inProgressEvents = await eventRepository.queryIncompleteEvents(
Expand Down Expand Up @@ -146,12 +152,6 @@ export class CrashTaskRunService extends BaseService {
}),
},
});

if (environment.type === "DEVELOPMENT") {
return;
}

await ResumeTaskRunDependenciesService.enqueue(attempt.id, this._prisma);
});
}
}
133 changes: 8 additions & 125 deletions apps/webapp/app/v3/services/createCheckpoint.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,11 @@ import type { Checkpoint, CheckpointRestoreEvent } from "@trigger.dev/database";
import { logger } from "~/services/logger.server";
import { marqs } from "~/v3/marqs/index.server";
import { generateFriendlyId } from "../friendlyIdentifiers";
import {
isFinalAttemptStatus,
isFinalRunStatus,
isFreezableAttemptStatus,
isFreezableRunStatus,
} from "../taskStatus";
import { isFreezableAttemptStatus, isFreezableRunStatus } from "../taskStatus";
import { BaseService } from "./baseService.server";
import { CreateCheckpointRestoreEventService } from "./createCheckpointRestoreEvent.server";
import { ResumeBatchRunService } from "./resumeBatchRun.server";
import { ResumeTaskDependencyService } from "./resumeTaskDependency.server";
import { ResumeDependentParentsService } from "./resumeDependentParents.server";

export class CreateCheckpointService extends BaseService {
public async call(
Expand Down Expand Up @@ -177,127 +172,15 @@ export class CreateCheckpointService extends BaseService {
});
await marqs?.cancelHeartbeat(attempt.taskRunId);

const dependency = await this._prisma.taskRunDependency.findFirst({
select: {
id: true,
taskRunId: true,
},
where: {
taskRun: {
friendlyId: reason.friendlyId,
},
},
});

logger.log("CreateCheckpointService: Created checkpoint WAIT_FOR_TASK", {
checkpointId: checkpoint.id,
runFriendlyId: reason.friendlyId,
dependencyId: dependency?.id,
});

if (!dependency) {
logger.error("CreateCheckpointService: Dependency not found", {
friendlyId: reason.friendlyId,
});

return {
success: true,
checkpoint,
event: checkpointEvent,
keepRunAlive: false,
};
}

const childRun = await this._prisma.taskRun.findFirst({
select: {
id: true,
status: true,
},
where: {
id: dependency.taskRunId,
},
});

if (!childRun) {
logger.error("CreateCheckpointService: Dependency child run not found", {
taskRunId: dependency.taskRunId,
runFriendlyId: reason.friendlyId,
dependencyId: dependency.id,
});

return {
success: true,
checkpoint,
event: checkpointEvent,
keepRunAlive: false,
};
}
const resumeService = new ResumeDependentParentsService(this._prisma);
const result = await resumeService.call({ id: attempt.taskRunId });

const isFinished = isFinalRunStatus(childRun.status);
if (!isFinished) {
logger.debug("CreateCheckpointService: Dependency child run not finished", {
taskRunId: dependency.taskRunId,
runFriendlyId: reason.friendlyId,
dependencyId: dependency.id,
childRunStatus: childRun.status,
childRunId: childRun.id,
});

return {
success: true,
checkpoint,
event: checkpointEvent,
keepRunAlive: false,
};
}

const lastAttempt = await this._prisma.taskRunAttempt.findFirst({
select: {
id: true,
status: true,
},
where: {
taskRunId: dependency.taskRunId,
},
orderBy: {
createdAt: "desc",
},
});

if (!lastAttempt) {
logger.debug("CreateCheckpointService: Dependency child attempt not found", {
taskRunId: dependency.taskRunId,
runFriendlyId: reason.friendlyId,
dependencyId: dependency?.id,
});
return {
success: true,
checkpoint,
event: checkpointEvent,
keepRunAlive: false,
};
}

if (!isFinalAttemptStatus(lastAttempt.status)) {
logger.debug("CreateCheckpointService: Dependency child attempt not final", {
taskRunId: dependency.taskRunId,
runFriendlyId: reason.friendlyId,
dependencyId: dependency.id,
lastAttemptId: lastAttempt.id,
lastAttemptStatus: lastAttempt.status,
});

return {
success: true,
checkpoint,
event: checkpointEvent,
keepRunAlive: false,
};
if (result.success) {
logger.log("CreateCheckpointService: Resumed dependent parents", result);
} else {
logger.error("CreateCheckpointService: Failed to resume dependent parents", result);
}

//resume the dependent task
await ResumeTaskDependencyService.enqueue(dependency.id, lastAttempt.id, this._prisma);

return {
success: true,
checkpoint,
Expand Down
5 changes: 5 additions & 0 deletions apps/webapp/app/v3/services/expireEnqueuedRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ export class ExpireEnqueuedRunService extends BaseService {
status: "EXPIRED",
expiredAt: new Date(),
completedAt: new Date(),
attemptStatus: "FAILED",
error: {
type: "STRING_ERROR",
raw: `Run expired because the TTL (${run.ttl}) was reached`,
},
});

await eventRepository.completeEvent(run.spanId, {
Expand Down
Loading
Loading