Skip to content

Commit 6864f5d

Browse files
committed
Locked task runs will now require queues and tasks to be in the locked version
1 parent 3a62ee3 commit 6864f5d

File tree

5 files changed

+154
-41
lines changed

5 files changed

+154
-41
lines changed

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 120 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,123 @@ export class RunEngineTriggerTaskService extends WithRunEngine {
251251
);
252252
}
253253

254+
const lockedToBackgroundWorker = body.options?.lockToVersion
255+
? await this._prisma.backgroundWorker.findFirst({
256+
where: {
257+
projectId: environment.projectId,
258+
runtimeEnvironmentId: environment.id,
259+
version: body.options?.lockToVersion,
260+
},
261+
select: {
262+
id: true,
263+
version: true,
264+
sdkVersion: true,
265+
cliVersion: true,
266+
},
267+
})
268+
: undefined;
269+
270+
let queueName: string;
271+
let lockedQueueId: string | undefined;
272+
273+
// Determine queue name based on lockToVersion and provided options
274+
if (lockedToBackgroundWorker) {
275+
// Task is locked to a specific worker version
276+
if (body.options?.queue?.name) {
277+
const specifiedQueueName = body.options.queue.name;
278+
// A specific queue name is provided
279+
const specifiedQueue = await this._prisma.taskQueue.findFirst({
280+
// Validate it exists for the locked worker
281+
where: {
282+
name: specifiedQueueName,
283+
workers: { some: { id: lockedToBackgroundWorker.id } }, // Ensure the queue is associated with any task of the locked worker
284+
},
285+
});
286+
287+
if (!specifiedQueue) {
288+
throw new ServiceValidationError(
289+
`Specified queue '${specifiedQueueName}' not found or not associated with locked worker version '${
290+
body.options?.lockToVersion ?? "<unknown>"
291+
}'.`
292+
);
293+
}
294+
// Use the validated queue name directly
295+
queueName = specifiedQueue.name;
296+
lockedQueueId = specifiedQueue.id;
297+
} else {
298+
// No specific queue name provided, use the default queue for the task on the locked worker
299+
const lockedTask = await this._prisma.backgroundWorkerTask.findFirst({
300+
where: {
301+
workerId: lockedToBackgroundWorker.id,
302+
slug: taskId,
303+
},
304+
include: {
305+
queue: true,
306+
},
307+
});
308+
309+
if (!lockedTask) {
310+
throw new ServiceValidationError(
311+
`Task '${taskId}' not found on locked worker version '${
312+
body.options?.lockToVersion ?? "<unknown>"
313+
}'.`
314+
);
315+
}
316+
317+
if (!lockedTask.queue) {
318+
// This case should ideally be prevented by earlier checks or schema constraints,
319+
// but handle it defensively.
320+
logger.error("Task found on locked worker, but has no associated queue record", {
321+
taskId,
322+
workerId: lockedToBackgroundWorker.id,
323+
version: lockedToBackgroundWorker.version,
324+
});
325+
throw new ServiceValidationError(
326+
`Default queue configuration for task '${taskId}' missing on locked worker version '${
327+
body.options?.lockToVersion ?? "<unknown>"
328+
}'.`
329+
);
330+
}
331+
// Use the task's default queue name
332+
queueName = lockedTask.queue.name;
333+
lockedQueueId = lockedTask.queue.id;
334+
}
335+
} else {
336+
// Task is not locked to a specific version, use regular logic
337+
if (body.options?.lockToVersion) {
338+
// This should only happen if the findFirst failed, indicating the version doesn't exist
339+
throw new ServiceValidationError(
340+
`Task locked to version '${body.options.lockToVersion}', but no worker found with that version.`
341+
);
342+
}
343+
344+
// Get queue name using the helper for non-locked case (handles provided name or finds default)
345+
queueName = await this.#getQueueName(taskId, environment, body.options?.queue?.name);
346+
}
347+
348+
// Sanitize the final determined queue name once
349+
const sanitizedQueueName = sanitizeQueueName(queueName);
350+
351+
// Check that the queuename is not an empty string
352+
if (!sanitizedQueueName) {
353+
queueName = sanitizeQueueName(`task/${taskId}`); // Fallback if sanitization results in empty
354+
} else {
355+
queueName = sanitizedQueueName;
356+
}
357+
358+
//upsert tags
359+
const tags = await createTags(
360+
{
361+
tags: body.options?.tags,
362+
projectId: environment.projectId,
363+
},
364+
this._prisma
365+
);
366+
367+
const depth = parentRun ? parentRun.depth + 1 : 0;
368+
369+
const masterQueue = await this.#getMasterQueueForEnvironment(environment);
370+
254371
try {
255372
return await eventRepository.traceEvent(
256373
taskId,
@@ -279,50 +396,11 @@ export class RunEngineTriggerTaskService extends WithRunEngine {
279396
const result = await autoIncrementCounter.incrementInTransaction(
280397
`v3-run:${environment.id}:${taskId}`,
281398
async (num, tx) => {
282-
const lockedToBackgroundWorker = body.options?.lockToVersion
283-
? await tx.backgroundWorker.findFirst({
284-
where: {
285-
projectId: environment.projectId,
286-
runtimeEnvironmentId: environment.id,
287-
version: body.options?.lockToVersion,
288-
},
289-
select: {
290-
id: true,
291-
version: true,
292-
sdkVersion: true,
293-
cliVersion: true,
294-
},
295-
})
296-
: undefined;
297-
298-
let queueName = sanitizeQueueName(
299-
await this.#getQueueName(taskId, environment, body.options?.queue?.name)
300-
);
301-
302-
// Check that the queuename is not an empty string
303-
if (!queueName) {
304-
queueName = sanitizeQueueName(`task/${taskId}`);
305-
}
306-
307399
event.setAttribute("queueName", queueName);
308400
span.setAttribute("queueName", queueName);
309-
310-
//upsert tags
311-
const tags = await createTags(
312-
{
313-
tags: body.options?.tags,
314-
projectId: environment.projectId,
315-
},
316-
this._prisma
317-
);
318-
319-
const depth = parentRun ? parentRun.depth + 1 : 0;
320-
321401
event.setAttribute("runId", runFriendlyId);
322402
span.setAttribute("runId", runFriendlyId);
323403

324-
const masterQueue = await this.#getMasterQueueForEnvironment(environment);
325-
326404
const taskRun = await this._engine.trigger(
327405
{
328406
number: num,
@@ -345,6 +423,7 @@ export class RunEngineTriggerTaskService extends WithRunEngine {
345423
cliVersion: lockedToBackgroundWorker?.cliVersion,
346424
concurrencyKey: body.options?.concurrencyKey,
347425
queue: queueName,
426+
lockedQueueId,
348427
masterQueue: masterQueue,
349428
isTest: body.options?.test ?? false,
350429
delayUntil,
@@ -473,13 +552,15 @@ export class RunEngineTriggerTaskService extends WithRunEngine {
473552
return workerGroup.masterQueue;
474553
}
475554

555+
// Gets the queue name when the task is NOT locked to a specific version
476556
async #getQueueName(taskId: string, environment: AuthenticatedEnvironment, queueName?: string) {
477557
if (queueName) {
478558
return queueName;
479559
}
480560

481561
const defaultQueueName = `task/${taskId}`;
482562

563+
// Find the current worker for the environment
483564
const worker = await findCurrentWorkerFromEnvironment(environment);
484565

485566
if (!worker) {

internal-packages/run-engine/src/engine/db/worker.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,9 @@ export async function getRunWithBackgroundWorkerTasks(
161161
}
162162
}
163163

164-
const queue = workerWithTasks.queues.find((queue) => queue.name === run.queue);
164+
const queue = workerWithTasks.queues.find((queue) =>
165+
run.lockedQueueId ? queue.id === run.lockedQueueId : queue.name === run.queue
166+
);
165167

166168
if (!queue) {
167169
return {

internal-packages/run-engine/src/engine/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ export class RunEngine {
341341
concurrencyKey,
342342
masterQueue,
343343
queue,
344+
lockedQueueId,
344345
isTest,
345346
delayUntil,
346347
queuedAt,
@@ -420,6 +421,7 @@ export class RunEngine {
420421
cliVersion,
421422
concurrencyKey,
422423
queue,
424+
lockedQueueId,
423425
masterQueue,
424426
secondaryMasterQueue,
425427
isTest,

internal-packages/run-engine/src/engine/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ export type TriggerParams = {
8484
concurrencyKey?: string;
8585
masterQueue?: string;
8686
queue: string;
87+
lockedQueueId?: string;
8788
isTest: boolean;
8889
delayUntil?: Date;
8990
queuedAt?: Date;

references/hello-world/src/trigger/queues.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { batch, logger, queue, queues, task } from "@trigger.dev/sdk/v3";
1+
import { batch, logger, queue, queues, task, tasks } from "@trigger.dev/sdk/v3";
22

33
export const queuesTester = task({
44
id: "queues-tester",
@@ -235,3 +235,30 @@ export const sharedQueueTestTask = task({
235235
};
236236
},
237237
});
238+
239+
export const lockedQueueTask = task({
240+
id: "locked-queue-task",
241+
run: async (payload: any) => {
242+
logger.log("Locked queue task", { payload });
243+
244+
await lockedQueueChildTask.trigger({}, { queue: "queue_does_not_exist" });
245+
await lockedQueueChildTask.triggerAndWait({}, { queue: "queue_does_not_exist" });
246+
},
247+
});
248+
249+
export const lockedQueueChildTask = task({
250+
id: "locked-queue-child-task",
251+
run: async (payload: any) => {
252+
logger.log("Locked queue child task", { payload });
253+
},
254+
});
255+
256+
export const lockedTaskIdentifierTask = task({
257+
id: "locked-task-identifier-task",
258+
run: async (payload: any) => {
259+
logger.log("Locked task identifier task", { payload });
260+
261+
await tasks.trigger("task_does_not_exist", {});
262+
await tasks.triggerAndWait("task_does_not_exist", {});
263+
},
264+
});

0 commit comments

Comments
 (0)