Skip to content

Commit 2b1d9d9

Browse files
committed
Extracted out a bunch of more stuff and getting trigger tests to work
1 parent b35a26a commit 2b1d9d9

File tree

18 files changed

+647
-302
lines changed

18 files changed

+647
-302
lines changed

.vscode/launch.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
{
2626
"type": "node-terminal",
2727
"request": "launch",
28-
"name": "Debug fairDequeuingStrategy.test.ts",
29-
"command": "pnpm run test -t FairDequeuingStrategy",
28+
"name": "Debug triggerTask.test.ts",
29+
"command": "pnpm run test --run ./test/engine/triggerTask.test.ts",
3030
"envFile": "${workspaceFolder}/.env",
3131
"cwd": "${workspaceFolder}/apps/webapp",
3232
"sourceMaps": true

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
import { TaskRun } from "@trigger.dev/database";
88
import { z } from "zod";
99
import { env } from "~/env.server";
10+
import { EngineServiceValidationError } from "~/runEngine/concerns/errors";
1011
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
1112
import { logger } from "~/services/logger.server";
1213
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
@@ -117,6 +118,8 @@ const { action, loader } = createActionApiRoute(
117118
} catch (error) {
118119
if (error instanceof ServiceValidationError) {
119120
return json({ error: error.message }, { status: error.status ?? 422 });
121+
} else if (error instanceof EngineServiceValidationError) {
122+
return json({ error: error.message }, { status: error.status ?? 422 });
120123
} else if (error instanceof OutOfEntitlementError) {
121124
return json({ error: error.message }, { status: 422 });
122125
} else if (error instanceof Error) {
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
export class EngineServiceValidationError extends Error {
2+
constructor(message: string, public status?: number) {
3+
super(message);
4+
this.name = "EngineServiceValidationError";
5+
}
6+
}

apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts

Lines changed: 11 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
1-
import { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
2-
import { TriggerTaskRequest } from "../types";
3-
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
1+
import { RunId } from "@trigger.dev/core/v3/isomorphic";
2+
import type { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
43
import { logger } from "~/services/logger.server";
5-
import { eventRepository } from "~/v3/eventRepository.server";
6-
import { SemanticInternalAttributes } from "@trigger.dev/core/v3/semanticInternalAttributes";
7-
import { BatchId, RunId } from "@trigger.dev/core/v3/isomorphic";
8-
import { RunEngine } from "~/v3/runEngine.server";
4+
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
5+
import type { RunEngine } from "~/v3/runEngine.server";
6+
import type { TraceEventConcern, TriggerTaskRequest } from "../types";
97

108
export type IdempotencyKeyConcernResult =
119
| { isCached: true; run: TaskRun }
@@ -14,7 +12,8 @@ export type IdempotencyKeyConcernResult =
1412
export class IdempotencyKeyConcern {
1513
constructor(
1614
private readonly prisma: PrismaClientOrTransaction,
17-
private readonly engine: RunEngine
15+
private readonly engine: RunEngine,
16+
private readonly traceEventConcern: TraceEventConcern
1817
) {}
1918

2019
async handleTriggerRequest(request: TriggerTaskRequest): Promise<IdempotencyKeyConcernResult> {
@@ -60,48 +59,15 @@ export class IdempotencyKeyConcern {
6059
request.body.options?.resumeParentOnCompletion &&
6160
request.body.options?.parentRunId
6261
) {
63-
await eventRepository.traceEvent(
64-
`${request.taskId} (cached)`,
62+
await this.traceEventConcern.traceIdempotentRun(
63+
request,
6564
{
66-
context: request.options?.traceContext,
67-
spanParentAsLink: request.options?.spanParentAsLink,
68-
parentAsLinkType: request.options?.parentAsLinkType,
69-
kind: "SERVER",
70-
environment: request.environment,
71-
taskSlug: request.taskId,
72-
attributes: {
73-
properties: {
74-
[SemanticInternalAttributes.SHOW_ACTIONS]: true,
75-
[SemanticInternalAttributes.ORIGINAL_RUN_ID]: existingRun.friendlyId,
76-
},
77-
style: {
78-
icon: "task-cached",
79-
},
80-
runIsTest: request.body.options?.test ?? false,
81-
batchId: request.options?.batchId
82-
? BatchId.toFriendlyId(request.options.batchId)
83-
: undefined,
84-
idempotencyKey,
85-
runId: existingRun.friendlyId,
86-
},
65+
existingRun,
66+
idempotencyKey,
8767
incomplete: existingRun.associatedWaitpoint.status === "PENDING",
8868
isError: existingRun.associatedWaitpoint.outputIsError,
89-
immediate: true,
9069
},
9170
async (event) => {
92-
//log a message
93-
await eventRepository.recordEvent(
94-
`There's an existing run for idempotencyKey: ${idempotencyKey}`,
95-
{
96-
taskSlug: request.taskId,
97-
environment: request.environment,
98-
attributes: {
99-
runId: existingRun.friendlyId,
100-
},
101-
context: request.options?.traceContext,
102-
parentId: event.spanId,
103-
}
104-
);
10571
//block run with waitpoint
10672
await this.engine.blockRunWithWaitpoint({
10773
runId: RunId.fromFriendlyId(request.body.options!.parentRunId!),

apps/webapp/app/runEngine/concerns/payloads.server.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { PayloadProcessor, TriggerTaskRequest } from "../types";
33
import { env } from "~/env.server";
44
import { startActiveSpan } from "~/v3/tracer.server";
55
import { uploadPacketToObjectStore } from "~/v3/r2.server";
6-
import { ServiceValidationError } from "~/v3/services/baseService.server";
6+
import { EngineServiceValidationError } from "./errors";
77

88
export class DefaultPayloadProcessor implements PayloadProcessor {
99
async process(request: TriggerTaskRequest): Promise<IOPacket> {
@@ -36,7 +36,10 @@ export class DefaultPayloadProcessor implements PayloadProcessor {
3636
);
3737

3838
if (uploadError) {
39-
throw new ServiceValidationError("Failed to upload large payload to object store", 500); // This is retryable
39+
throw new EngineServiceValidationError(
40+
"Failed to upload large payload to object store",
41+
500
42+
); // This is retryable
4043
}
4144

4245
return {

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import { PrismaClientOrTransaction } from "@trigger.dev/database";
33
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
44
import { logger } from "~/services/logger.server";
55
import { findCurrentWorkerFromEnvironment } from "~/v3/models/workerDeployment.server";
6-
import { ServiceValidationError } from "~/v3/services/baseService.server";
76
import {
87
LockedBackgroundWorker,
98
QueueManager,
@@ -14,6 +13,7 @@ import {
1413
import { WorkerGroupService } from "~/v3/services/worker/workerGroupService.server";
1514
import type { RunEngine } from "~/v3/runEngine.server";
1615
import { env } from "~/env.server";
16+
import { EngineServiceValidationError } from "./errors";
1717

1818
export class DefaultQueueManager implements QueueManager {
1919
constructor(
@@ -43,7 +43,7 @@ export class DefaultQueueManager implements QueueManager {
4343
});
4444

4545
if (!specifiedQueue) {
46-
throw new ServiceValidationError(
46+
throw new EngineServiceValidationError(
4747
`Specified queue '${specifiedQueueName}' not found or not associated with locked version '${
4848
lockedBackgroundWorker.version ?? "<unknown>"
4949
}'.`
@@ -65,7 +65,7 @@ export class DefaultQueueManager implements QueueManager {
6565
});
6666

6767
if (!lockedTask) {
68-
throw new ServiceValidationError(
68+
throw new EngineServiceValidationError(
6969
`Task '${request.taskId}' not found on locked version '${
7070
lockedBackgroundWorker.version ?? "<unknown>"
7171
}'.`
@@ -80,7 +80,7 @@ export class DefaultQueueManager implements QueueManager {
8080
workerId: lockedBackgroundWorker.id,
8181
version: lockedBackgroundWorker.version,
8282
});
83-
throw new ServiceValidationError(
83+
throw new EngineServiceValidationError(
8484
`Default queue configuration for task '${request.taskId}' missing on locked version '${
8585
lockedBackgroundWorker.version ?? "<unknown>"
8686
}'.`
@@ -94,7 +94,7 @@ export class DefaultQueueManager implements QueueManager {
9494
// Task is not locked to a specific version, use regular logic
9595
if (request.body.options?.lockToVersion) {
9696
// This should only happen if the findFirst failed, indicating the version doesn't exist
97-
throw new ServiceValidationError(
97+
throw new EngineServiceValidationError(
9898
`Task locked to version '${request.body.options.lockToVersion}', but no worker found with that version.`
9999
);
100100
}
@@ -130,7 +130,7 @@ export class DefaultQueueManager implements QueueManager {
130130
const defaultQueueName = `task/${taskId}`;
131131

132132
// Find the current worker for the environment
133-
const worker = await findCurrentWorkerFromEnvironment(environment);
133+
const worker = await findCurrentWorkerFromEnvironment(environment, this.prisma);
134134

135135
if (!worker) {
136136
logger.debug("Failed to get queue name: No worker found", {
@@ -208,7 +208,7 @@ export class DefaultQueueManager implements QueueManager {
208208
});
209209

210210
if (!workerGroup) {
211-
throw new ServiceValidationError("No worker group found");
211+
throw new EngineServiceValidationError("No worker group found");
212212
}
213213

214214
return workerGroup.masterQueue;
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import { autoIncrementCounter } from "~/services/autoIncrementCounter.server";
2+
import { RunNumberIncrementer, TriggerTaskRequest } from "../types";
3+
4+
export class DefaultRunNumberIncrementer implements RunNumberIncrementer {
5+
async incrementRunNumber<T>(
6+
request: TriggerTaskRequest,
7+
callback: (num: number) => Promise<T>
8+
): Promise<T | undefined> {
9+
return await autoIncrementCounter.incrementInTransaction(
10+
`v3-run:${request.environment.id}:${request.taskId}`,
11+
callback
12+
);
13+
}
14+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import { EventRepository } from "~/v3/eventRepository.server";
2+
import { TracedEventSpan, TraceEventConcern, TriggerTaskRequest } from "../types";
3+
import { SemanticInternalAttributes } from "@trigger.dev/core/v3/semanticInternalAttributes";
4+
import { BatchId } from "@trigger.dev/core/v3/isomorphic";
5+
import { TaskRun } from "@trigger.dev/database";
6+
7+
export class DefaultTraceEventsConcern implements TraceEventConcern {
8+
private readonly eventRepository: EventRepository;
9+
10+
constructor(eventRepository: EventRepository) {
11+
this.eventRepository = eventRepository;
12+
}
13+
14+
async traceRun<T>(
15+
request: TriggerTaskRequest,
16+
callback: (span: TracedEventSpan) => Promise<T>
17+
): Promise<T> {
18+
return await this.eventRepository.traceEvent(
19+
request.taskId,
20+
{
21+
context: request.options?.traceContext,
22+
spanParentAsLink: request.options?.spanParentAsLink,
23+
parentAsLinkType: request.options?.parentAsLinkType,
24+
kind: "SERVER",
25+
environment: request.environment,
26+
taskSlug: request.taskId,
27+
attributes: {
28+
properties: {
29+
[SemanticInternalAttributes.SHOW_ACTIONS]: true,
30+
},
31+
style: {
32+
icon: request.options?.customIcon ?? "task",
33+
},
34+
runIsTest: request.body.options?.test ?? false,
35+
batchId: request.options?.batchId
36+
? BatchId.toFriendlyId(request.options.batchId)
37+
: undefined,
38+
idempotencyKey: request.options?.idempotencyKey,
39+
},
40+
incomplete: true,
41+
immediate: true,
42+
},
43+
async (event, traceContext, traceparent) => {
44+
return await callback({
45+
traceId: event.traceId,
46+
spanId: event.spanId,
47+
traceContext,
48+
traceparent,
49+
setAttribute: (key, value) => event.setAttribute(key as any, value),
50+
failWithError: event.failWithError.bind(event),
51+
});
52+
}
53+
);
54+
}
55+
56+
async traceIdempotentRun<T>(
57+
request: TriggerTaskRequest,
58+
options: {
59+
existingRun: TaskRun;
60+
idempotencyKey: string;
61+
incomplete: boolean;
62+
isError: boolean;
63+
},
64+
callback: (span: TracedEventSpan) => Promise<T>
65+
): Promise<T> {
66+
const { existingRun, idempotencyKey, incomplete, isError } = options;
67+
68+
return await this.eventRepository.traceEvent(
69+
`${request.taskId} (cached)`,
70+
{
71+
context: request.options?.traceContext,
72+
spanParentAsLink: request.options?.spanParentAsLink,
73+
parentAsLinkType: request.options?.parentAsLinkType,
74+
kind: "SERVER",
75+
environment: request.environment,
76+
taskSlug: request.taskId,
77+
attributes: {
78+
properties: {
79+
[SemanticInternalAttributes.SHOW_ACTIONS]: true,
80+
[SemanticInternalAttributes.ORIGINAL_RUN_ID]: existingRun.friendlyId,
81+
},
82+
style: {
83+
icon: "task-cached",
84+
},
85+
runIsTest: request.body.options?.test ?? false,
86+
batchId: request.options?.batchId
87+
? BatchId.toFriendlyId(request.options.batchId)
88+
: undefined,
89+
idempotencyKey,
90+
runId: existingRun.friendlyId,
91+
},
92+
incomplete,
93+
isError,
94+
immediate: true,
95+
},
96+
async (event, traceContext, traceparent) => {
97+
//log a message
98+
await this.eventRepository.recordEvent(
99+
`There's an existing run for idempotencyKey: ${idempotencyKey}`,
100+
{
101+
taskSlug: request.taskId,
102+
environment: request.environment,
103+
attributes: {
104+
runId: existingRun.friendlyId,
105+
},
106+
context: request.options?.traceContext,
107+
parentId: event.spanId,
108+
}
109+
);
110+
111+
return await callback({
112+
traceId: event.traceId,
113+
spanId: event.spanId,
114+
traceContext,
115+
traceparent,
116+
setAttribute: (key, value) => event.setAttribute(key as any, value),
117+
failWithError: event.failWithError.bind(event),
118+
});
119+
}
120+
);
121+
}
122+
}

0 commit comments

Comments
 (0)