Skip to content

Commit 01e8193

Browse files
committed
Reliabily resolve task queue for a run and ack runs where we can't find the queue
1 parent 46e4931 commit 01e8193

File tree

10 files changed

+132
-60
lines changed

10 files changed

+132
-60
lines changed
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import { QueueOptions } from "@trigger.dev/core/v3/schemas";
2+
import { TaskQueue } from "@trigger.dev/database";
3+
import { prisma } from "~/db.server";
4+
5+
export async function findQueueInEnvironment(
6+
queueName: string,
7+
environmentId: string,
8+
backgroundWorkerTaskId?: string,
9+
backgroundTask?: { queueConfig?: unknown }
10+
): Promise<TaskQueue | undefined> {
11+
const sanitizedQueueName = sanitizeQueueName(queueName);
12+
13+
const queue = await prisma.taskQueue.findFirst({
14+
where: {
15+
runtimeEnvironmentId: environmentId,
16+
name: sanitizedQueueName,
17+
},
18+
});
19+
20+
if (queue) {
21+
return queue;
22+
}
23+
24+
const task = backgroundTask
25+
? backgroundTask
26+
: backgroundWorkerTaskId
27+
? await prisma.backgroundWorkerTask.findFirst({
28+
where: {
29+
id: backgroundWorkerTaskId,
30+
},
31+
})
32+
: undefined;
33+
34+
if (!task) {
35+
return;
36+
}
37+
38+
const queueConfig = QueueOptions.safeParse(task.queueConfig);
39+
40+
if (queueConfig.success) {
41+
const taskQueueName = queueConfig.data.name
42+
? sanitizeQueueName(queueConfig.data.name)
43+
: undefined;
44+
45+
if (taskQueueName && taskQueueName !== sanitizedQueueName) {
46+
const queue = await prisma.taskQueue.findFirst({
47+
where: {
48+
runtimeEnvironmentId: environmentId,
49+
name: taskQueueName,
50+
},
51+
});
52+
53+
if (queue) {
54+
return queue;
55+
}
56+
}
57+
}
58+
}
59+
60+
// Only allow alphanumeric characters, underscores, hyphens, and slashes (and only the first 128 characters)
61+
export function sanitizeQueueName(queueName: string) {
62+
return queueName.replace(/[^a-zA-Z0-9_\-\/]/g, "").substring(0, 128);
63+
}

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@ import { prisma } from "~/db.server";
1313
import { createNewSession, disconnectSession } from "~/models/runtimeEnvironment.server";
1414
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1515
import { logger } from "~/services/logger.server";
16-
import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server";
16+
import { marqs } from "~/v3/marqs/index.server";
1717
import { resolveVariablesForEnvironment } from "../environmentVariables/environmentVariablesRepository.server";
1818
import { FailedTaskRunService } from "../failedTaskRun.server";
1919
import { CancelDevSessionRunsService } from "../services/cancelDevSessionRuns.server";
2020
import { CompleteAttemptService } from "../services/completeAttempt.server";
2121
import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server";
2222
import { getMaxDuration } from "../utils/maxDuration";
2323
import { DevSubscriber, devPubSub } from "./devPubSub.server";
24+
import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server";
2425

2526
const MessageBody = z.discriminatedUnion("type", [
2627
z.object({
@@ -432,14 +433,12 @@ export class DevQueueConsumer {
432433
return;
433434
}
434435

435-
const queue = await prisma.taskQueue.findUnique({
436-
where: {
437-
runtimeEnvironmentId_name: {
438-
runtimeEnvironmentId: this.env.id,
439-
name: sanitizeQueueName(lockedTaskRun.queue),
440-
},
441-
},
442-
});
436+
const queue = await findQueueInEnvironment(
437+
lockedTaskRun.queue,
438+
this.env.id,
439+
backgroundTask.id,
440+
backgroundTask
441+
);
443442

444443
if (!queue) {
445444
logger.debug("[DevQueueConsumer] Failed to find queue", {

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1962,8 +1962,3 @@ function getMarQSClient() {
19621962
}
19631963
}
19641964
}
1965-
1966-
// Only allow alphanumeric characters, underscores, hyphens, and slashes (and only the first 128 characters)
1967-
export function sanitizeQueueName(queueName: string) {
1968-
return queueName.replace(/[^a-zA-Z0-9_\-\/]/g, "").substring(0, 128);
1969-
}

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
MachinePreset,
1414
ProdTaskRunExecution,
1515
ProdTaskRunExecutionPayload,
16+
QueueOptions,
1617
TaskRunError,
1718
TaskRunErrorCodes,
1819
TaskRunExecution,
@@ -28,6 +29,7 @@ import {
2829
BackgroundWorker,
2930
BackgroundWorkerTask,
3031
Prisma,
32+
TaskQueue,
3133
TaskRunStatus,
3234
} from "@trigger.dev/database";
3335
import { z } from "zod";
@@ -37,7 +39,7 @@ import { findEnvironmentById } from "~/models/runtimeEnvironment.server";
3739
import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server";
3840
import { logger } from "~/services/logger.server";
3941
import { singleton } from "~/utils/singleton";
40-
import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server";
42+
import { marqs } from "~/v3/marqs/index.server";
4143
import {
4244
RuntimeEnvironmentForEnvRepo,
4345
RuntimeEnvironmentForEnvRepoPayload,
@@ -65,6 +67,7 @@ import {
6567
import { tracer } from "../tracer.server";
6668
import { getMaxDuration } from "../utils/maxDuration";
6769
import { MessagePayload } from "./types";
70+
import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server";
6871

6972
const WithTraceContext = z.object({
7073
traceparent: z.string().optional(),
@@ -744,12 +747,12 @@ export class SharedQueueConsumer {
744747
};
745748
}
746749

747-
const queue = await prisma.taskQueue.findFirst({
748-
where: {
749-
runtimeEnvironmentId: lockedTaskRun.runtimeEnvironmentId,
750-
name: sanitizeQueueName(lockedTaskRun.queue),
751-
},
752-
});
750+
const queue = await findQueueInEnvironment(
751+
lockedTaskRun.queue,
752+
lockedTaskRun.runtimeEnvironmentId,
753+
lockedTaskRun.lockedById ?? undefined,
754+
backgroundTask
755+
);
753756

754757
if (!queue) {
755758
logger.debug("SharedQueueConsumer queue not found, so nacking message", {
@@ -759,7 +762,7 @@ export class SharedQueueConsumer {
759762
});
760763

761764
return {
762-
action: "nack_and_do_more_work",
765+
action: "ack_and_do_more_work",
763766
reason: "queue_not_found",
764767
attrs: {
765768
queue_name: sanitizeQueueName(lockedTaskRun.queue),
@@ -1031,12 +1034,11 @@ export class SharedQueueConsumer {
10311034
};
10321035
}
10331036

1034-
const queue = await prisma.taskQueue.findFirst({
1035-
where: {
1036-
runtimeEnvironmentId: resumableAttempt.runtimeEnvironmentId,
1037-
name: sanitizeQueueName(resumableRun.queue),
1038-
},
1039-
});
1037+
const queue = await findQueueInEnvironment(
1038+
resumableRun.queue,
1039+
resumableRun.runtimeEnvironmentId,
1040+
resumableRun.lockedById ?? undefined
1041+
);
10401042

10411043
if (!queue) {
10421044
logger.debug("SharedQueueConsumer queue not found, so nacking message", {
@@ -1045,7 +1047,7 @@ export class SharedQueueConsumer {
10451047
});
10461048

10471049
return {
1048-
action: "nack_and_do_more_work",
1050+
action: "ack_and_do_more_work",
10491051
reason: "queue_not_found",
10501052
attrs: {
10511053
queue_name: sanitizeQueueName(resumableRun.queue),

apps/webapp/app/v3/models/workerDeployment.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type WorkerDeploymentWithWorkerTasks = Prisma.WorkerDeploymentGetPayload<{
4444
triggerSource: true;
4545
machineConfig: true;
4646
maxDurationInSeconds: true;
47+
queueConfig: true;
4748
};
4849
};
4950
};

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import type { BackgroundWorker } from "@trigger.dev/database";
77
import { Prisma, PrismaClientOrTransaction } from "~/db.server";
88
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
99
import { logger } from "~/services/logger.server";
10-
import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server";
10+
import { marqs } from "~/v3/marqs/index.server";
1111
import { generateFriendlyId } from "../friendlyIdentifiers";
1212
import { calculateNextBuildVersion } from "../utils/calculateNextBuildVersion";
1313
import { BaseService } from "./baseService.server";
@@ -16,6 +16,7 @@ import { RegisterNextTaskScheduleInstanceService } from "./registerNextTaskSched
1616
import cronstrue from "cronstrue";
1717
import { CheckScheduleService } from "./checkSchedule.server";
1818
import { clampMaxDuration } from "../utils/maxDuration";
19+
import { sanitizeQueueName } from "~/models/taskQueue.server";
1920

2021
export class CreateBackgroundWorkerService extends BaseService {
2122
public async call(

apps/webapp/app/v3/services/createTaskRunAttempt.server.ts

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { machinePresetFromConfig, machinePresetFromRun } from "../machinePresets
1010
import { BaseService, ServiceValidationError } from "./baseService.server";
1111
import { CrashTaskRunService } from "./crashTaskRun.server";
1212
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
13+
import { findQueueInEnvironment } from "~/models/taskQueue.server";
1314

1415
export class CreateTaskRunAttemptService extends BaseService {
1516
public async call({
@@ -95,18 +96,18 @@ export class CreateTaskRunAttemptService extends BaseService {
9596
throw new ServiceValidationError("Task run is cancelled", 400);
9697
}
9798

98-
if (!taskRun.lockedBy) {
99+
const lockedBy = taskRun.lockedBy;
100+
101+
if (!lockedBy) {
99102
throw new ServiceValidationError("Task run is not locked", 400);
100103
}
101104

102-
const queue = await this._prisma.taskQueue.findUnique({
103-
where: {
104-
runtimeEnvironmentId_name: {
105-
runtimeEnvironmentId: environment.id,
106-
name: taskRun.queue,
107-
},
108-
},
109-
});
105+
const queue = await findQueueInEnvironment(
106+
taskRun.queue,
107+
environment.id,
108+
lockedBy.id,
109+
lockedBy
110+
);
110111

111112
if (!queue) {
112113
throw new ServiceValidationError("Queue not found", 404);
@@ -121,7 +122,7 @@ export class CreateTaskRunAttemptService extends BaseService {
121122
if (nextAttemptNumber > MAX_TASK_RUN_ATTEMPTS) {
122123
const service = new CrashTaskRunService(this._prisma);
123124
await service.call(taskRun.id, {
124-
reason: taskRun.lockedBy.worker.supportsLazyAttempts
125+
reason: lockedBy.worker.supportsLazyAttempts
125126
? "Max attempts reached."
126127
: "Max attempts reached. Please upgrade your CLI and SDK.",
127128
});
@@ -136,8 +137,8 @@ export class CreateTaskRunAttemptService extends BaseService {
136137
friendlyId: generateFriendlyId("attempt"),
137138
taskRunId: taskRun.id,
138139
startedAt: new Date(),
139-
backgroundWorkerId: taskRun.lockedBy!.worker.id,
140-
backgroundWorkerTaskId: taskRun.lockedBy!.id,
140+
backgroundWorkerId: lockedBy.worker.id,
141+
backgroundWorkerTaskId: lockedBy.id,
141142
status: setToExecuting ? "EXECUTING" : "PENDING",
142143
queueId: queue.id,
143144
runtimeEnvironmentId: environment.id,
@@ -174,8 +175,7 @@ export class CreateTaskRunAttemptService extends BaseService {
174175
}
175176

176177
const machinePreset =
177-
machinePresetFromRun(taskRun) ??
178-
machinePresetFromConfig(taskRun.lockedBy.machineConfig ?? {});
178+
machinePresetFromRun(taskRun) ?? machinePresetFromConfig(lockedBy.machineConfig ?? {});
179179

180180
const metadata = await parsePacket({
181181
data: taskRun.metadata ?? undefined,
@@ -184,16 +184,16 @@ export class CreateTaskRunAttemptService extends BaseService {
184184

185185
const execution: TaskRunExecution = {
186186
task: {
187-
id: taskRun.lockedBy.slug,
188-
filePath: taskRun.lockedBy.filePath,
189-
exportName: taskRun.lockedBy.exportName,
187+
id: lockedBy.slug,
188+
filePath: lockedBy.filePath,
189+
exportName: lockedBy.exportName,
190190
},
191191
attempt: {
192192
id: taskRunAttempt.friendlyId,
193193
number: taskRunAttempt.number,
194194
startedAt: taskRunAttempt.startedAt ?? taskRunAttempt.createdAt,
195-
backgroundWorkerId: taskRun.lockedBy.worker.id,
196-
backgroundWorkerTaskId: taskRun.lockedBy.id,
195+
backgroundWorkerId: lockedBy.worker.id,
196+
backgroundWorkerTaskId: lockedBy.id,
197197
status: "EXECUTING" as const,
198198
},
199199
run: {
@@ -210,7 +210,7 @@ export class CreateTaskRunAttemptService extends BaseService {
210210
costInCents: taskRun.costInCents,
211211
baseCostInCents: taskRun.baseCostInCents,
212212
maxAttempts: taskRun.maxAttempts ?? undefined,
213-
version: taskRun.lockedBy.worker.version,
213+
version: lockedBy.worker.version,
214214
metadata,
215215
maxDuration: taskRun.maxDurationInSeconds ?? undefined,
216216
},

apps/webapp/app/v3/services/finalizeTaskRun.server.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { FlushedRunMetadata, sanitizeError, TaskRunError } from "@trigger.dev/core/v3";
22
import { type Prisma, type TaskRun } from "@trigger.dev/database";
33
import { logger } from "~/services/logger.server";
4-
import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server";
4+
import { marqs } from "~/v3/marqs/index.server";
55
import { generateFriendlyId } from "../friendlyIdentifiers";
66
import {
77
FINAL_ATTEMPT_STATUSES,
@@ -17,6 +17,7 @@ import { socketIo } from "../handleSocketIo.server";
1717
import { ResumeBatchRunService } from "./resumeBatchRun.server";
1818
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1919
import { updateMetadataService } from "~/services/metadata/updateMetadata.server";
20+
import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server";
2021

2122
type BaseInput = {
2223
id: string;
@@ -291,6 +292,7 @@ export class FinalizeTaskRunService extends BaseService {
291292
id: true,
292293
workerId: true,
293294
runtimeEnvironmentId: true,
295+
queueConfig: true,
294296
},
295297
where: {
296298
id: run.lockedById,
@@ -302,14 +304,12 @@ export class FinalizeTaskRunService extends BaseService {
302304
return;
303305
}
304306

305-
const queue = await this._prisma.taskQueue.findUnique({
306-
where: {
307-
runtimeEnvironmentId_name: {
308-
runtimeEnvironmentId: workerTask.runtimeEnvironmentId,
309-
name: sanitizeQueueName(run.queue),
310-
},
311-
},
312-
});
307+
const queue = await findQueueInEnvironment(
308+
run.queue,
309+
workerTask.runtimeEnvironmentId,
310+
workerTask.id,
311+
workerTask
312+
);
313313

314314
if (!queue) {
315315
logger.error("FinalizeTaskRunService: No queue found", { runId: run.id });

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { env } from "~/env.server";
99
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1010
import { autoIncrementCounter } from "~/services/autoIncrementCounter.server";
1111
import { workerQueue } from "~/services/worker.server";
12-
import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server";
12+
import { marqs } from "~/v3/marqs/index.server";
1313
import { eventRepository } from "../eventRepository.server";
1414
import { generateFriendlyId } from "../friendlyIdentifiers";
1515
import { uploadPacketToObjectStore } from "../r2.server";
@@ -27,6 +27,7 @@ import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server";
2727
import { clampMaxDuration } from "../utils/maxDuration";
2828
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
2929
import { Prisma } from "@trigger.dev/database";
30+
import { sanitizeQueueName } from "~/models/taskQueue.server";
3031

3132
export type TriggerTaskServiceOptions = {
3233
idempotencyKey?: string;

references/v3-catalog/src/trigger/queues.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,13 @@ export const queuesTest = task({
3030
await wait.for({ seconds: payload.waitSeconds ?? 1 });
3131
},
3232
});
33+
34+
export const namedQueueTask = task({
35+
id: "queues/named-queue",
36+
queue: {
37+
name: "named-queue",
38+
},
39+
run: async () => {
40+
logger.info("named-queue");
41+
},
42+
});

0 commit comments

Comments
 (0)