Skip to content

feat: v4 deadlock detection #1970

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Apr 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tender-jobs-collect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"trigger.dev": patch
---

TriggerApiError 4xx errors will no longer cause tasks to be retried
4 changes: 2 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
{
"type": "node-terminal",
"request": "launch",
"name": "Debug fairDequeuingStrategy.test.ts",
"command": "pnpm run test -t FairDequeuingStrategy",
"name": "Debug triggerTask.test.ts",
"command": "pnpm run test --run ./test/engine/triggerTask.test.ts",
"envFile": "${workspaceFolder}/.env",
"cwd": "${workspaceFolder}/apps/webapp",
"sourceMaps": true
Expand Down
8 changes: 7 additions & 1 deletion apps/webapp/app/components/runs/v3/SpanEvents.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,13 @@ export function SpanEventError({
time={spanEvent.time}
titleClassName="text-rose-500"
/>
{enhancedException.message && <Callout variant="error">{enhancedException.message}</Callout>}
{enhancedException.message && (
<Callout variant="error">
<pre className="text-wrap font-sans text-sm font-normal text-rose-200">
{enhancedException.message}
</pre>
</Callout>
)}
{enhancedException.link &&
(enhancedException.link.magic === "CONTACT_FORM" ? (
<Feedback
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
import { TaskRun } from "@trigger.dev/database";
import { z } from "zod";
import { env } from "~/env.server";
import { EngineServiceValidationError } from "~/runEngine/concerns/errors";
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
Expand Down Expand Up @@ -116,7 +117,9 @@ const { action, loader } = createActionApiRoute(
);
} catch (error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: 422 });
return json({ error: error.message }, { status: error.status ?? 422 });
} else if (error instanceof EngineServiceValidationError) {
return json({ error: error.message }, { status: error.status ?? 422 });
} else if (error instanceof OutOfEntitlementError) {
return json({ error: error.message }, { status: 422 });
} else if (error instanceof Error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,13 @@ function RunError({ error }: { error: TaskRunError }) {
return (
<div className="flex flex-col gap-2 rounded-sm border border-rose-500/50 px-3 pb-3 pt-2">
<Header3 className="text-rose-500">{name}</Header3>
{enhancedError.message && <Callout variant="error">{enhancedError.message}</Callout>}
{enhancedError.message && (
<Callout variant="error">
<pre className="text-wrap font-sans text-sm font-normal text-rose-200">
{enhancedError.message}
</pre>
</Callout>
)}
{enhancedError.link &&
(enhancedError.link.magic === "CONTACT_FORM" ? (
<Feedback
Expand Down
6 changes: 6 additions & 0 deletions apps/webapp/app/runEngine/concerns/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export class EngineServiceValidationError extends Error {
constructor(message: string, public status?: number) {
super(message);
this.name = "EngineServiceValidationError";
}
}
96 changes: 96 additions & 0 deletions apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { RunId } from "@trigger.dev/core/v3/isomorphic";
import type { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
import { logger } from "~/services/logger.server";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import type { RunEngine } from "~/v3/runEngine.server";
import type { TraceEventConcern, TriggerTaskRequest } from "../types";

export type IdempotencyKeyConcernResult =
| { isCached: true; run: TaskRun }
| { isCached: false; idempotencyKey?: string; idempotencyKeyExpiresAt?: Date };

export class IdempotencyKeyConcern {
constructor(
private readonly prisma: PrismaClientOrTransaction,
private readonly engine: RunEngine,
private readonly traceEventConcern: TraceEventConcern
) {}

async handleTriggerRequest(request: TriggerTaskRequest): Promise<IdempotencyKeyConcernResult> {
const idempotencyKey = request.options?.idempotencyKey ?? request.body.options?.idempotencyKey;
const idempotencyKeyExpiresAt =
request.options?.idempotencyKeyExpiresAt ??
resolveIdempotencyKeyTTL(request.body.options?.idempotencyKeyTTL) ??
new Date(Date.now() + 24 * 60 * 60 * 1000 * 30); // 30 days

if (!idempotencyKey) {
return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
}

const existingRun = idempotencyKey
? await this.prisma.taskRun.findFirst({
where: {
runtimeEnvironmentId: request.environment.id,
idempotencyKey,
taskIdentifier: request.taskId,
},
include: {
associatedWaitpoint: true,
},
})
: undefined;

if (existingRun) {
if (existingRun.idempotencyKeyExpiresAt && existingRun.idempotencyKeyExpiresAt < new Date()) {
logger.debug("[TriggerTaskService][call] Idempotency key has expired", {
idempotencyKey: request.options?.idempotencyKey,
run: existingRun,
});

// Update the existing run to remove the idempotency key
await this.prisma.taskRun.updateMany({
where: { id: existingRun.id, idempotencyKey },
data: { idempotencyKey: null, idempotencyKeyExpiresAt: null },
});
} else {
const associatedWaitpoint = existingRun.associatedWaitpoint;
const parentRunId = request.body.options?.parentRunId;
const resumeParentOnCompletion = request.body.options?.resumeParentOnCompletion;
//We're using `andWait` so we need to block the parent run with a waitpoint
if (associatedWaitpoint && resumeParentOnCompletion && parentRunId) {
await this.traceEventConcern.traceIdempotentRun(
request,
{
existingRun,
idempotencyKey,
incomplete: associatedWaitpoint.status === "PENDING",
isError: associatedWaitpoint.outputIsError,
},
async (event) => {
//block run with waitpoint
await this.engine.blockRunWithWaitpoint({
runId: RunId.fromFriendlyId(parentRunId),
waitpoints: associatedWaitpoint.id,
spanIdToComplete: event.spanId,
batch: request.options?.batchId
? {
id: request.options.batchId,
index: request.options.batchIndex ?? 0,
}
: undefined,
projectId: request.environment.projectId,
organizationId: request.environment.organizationId,
tx: this.prisma,
releaseConcurrency: request.body.options?.releaseConcurrency,
});
}
);
}

return { isCached: true, run: existingRun };
}
}

return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
}
}
63 changes: 63 additions & 0 deletions apps/webapp/app/runEngine/concerns/payloads.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
import { PayloadProcessor, TriggerTaskRequest } from "../types";
import { env } from "~/env.server";
import { startActiveSpan } from "~/v3/tracer.server";
import { uploadPacketToObjectStore } from "~/v3/r2.server";
import { EngineServiceValidationError } from "./errors";

export class DefaultPayloadProcessor implements PayloadProcessor {
async process(request: TriggerTaskRequest): Promise<IOPacket> {
return await startActiveSpan("handlePayloadPacket()", async (span) => {
const payload = request.body.payload;
const payloadType = request.body.options?.payloadType ?? "application/json";

const packet = this.#createPayloadPacket(payload, payloadType);

if (!packet.data) {
return packet;
}

const { needsOffloading, size } = packetRequiresOffloading(
packet,
env.TASK_PAYLOAD_OFFLOAD_THRESHOLD
);

span.setAttribute("needsOffloading", needsOffloading);
span.setAttribute("size", size);

if (!needsOffloading) {
return packet;
}

const filename = `${request.friendlyId}/payload.json`;

const [uploadError] = await tryCatch(
uploadPacketToObjectStore(filename, packet.data, packet.dataType, request.environment)
);

if (uploadError) {
throw new EngineServiceValidationError(
"Failed to upload large payload to object store",
500
); // This is retryable
}

return {
data: filename,
dataType: "application/store",
};
});
}

#createPayloadPacket(payload: any, payloadType: string): IOPacket {
if (payloadType === "application/json") {
return { data: JSON.stringify(payload), dataType: "application/json" };
}

if (typeof payload === "string") {
return { data: payload, dataType: payloadType };
}

return { dataType: payloadType };
}
}
Loading