Skip to content

Commit 8cae1d0

Browse files
authored
v3: fix triggering tasks with custom queues (#1242)
* add missing dotenv requires in catalog scripts * pass task queue options to all task trigger functions * remove unnecessary include * fallback to background worker task queue options * add changeset
1 parent 994ea7c commit 8cae1d0

File tree

6 files changed

+111
-19
lines changed

6 files changed

+111
-19
lines changed

.changeset/few-poems-vanish.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
Fix trigger functions for custom queues

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -274,14 +274,6 @@ export class SharedQueueConsumer {
274274
where: {
275275
id: message.messageId,
276276
},
277-
include: {
278-
lockedToVersion: {
279-
include: {
280-
deployment: true,
281-
tasks: true,
282-
},
283-
},
284-
},
285277
});
286278

287279
if (!existingTaskRun) {

apps/webapp/app/v3/models/workerDeployment.server.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import type { Prettify } from "@trigger.dev/core";
2+
import { BackgroundWorker } from "@trigger.dev/database";
23
import { CURRENT_DEPLOYMENT_LABEL } from "~/consts";
34
import { Prisma, prisma } from "~/db.server";
5+
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
46

57
export type CurrentWorkerDeployment = Prettify<
68
NonNullable<Awaited<ReturnType<typeof findCurrentWorkerDeployment>>>
@@ -42,6 +44,25 @@ export async function findCurrentWorkerDeployment(
4244
return promotion?.deployment;
4345
}
4446

47+
export async function findCurrentWorkerFromEnvironment(
48+
environment: Pick<AuthenticatedEnvironment, "id" | "type">
49+
): Promise<BackgroundWorker | null> {
50+
if (environment.type === "DEVELOPMENT") {
51+
const latestDevWorker = await prisma.backgroundWorker.findFirst({
52+
where: {
53+
runtimeEnvironmentId: environment.id,
54+
},
55+
orderBy: {
56+
createdAt: "desc",
57+
},
58+
});
59+
return latestDevWorker;
60+
} else {
61+
const deployment = await findCurrentWorkerDeployment(environment.id);
62+
return deployment?.worker ?? null;
63+
}
64+
}
65+
4566
export async function getWorkerDeploymentFromWorker(
4667
workerId: string
4768
): Promise<WorkerDeploymentWithWorkerTasks | undefined> {

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import {
22
IOPacket,
3+
QueueOptions,
34
SemanticInternalAttributes,
45
TriggerTaskRequestBody,
56
packetRequiresOffloading,
@@ -18,6 +19,7 @@ import { BaseService, ServiceValidationError } from "./baseService.server";
1819
import { logger } from "~/services/logger.server";
1920
import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
2021
import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
22+
import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server";
2123

2224
export type TriggerTaskServiceOptions = {
2325
idempotencyKey?: string;
@@ -211,7 +213,9 @@ export class TriggerTaskService extends BaseService {
211213
})
212214
: undefined;
213215

214-
let queueName = sanitizeQueueName(body.options?.queue?.name ?? `task/${taskId}`);
216+
let queueName = sanitizeQueueName(
217+
await this.#getQueueName(taskId, environment, body.options?.queue?.name)
218+
);
215219

216220
// Check that the queuename is not an empty string
217221
if (!queueName) {
@@ -399,6 +403,57 @@ export class TriggerTaskService extends BaseService {
399403
});
400404
}
401405

406+
async #getQueueName(taskId: string, environment: AuthenticatedEnvironment, queueName?: string) {
407+
if (queueName) {
408+
return queueName;
409+
}
410+
411+
const defaultQueueName = `task/${taskId}`;
412+
413+
const worker = await findCurrentWorkerFromEnvironment(environment);
414+
415+
if (!worker) {
416+
logger.debug("Failed to get queue name: No worker found", {
417+
taskId,
418+
environmentId: environment.id,
419+
});
420+
421+
return defaultQueueName;
422+
}
423+
424+
const task = await this._prisma.backgroundWorkerTask.findUnique({
425+
where: {
426+
workerId_slug: {
427+
workerId: worker.id,
428+
slug: taskId,
429+
},
430+
},
431+
});
432+
433+
if (!task) {
434+
console.log("Failed to get queue name: No task found", {
435+
taskId,
436+
environmentId: environment.id,
437+
});
438+
439+
return defaultQueueName;
440+
}
441+
442+
const queueConfig = QueueOptions.optional().safeParse(task.queueConfig);
443+
444+
if (!queueConfig.success) {
445+
console.log("Failed to get queue name: Invalid queue config", {
446+
taskId,
447+
environmentId: environment.id,
448+
queueConfig: task.queueConfig,
449+
});
450+
451+
return defaultQueueName;
452+
}
453+
454+
return queueConfig.data?.name ?? defaultQueueName;
455+
}
456+
402457
async #handlePayloadPacket(
403458
payload: any,
404459
payloadType: string,

packages/trigger-sdk/src/v3/shared.ts

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,13 @@ export function createTask<
476476
>(
477477
params: TaskOptions<TIdentifier, TInput, TOutput, TInitOutput>
478478
): Task<TIdentifier, TInput, TOutput> {
479+
const customQueue = params.queue
480+
? queue({
481+
name: params.queue?.name ?? `task/${params.id}`,
482+
...params.queue,
483+
})
484+
: undefined;
485+
479486
const task: Task<TIdentifier, TInput, TOutput> = {
480487
id: params.id,
481488
trigger: async (payload, options) => {
@@ -487,7 +494,10 @@ export function createTask<
487494
: `trigger()`,
488495
params.id,
489496
payload,
490-
options
497+
{
498+
queue: customQueue,
499+
...options,
500+
}
491501
);
492502
},
493503
batchTrigger: async (items) => {
@@ -498,7 +508,9 @@ export function createTask<
498508
? `${taskMetadata.exportName}.batchTrigger()`
499509
: `batchTrigger()`,
500510
params.id,
501-
items
511+
items,
512+
undefined,
513+
customQueue
502514
);
503515
},
504516
triggerAndWait: async (payload, options) => {
@@ -510,7 +522,10 @@ export function createTask<
510522
: `triggerAndWait()`,
511523
params.id,
512524
payload,
513-
options
525+
{
526+
queue: customQueue,
527+
...options,
528+
}
514529
);
515530
},
516531
batchTriggerAndWait: async (items) => {
@@ -521,7 +536,9 @@ export function createTask<
521536
? `${taskMetadata.exportName}.batchTriggerAndWait()`
522537
: `batchTriggerAndWait()`,
523538
params.id,
524-
items
539+
items,
540+
undefined,
541+
customQueue
525542
);
526543
},
527544
};
@@ -758,7 +775,8 @@ async function batchTrigger_internal<TPayload, TOutput>(
758775
name: string,
759776
id: string,
760777
items: Array<BatchItem<TPayload>>,
761-
requestOptions?: ApiRequestOptions
778+
requestOptions?: ApiRequestOptions,
779+
queue?: QueueOptions
762780
): Promise<BatchRunHandle<TOutput>> {
763781
const apiClient = apiClientManager.client;
764782

@@ -776,7 +794,7 @@ async function batchTrigger_internal<TPayload, TOutput>(
776794
return {
777795
payload: payloadPacket.data,
778796
options: {
779-
queue: item.options?.queue,
797+
queue: item.options?.queue ?? queue,
780798
concurrencyKey: item.options?.concurrencyKey,
781799
test: taskContext.ctx?.run.isTest,
782800
payloadType: payloadPacket.dataType,
@@ -919,7 +937,8 @@ async function batchTriggerAndWait_internal<TPayload, TOutput>(
919937
name: string,
920938
id: string,
921939
items: Array<BatchItem<TPayload>>,
922-
requestOptions?: ApiRequestOptions
940+
requestOptions?: ApiRequestOptions,
941+
queue?: QueueOptions
923942
): Promise<BatchResult<TOutput>> {
924943
const ctx = taskContext.ctx;
925944

@@ -947,7 +966,7 @@ async function batchTriggerAndWait_internal<TPayload, TOutput>(
947966
payload: payloadPacket.data,
948967
options: {
949968
lockToVersion: taskContext.worker?.version,
950-
queue: item.options?.queue,
969+
queue: item.options?.queue ?? queue,
951970
concurrencyKey: item.options?.concurrencyKey,
952971
test: taskContext.ctx?.run.isTest,
953972
payloadType: payloadPacket.dataType,

references/v3-catalog/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
},
88
"scripts": {
99
"dev:trigger": "triggerdev dev",
10-
"management": "ts-node -r tsconfig-paths/register ./src/management.ts",
11-
"queues": "ts-node -r tsconfig-paths/register ./src/queues.ts",
10+
"management": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/management.ts",
11+
"queues": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/queues.ts",
1212
"build:client": "tsup-node ./src/clientUsage.ts --format esm,cjs",
1313
"client": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/clientUsage.ts",
1414
"triggerWithLargePayload": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/triggerWithLargePayload.ts",

0 commit comments

Comments
 (0)