Skip to content

Commit d5227bc

Browse files
committed
fix: safe upserting of task queues during worker indexing
1 parent 4d5080d commit d5227bc

File tree

2 files changed

+261
-38
lines changed

2 files changed

+261
-38
lines changed

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 77 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -363,43 +363,14 @@ async function createWorkerQueue(
363363
)
364364
: queue.concurrencyLimit;
365365

366-
let taskQueue = await prisma.taskQueue.findFirst({
367-
where: {
368-
runtimeEnvironmentId: worker.runtimeEnvironmentId,
369-
name: queueName,
370-
},
371-
});
372-
373-
if (!taskQueue) {
374-
taskQueue = await prisma.taskQueue.create({
375-
data: {
376-
friendlyId: generateFriendlyId("queue"),
377-
version: "V2",
378-
name: queueName,
379-
orderableName,
380-
concurrencyLimit,
381-
runtimeEnvironmentId: worker.runtimeEnvironmentId,
382-
projectId: worker.projectId,
383-
type: queueType,
384-
workers: {
385-
connect: {
386-
id: worker.id,
387-
},
388-
},
389-
},
390-
});
391-
} else {
392-
await prisma.taskQueue.update({
393-
where: {
394-
id: taskQueue.id,
395-
},
396-
data: {
397-
workers: { connect: { id: worker.id } },
398-
version: "V2",
399-
orderableName,
400-
},
401-
});
402-
}
366+
const taskQueue = await upsertWorkerQueueRecord(
367+
queueName,
368+
concurrencyLimit ?? undefined,
369+
orderableName,
370+
queueType,
371+
worker,
372+
prisma
373+
);
403374

404375
if (typeof concurrencyLimit === "number") {
405376
logger.debug("createWorkerQueue: updating concurrency limit", {
@@ -426,6 +397,75 @@ async function createWorkerQueue(
426397
return taskQueue;
427398
}
428399

400+
async function upsertWorkerQueueRecord(
401+
queueName: string,
402+
concurrencyLimit: number | undefined,
403+
orderableName: string,
404+
queueType: TaskQueueType,
405+
worker: BackgroundWorker,
406+
prisma: PrismaClientOrTransaction,
407+
attempt: number = 0
408+
): Promise<TaskQueue> {
409+
if (attempt > 3) {
410+
throw new Error("Failed to insert queue record");
411+
}
412+
413+
try {
414+
let taskQueue = await prisma.taskQueue.findFirst({
415+
where: {
416+
runtimeEnvironmentId: worker.runtimeEnvironmentId,
417+
name: queueName,
418+
},
419+
});
420+
421+
if (!taskQueue) {
422+
taskQueue = await prisma.taskQueue.create({
423+
data: {
424+
friendlyId: generateFriendlyId("queue"),
425+
version: "V2",
426+
name: queueName,
427+
orderableName,
428+
concurrencyLimit,
429+
runtimeEnvironmentId: worker.runtimeEnvironmentId,
430+
projectId: worker.projectId,
431+
type: queueType,
432+
workers: {
433+
connect: {
434+
id: worker.id,
435+
},
436+
},
437+
},
438+
});
439+
} else {
440+
await prisma.taskQueue.update({
441+
where: {
442+
id: taskQueue.id,
443+
},
444+
data: {
445+
workers: { connect: { id: worker.id } },
446+
version: "V2",
447+
orderableName,
448+
},
449+
});
450+
}
451+
452+
return taskQueue;
453+
} catch (error) {
454+
// If the queue already exists, let's try again
455+
if (error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2002") {
456+
return await upsertWorkerQueueRecord(
457+
queueName,
458+
concurrencyLimit,
459+
orderableName,
460+
queueType,
461+
worker,
462+
prisma,
463+
attempt + 1
464+
);
465+
}
466+
throw error;
467+
}
468+
}
429469
//CreateDeclarativeScheduleError with a message
430470
export class CreateDeclarativeScheduleError extends Error {
431471
constructor(message: string) {

references/hello-world/src/trigger/queues.ts

Lines changed: 184 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { logger, queue, queues, task } from "@trigger.dev/sdk/v3";
1+
import { batch, logger, queue, queues, task } from "@trigger.dev/sdk/v3";
22

33
export const queuesTester = task({
44
id: "queues-tester",
@@ -52,3 +52,186 @@ export const otherQueueTask = task({
5252
logger.log("Other queue task", { payload });
5353
},
5454
});
55+
56+
import { setTimeout } from "node:timers/promises";
57+
58+
type Payload = {
59+
id: string;
60+
waitSeconds: number;
61+
};
62+
63+
export const myQueue = queue({
64+
name: "shared-queue",
65+
concurrencyLimit: 2,
66+
});
67+
68+
// First task type that uses shared queue
69+
export const sharedQueueTask1 = task({
70+
id: "shared-queue-task-1",
71+
queue: myQueue,
72+
run: async (payload: Payload) => {
73+
const startedAt = Date.now();
74+
logger.info(`Task1 ${payload.id} started at ${startedAt}`);
75+
76+
await setTimeout(payload.waitSeconds * 1000);
77+
78+
const completedAt = Date.now();
79+
logger.info(`Task1 ${payload.id} completed at ${completedAt}`);
80+
81+
return {
82+
id: payload.id,
83+
startedAt,
84+
completedAt,
85+
};
86+
},
87+
});
88+
89+
// Second task type that uses the same queue
90+
export const sharedQueueTask2 = task({
91+
id: "shared-queue-task-2",
92+
queue: myQueue,
93+
run: async (payload: Payload) => {
94+
const startedAt = Date.now();
95+
logger.info(`Task2 ${payload.id} started at ${startedAt}`);
96+
97+
await setTimeout(payload.waitSeconds * 1000);
98+
99+
const completedAt = Date.now();
100+
logger.info(`Task2 ${payload.id} completed at ${completedAt}`);
101+
102+
return {
103+
id: payload.id,
104+
startedAt,
105+
completedAt,
106+
};
107+
},
108+
});
109+
110+
export const sharedQueueTask3 = task({
111+
id: "shared-queue-task-3",
112+
queue: myQueue,
113+
run: async (payload: Payload) => {
114+
const startedAt = Date.now();
115+
logger.info(`Task2 ${payload.id} started at ${startedAt}`);
116+
117+
await setTimeout(payload.waitSeconds * 1000);
118+
119+
const completedAt = Date.now();
120+
logger.info(`Task2 ${payload.id} completed at ${completedAt}`);
121+
122+
return {
123+
id: payload.id,
124+
startedAt,
125+
completedAt,
126+
};
127+
},
128+
});
129+
130+
export const sharedQueueTask4 = task({
131+
id: "shared-queue-task-4",
132+
queue: myQueue,
133+
run: async (payload: Payload) => {
134+
const startedAt = Date.now();
135+
logger.info(`Task2 ${payload.id} started at ${startedAt}`);
136+
137+
await setTimeout(payload.waitSeconds * 1000);
138+
139+
const completedAt = Date.now();
140+
logger.info(`Task2 ${payload.id} completed at ${completedAt}`);
141+
142+
return {
143+
id: payload.id,
144+
startedAt,
145+
completedAt,
146+
};
147+
},
148+
});
149+
150+
export const sharedQueueTask5 = task({
151+
id: "shared-queue-task-5",
152+
queue: myQueue,
153+
run: async (payload: Payload) => {
154+
const startedAt = Date.now();
155+
logger.info(`Task2 ${payload.id} started at ${startedAt}`);
156+
157+
await setTimeout(payload.waitSeconds * 1000);
158+
159+
const completedAt = Date.now();
160+
logger.info(`Task2 ${payload.id} completed at ${completedAt}`);
161+
162+
return {
163+
id: payload.id,
164+
startedAt,
165+
completedAt,
166+
};
167+
},
168+
});
169+
170+
// Test task that verifies shared queue concurrency
171+
export const sharedQueueTestTask = task({
172+
id: "shared-queue-test",
173+
retry: {
174+
maxAttempts: 1,
175+
},
176+
// 4 minutes
177+
maxDuration: 240,
178+
run: async (payload, { ctx }) => {
179+
logger.info("Starting shared queue concurrency test");
180+
181+
// Trigger mix of both task types (5 total tasks)
182+
// With concurrencyLimit: 2, we expect only 2 running at once
183+
// regardless of task type
184+
const results = await batch.triggerAndWait([
185+
{ id: sharedQueueTask1.id, payload: { id: "t1-1", waitSeconds: 4 } },
186+
{ id: sharedQueueTask2.id, payload: { id: "t2-1", waitSeconds: 4 } },
187+
{ id: sharedQueueTask1.id, payload: { id: "t1-2", waitSeconds: 4 } },
188+
{ id: sharedQueueTask2.id, payload: { id: "t2-2", waitSeconds: 4 } },
189+
{ id: sharedQueueTask1.id, payload: { id: "t1-3", waitSeconds: 4 } },
190+
]);
191+
192+
// Verify all tasks completed successfully
193+
if (!results.runs.every((r) => r.ok)) {
194+
throw new Error("One or more tasks failed");
195+
}
196+
197+
// Get all executions sorted by start time
198+
const executions = results.runs.map((r) => r.output).sort((a, b) => a.startedAt - b.startedAt);
199+
200+
// For each point in time, count how many tasks were running
201+
let maxConcurrent = 0;
202+
for (let i = 0; i < executions.length; i++) {
203+
const current = executions[i];
204+
const concurrent =
205+
executions.filter(
206+
(task) =>
207+
task.id !== current.id && // not the same task
208+
task.startedAt <= current.startedAt && // started before or at same time
209+
task.completedAt >= current.startedAt // hadn't completed yet
210+
).length + 1; // +1 for current task
211+
212+
maxConcurrent = Math.max(maxConcurrent, concurrent);
213+
}
214+
215+
// Verify we never exceeded the concurrency limit
216+
if (maxConcurrent > 2) {
217+
throw new Error(`Expected maximum of 2 concurrent tasks, but found ${maxConcurrent}`);
218+
}
219+
220+
// Verify tasks from both types were able to run
221+
const task1Runs = executions.filter((e) => e.id.startsWith("t1-")).length;
222+
const task2Runs = executions.filter((e) => e.id.startsWith("t2-")).length;
223+
224+
if (task1Runs === 0 || task2Runs === 0) {
225+
throw new Error(
226+
`Expected both task types to run, but got ${task1Runs} task1 runs and ${task2Runs} task2 runs`
227+
);
228+
}
229+
230+
return {
231+
executions,
232+
maxConcurrent,
233+
task1Count: task1Runs,
234+
task2Count: task2Runs,
235+
};
236+
},
237+
});

0 commit comments

Comments
 (0)