Skip to content

Commit 51bb4c8

Browse files
authored
Support custom queue when triggering a task (#1138)
* Created a v3-catalog test script for queues * SDK: Fix for calling trigger and passing a custom queue * Support custom queue in TriggerTaskService * Improved the script in the catalog so it’s clearer what’s going on * Remove the concurrencyLimit from a queue if the limit is null * Fix for the test code… stupid
1 parent ba71f95 commit 51bb4c8

File tree

7 files changed

+134
-7
lines changed

7 files changed

+134
-7
lines changed

.changeset/itchy-chairs-itch.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
Fix for calling trigger and passing a custom queue

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ export class MarQS {
8282
return this.redis.set(this.keys.queueConcurrencyLimitKey(env, queue), concurrency);
8383
}
8484

85+
public async removeQueueConcurrencyLimits(env: AuthenticatedEnvironment, queue: string) {
86+
return this.redis.del(this.keys.queueConcurrencyLimitKey(env, queue));
87+
}
88+
8589
public async updateEnvConcurrencyLimits(env: AuthenticatedEnvironment) {
8690
await this.#callUpdateGlobalConcurrencyLimits({
8791
envConcurrencyLimitKey: this.keys.envConcurrencyLimitKey(env),

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,8 @@ export async function createBackgroundTasks(
179179
taskQueue.name,
180180
taskQueue.concurrencyLimit
181181
);
182+
} else {
183+
await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name);
182184
}
183185
} catch (error) {
184186
if (error instanceof Prisma.PrismaClientKnownRequestError) {

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ import {
55
packetRequiresOffloading,
66
} from "@trigger.dev/core/v3";
77
import { createHash } from "node:crypto";
8-
import { $transaction } from "~/db.server";
8+
import { $transaction, prisma } from "~/db.server";
99
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
10+
import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server";
1011
import { eventRepository } from "../eventRepository.server";
1112
import { generateFriendlyId } from "../friendlyIdentifiers";
12-
import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server";
1313
import { uploadToObjectStore } from "../r2.server";
1414
import { BaseService } from "./baseService.server";
1515

@@ -111,7 +111,12 @@ export class TriggerTaskService extends BaseService {
111111
select: { lastNumber: true },
112112
});
113113

114-
const queueName = sanitizeQueueName(body.options?.queue?.name ?? `task/${taskId}`);
114+
let queueName = sanitizeQueueName(body.options?.queue?.name ?? `task/${taskId}`);
115+
116+
// Check that the queuename is not an empty string
117+
if (!queueName) {
118+
queueName = sanitizeQueueName(`task/${taskId}`);
119+
}
115120

116121
event.setAttribute("queueName", queueName);
117122
span.setAttribute("queueName", queueName);
@@ -182,6 +187,43 @@ export class TriggerTaskService extends BaseService {
182187
}
183188
}
184189

190+
if (body.options?.queue) {
191+
const concurrencyLimit = body.options.queue.concurrencyLimit
192+
? Math.max(0, body.options.queue.concurrencyLimit)
193+
: null;
194+
const taskQueue = await prisma.taskQueue.upsert({
195+
where: {
196+
runtimeEnvironmentId_name: {
197+
runtimeEnvironmentId: environment.id,
198+
name: queueName,
199+
},
200+
},
201+
update: {
202+
concurrencyLimit,
203+
rateLimit: body.options.queue.rateLimit,
204+
},
205+
create: {
206+
friendlyId: generateFriendlyId("queue"),
207+
name: queueName,
208+
concurrencyLimit,
209+
runtimeEnvironmentId: environment.id,
210+
projectId: environment.projectId,
211+
rateLimit: body.options.queue.rateLimit,
212+
type: "NAMED",
213+
},
214+
});
215+
216+
if (typeof taskQueue.concurrencyLimit === "number") {
217+
await marqs?.updateQueueConcurrencyLimits(
218+
environment,
219+
taskQueue.name,
220+
taskQueue.concurrencyLimit
221+
);
222+
} else {
223+
await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name);
224+
}
225+
}
226+
185227
return taskRun;
186228
});
187229

packages/trigger-sdk/src/v3/shared.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ export function createTask<TInput = void, TOutput = unknown, TInitOutput extends
353353
{
354354
payload: payloadPacket.data,
355355
options: {
356-
queue: params.queue,
356+
queue: options?.queue ?? params.queue,
357357
concurrencyKey: options?.concurrencyKey,
358358
test: taskContext.ctx?.run.isTest,
359359
payloadType: payloadPacket.dataType,
@@ -482,7 +482,7 @@ export function createTask<TInput = void, TOutput = unknown, TInitOutput extends
482482
options: {
483483
dependentAttempt: ctx.attempt.id,
484484
lockToVersion: taskContext.worker?.version, // Lock to current version because we're waiting for it to finish
485-
queue: params.queue,
485+
queue: options?.queue ?? params.queue,
486486
concurrencyKey: options?.concurrencyKey,
487487
test: taskContext.ctx?.run.isTest,
488488
payloadType: payloadPacket.dataType,

references/v3-catalog/package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
"private": true,
55
"scripts": {
66
"dev:trigger": "triggerdev dev",
7-
"management": "ts-node -r tsconfig-paths/register ./src/management.ts"
7+
"management": "ts-node -r tsconfig-paths/register ./src/management.ts",
8+
"queues": "ts-node -r tsconfig-paths/register ./src/queues.ts"
89
},
910
"dependencies": {
1011
"@ffmpeg-installer/ffmpeg": "^1.1.0",
@@ -54,4 +55,4 @@
5455
"tsconfig-paths": "^4.2.0",
5556
"typescript": "^5.3.0"
5657
}
57-
}
58+
}

references/v3-catalog/src/queues.ts

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import dotenv from "dotenv";
2+
import { simpleChildTask } from "./trigger/subtasks";
3+
import { wait } from "@trigger.dev/sdk/v3";
4+
import { setTimeout } from "timers/promises";
5+
6+
dotenv.config();
7+
8+
export async function run() {
9+
await simpleChildTask.trigger({ message: "Regular queue" });
10+
await simpleChildTask.trigger(
11+
{ message: "Simple alt queue 1" },
12+
{
13+
queue: {
14+
name: "queue-concurrency-3",
15+
concurrencyLimit: 1,
16+
},
17+
}
18+
);
19+
await simpleChildTask.trigger(
20+
{ message: "Simple alt queue 2" },
21+
{
22+
queue: {
23+
name: "queue-concurrency-3",
24+
concurrencyLimit: 1,
25+
},
26+
}
27+
);
28+
29+
await simpleChildTask.batchTrigger([{ payload: { message: "Regular queue" } }]);
30+
await simpleChildTask.batchTrigger([
31+
{
32+
payload: { message: "Batched alt queue 1" },
33+
options: {
34+
queue: {
35+
name: "queue-concurrency-3",
36+
concurrencyLimit: 1,
37+
},
38+
},
39+
},
40+
{
41+
payload: { message: "Batched alt queue 2" },
42+
options: {
43+
queue: {
44+
name: "queue-concurrency-3",
45+
concurrencyLimit: 1,
46+
},
47+
},
48+
},
49+
{
50+
payload: { message: "Batched alt queue 3" },
51+
options: {
52+
queue: {
53+
name: "queue-concurrency-3",
54+
concurrencyLimit: 1,
55+
},
56+
},
57+
},
58+
]);
59+
60+
await setTimeout(10_000);
61+
62+
//this should set the concurrencyLimit back to none
63+
await simpleChildTask.trigger(
64+
{ message: "Simple alt queue 2" },
65+
{
66+
queue: {
67+
name: "queue-concurrency-3",
68+
},
69+
}
70+
);
71+
}
72+
73+
run().catch(console.error);

0 commit comments

Comments
 (0)