Skip to content

Commit b35a26a

Browse files
committed
extract validation, idempotency keys, payloads to concerns
1 parent 78d431a commit b35a26a

File tree

5 files changed

+399
-212
lines changed

5 files changed

+399
-212
lines changed
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
2+
import { TriggerTaskRequest } from "../types";
3+
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
4+
import { logger } from "~/services/logger.server";
5+
import { eventRepository } from "~/v3/eventRepository.server";
6+
import { SemanticInternalAttributes } from "@trigger.dev/core/v3/semanticInternalAttributes";
7+
import { BatchId, RunId } from "@trigger.dev/core/v3/isomorphic";
8+
import { RunEngine } from "~/v3/runEngine.server";
9+
10+
export type IdempotencyKeyConcernResult =
11+
| { isCached: true; run: TaskRun }
12+
| { isCached: false; idempotencyKey?: string; idempotencyKeyExpiresAt?: Date };
13+
14+
export class IdempotencyKeyConcern {
15+
constructor(
16+
private readonly prisma: PrismaClientOrTransaction,
17+
private readonly engine: RunEngine
18+
) {}
19+
20+
async handleTriggerRequest(request: TriggerTaskRequest): Promise<IdempotencyKeyConcernResult> {
21+
const idempotencyKey = request.options?.idempotencyKey ?? request.body.options?.idempotencyKey;
22+
const idempotencyKeyExpiresAt =
23+
request.options?.idempotencyKeyExpiresAt ??
24+
resolveIdempotencyKeyTTL(request.body.options?.idempotencyKeyTTL) ??
25+
new Date(Date.now() + 24 * 60 * 60 * 1000 * 30); // 30 days
26+
27+
if (!idempotencyKey) {
28+
return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
29+
}
30+
31+
const existingRun = idempotencyKey
32+
? await this.prisma.taskRun.findFirst({
33+
where: {
34+
runtimeEnvironmentId: request.environment.id,
35+
idempotencyKey,
36+
taskIdentifier: request.taskId,
37+
},
38+
include: {
39+
associatedWaitpoint: true,
40+
},
41+
})
42+
: undefined;
43+
44+
if (existingRun) {
45+
if (existingRun.idempotencyKeyExpiresAt && existingRun.idempotencyKeyExpiresAt < new Date()) {
46+
logger.debug("[TriggerTaskService][call] Idempotency key has expired", {
47+
idempotencyKey: request.options?.idempotencyKey,
48+
run: existingRun,
49+
});
50+
51+
// Update the existing run to remove the idempotency key
52+
await this.prisma.taskRun.update({
53+
where: { id: existingRun.id },
54+
data: { idempotencyKey: null },
55+
});
56+
} else {
57+
//We're using `andWait` so we need to block the parent run with a waitpoint
58+
if (
59+
existingRun.associatedWaitpoint &&
60+
request.body.options?.resumeParentOnCompletion &&
61+
request.body.options?.parentRunId
62+
) {
63+
await eventRepository.traceEvent(
64+
`${request.taskId} (cached)`,
65+
{
66+
context: request.options?.traceContext,
67+
spanParentAsLink: request.options?.spanParentAsLink,
68+
parentAsLinkType: request.options?.parentAsLinkType,
69+
kind: "SERVER",
70+
environment: request.environment,
71+
taskSlug: request.taskId,
72+
attributes: {
73+
properties: {
74+
[SemanticInternalAttributes.SHOW_ACTIONS]: true,
75+
[SemanticInternalAttributes.ORIGINAL_RUN_ID]: existingRun.friendlyId,
76+
},
77+
style: {
78+
icon: "task-cached",
79+
},
80+
runIsTest: request.body.options?.test ?? false,
81+
batchId: request.options?.batchId
82+
? BatchId.toFriendlyId(request.options.batchId)
83+
: undefined,
84+
idempotencyKey,
85+
runId: existingRun.friendlyId,
86+
},
87+
incomplete: existingRun.associatedWaitpoint.status === "PENDING",
88+
isError: existingRun.associatedWaitpoint.outputIsError,
89+
immediate: true,
90+
},
91+
async (event) => {
92+
//log a message
93+
await eventRepository.recordEvent(
94+
`There's an existing run for idempotencyKey: ${idempotencyKey}`,
95+
{
96+
taskSlug: request.taskId,
97+
environment: request.environment,
98+
attributes: {
99+
runId: existingRun.friendlyId,
100+
},
101+
context: request.options?.traceContext,
102+
parentId: event.spanId,
103+
}
104+
);
105+
//block run with waitpoint
106+
await this.engine.blockRunWithWaitpoint({
107+
runId: RunId.fromFriendlyId(request.body.options!.parentRunId!),
108+
waitpoints: existingRun.associatedWaitpoint!.id,
109+
spanIdToComplete: event.spanId,
110+
batch: request.options?.batchId
111+
? {
112+
id: request.options.batchId,
113+
index: request.options.batchIndex ?? 0,
114+
}
115+
: undefined,
116+
projectId: request.environment.projectId,
117+
organizationId: request.environment.organizationId,
118+
tx: this.prisma,
119+
releaseConcurrency: request.body.options?.releaseConcurrency,
120+
});
121+
}
122+
);
123+
}
124+
125+
return { isCached: true, run: existingRun };
126+
}
127+
}
128+
129+
return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
130+
}
131+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
2+
import { PayloadProcessor, TriggerTaskRequest } from "../types";
3+
import { env } from "~/env.server";
4+
import { startActiveSpan } from "~/v3/tracer.server";
5+
import { uploadPacketToObjectStore } from "~/v3/r2.server";
6+
import { ServiceValidationError } from "~/v3/services/baseService.server";
7+
8+
export class DefaultPayloadProcessor implements PayloadProcessor {
9+
async process(request: TriggerTaskRequest): Promise<IOPacket> {
10+
return await startActiveSpan("handlePayloadPacket()", async (span) => {
11+
const payload = request.body.payload;
12+
const payloadType = request.body.options?.payloadType ?? "application/json";
13+
14+
const packet = this.#createPayloadPacket(payload, payloadType);
15+
16+
if (!packet.data) {
17+
return packet;
18+
}
19+
20+
const { needsOffloading, size } = packetRequiresOffloading(
21+
packet,
22+
env.TASK_PAYLOAD_OFFLOAD_THRESHOLD
23+
);
24+
25+
span.setAttribute("needsOffloading", needsOffloading);
26+
span.setAttribute("size", size);
27+
28+
if (!needsOffloading) {
29+
return packet;
30+
}
31+
32+
const filename = `${request.friendlyId}/payload.json`;
33+
34+
const [uploadError] = await tryCatch(
35+
uploadPacketToObjectStore(filename, packet.data, packet.dataType, request.environment)
36+
);
37+
38+
if (uploadError) {
39+
throw new ServiceValidationError("Failed to upload large payload to object store", 500); // This is retryable
40+
}
41+
42+
return {
43+
data: filename,
44+
dataType: "application/store",
45+
};
46+
});
47+
}
48+
49+
#createPayloadPacket(payload: any, payloadType: string): IOPacket {
50+
if (payloadType === "application/json") {
51+
return { data: JSON.stringify(payload), dataType: "application/json" };
52+
}
53+
54+
if (typeof payload === "string") {
55+
return { data: payload, dataType: payloadType };
56+
}
57+
58+
return { dataType: payloadType };
59+
}
60+
}

0 commit comments

Comments
 (0)