Skip to content

Commit 78d431a

Browse files
committed
Extracted out the trigger queues logic
1 parent 7837402 commit 78d431a

File tree

4 files changed

+388
-206
lines changed

4 files changed

+388
-206
lines changed
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
import { sanitizeQueueName } from "@trigger.dev/core/v3/isomorphic";
2+
import { PrismaClientOrTransaction } from "@trigger.dev/database";
3+
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
4+
import { logger } from "~/services/logger.server";
5+
import { findCurrentWorkerFromEnvironment } from "~/v3/models/workerDeployment.server";
6+
import { ServiceValidationError } from "~/v3/services/baseService.server";
7+
import {
8+
LockedBackgroundWorker,
9+
QueueManager,
10+
QueueProperties,
11+
QueueValidationResult,
12+
TriggerTaskRequest,
13+
} from "../types";
14+
import { WorkerGroupService } from "~/v3/services/worker/workerGroupService.server";
15+
import type { RunEngine } from "~/v3/runEngine.server";
16+
import { env } from "~/env.server";
17+
18+
export class DefaultQueueManager implements QueueManager {
19+
constructor(
20+
private readonly prisma: PrismaClientOrTransaction,
21+
private readonly engine: RunEngine
22+
) {}
23+
24+
async resolveQueueProperties(
25+
request: TriggerTaskRequest,
26+
lockedBackgroundWorker?: LockedBackgroundWorker
27+
): Promise<QueueProperties> {
28+
let queueName: string;
29+
let lockedQueueId: string | undefined;
30+
31+
// Determine queue name based on lockToVersion and provided options
32+
if (lockedBackgroundWorker) {
33+
// Task is locked to a specific worker version
34+
if (request.body.options?.queue?.name) {
35+
const specifiedQueueName = request.body.options.queue.name;
36+
// A specific queue name is provided
37+
const specifiedQueue = await this.prisma.taskQueue.findFirst({
38+
// Validate it exists for the locked worker
39+
where: {
40+
name: specifiedQueueName,
41+
workers: { some: { id: lockedBackgroundWorker.id } }, // Ensure the queue is associated with any task of the locked worker
42+
},
43+
});
44+
45+
if (!specifiedQueue) {
46+
throw new ServiceValidationError(
47+
`Specified queue '${specifiedQueueName}' not found or not associated with locked version '${
48+
lockedBackgroundWorker.version ?? "<unknown>"
49+
}'.`
50+
);
51+
}
52+
// Use the validated queue name directly
53+
queueName = specifiedQueue.name;
54+
lockedQueueId = specifiedQueue.id;
55+
} else {
56+
// No specific queue name provided, use the default queue for the task on the locked worker
57+
const lockedTask = await this.prisma.backgroundWorkerTask.findFirst({
58+
where: {
59+
workerId: lockedBackgroundWorker.id,
60+
slug: request.taskId,
61+
},
62+
include: {
63+
queue: true,
64+
},
65+
});
66+
67+
if (!lockedTask) {
68+
throw new ServiceValidationError(
69+
`Task '${request.taskId}' not found on locked version '${
70+
lockedBackgroundWorker.version ?? "<unknown>"
71+
}'.`
72+
);
73+
}
74+
75+
if (!lockedTask.queue) {
76+
// This case should ideally be prevented by earlier checks or schema constraints,
77+
// but handle it defensively.
78+
logger.error("Task found on locked version, but has no associated queue record", {
79+
taskId: request.taskId,
80+
workerId: lockedBackgroundWorker.id,
81+
version: lockedBackgroundWorker.version,
82+
});
83+
throw new ServiceValidationError(
84+
`Default queue configuration for task '${request.taskId}' missing on locked version '${
85+
lockedBackgroundWorker.version ?? "<unknown>"
86+
}'.`
87+
);
88+
}
89+
// Use the task's default queue name
90+
queueName = lockedTask.queue.name;
91+
lockedQueueId = lockedTask.queue.id;
92+
}
93+
} else {
94+
// Task is not locked to a specific version, use regular logic
95+
if (request.body.options?.lockToVersion) {
96+
// This should only happen if the findFirst failed, indicating the version doesn't exist
97+
throw new ServiceValidationError(
98+
`Task locked to version '${request.body.options.lockToVersion}', but no worker found with that version.`
99+
);
100+
}
101+
102+
// Get queue name using the helper for non-locked case (handles provided name or finds default)
103+
queueName = await this.getQueueName(request);
104+
}
105+
106+
// Sanitize the final determined queue name once
107+
const sanitizedQueueName = sanitizeQueueName(queueName);
108+
109+
// Check that the queuename is not an empty string
110+
if (!sanitizedQueueName) {
111+
queueName = sanitizeQueueName(`task/${request.taskId}`); // Fallback if sanitization results in empty
112+
} else {
113+
queueName = sanitizedQueueName;
114+
}
115+
116+
return {
117+
queueName,
118+
lockedQueueId,
119+
};
120+
}
121+
122+
async getQueueName(request: TriggerTaskRequest): Promise<string> {
123+
const { taskId, environment, body } = request;
124+
const { queue } = body.options ?? {};
125+
126+
if (queue?.name) {
127+
return queue.name;
128+
}
129+
130+
const defaultQueueName = `task/${taskId}`;
131+
132+
// Find the current worker for the environment
133+
const worker = await findCurrentWorkerFromEnvironment(environment);
134+
135+
if (!worker) {
136+
logger.debug("Failed to get queue name: No worker found", {
137+
taskId,
138+
environmentId: environment.id,
139+
});
140+
141+
return defaultQueueName;
142+
}
143+
144+
const task = await this.prisma.backgroundWorkerTask.findFirst({
145+
where: {
146+
workerId: worker.id,
147+
slug: taskId,
148+
},
149+
include: {
150+
queue: true,
151+
},
152+
});
153+
154+
if (!task) {
155+
console.log("Failed to get queue name: No task found", {
156+
taskId,
157+
environmentId: environment.id,
158+
});
159+
160+
return defaultQueueName;
161+
}
162+
163+
if (!task.queue) {
164+
console.log("Failed to get queue name: No queue found", {
165+
taskId,
166+
environmentId: environment.id,
167+
queueConfig: task.queueConfig,
168+
});
169+
170+
return defaultQueueName;
171+
}
172+
173+
return task.queue.name ?? defaultQueueName;
174+
}
175+
176+
async validateQueueLimits(environment: AuthenticatedEnvironment): Promise<QueueValidationResult> {
177+
const queueSizeGuard = await guardQueueSizeLimitsForEnv(this.engine, environment);
178+
179+
logger.debug("Queue size guard result", {
180+
queueSizeGuard,
181+
environment: {
182+
id: environment.id,
183+
type: environment.type,
184+
organization: environment.organization,
185+
project: environment.project,
186+
},
187+
});
188+
189+
return {
190+
ok: queueSizeGuard.isWithinLimits,
191+
maximumSize: queueSizeGuard.maximumSize ?? 0,
192+
queueSize: queueSizeGuard.queueSize ?? 0,
193+
};
194+
}
195+
196+
async getMasterQueue(environment: AuthenticatedEnvironment): Promise<string | undefined> {
197+
if (environment.type === "DEVELOPMENT") {
198+
return;
199+
}
200+
201+
const workerGroupService = new WorkerGroupService({
202+
prisma: this.prisma,
203+
engine: this.engine,
204+
});
205+
206+
const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({
207+
projectId: environment.projectId,
208+
});
209+
210+
if (!workerGroup) {
211+
throw new ServiceValidationError("No worker group found");
212+
}
213+
214+
return workerGroup.masterQueue;
215+
}
216+
}
217+
218+
function getMaximumSizeForEnvironment(environment: AuthenticatedEnvironment): number | undefined {
219+
if (environment.type === "DEVELOPMENT") {
220+
return environment.organization.maximumDevQueueSize ?? env.MAXIMUM_DEV_QUEUE_SIZE;
221+
} else {
222+
return environment.organization.maximumDeployedQueueSize ?? env.MAXIMUM_DEPLOYED_QUEUE_SIZE;
223+
}
224+
}
225+
226+
async function guardQueueSizeLimitsForEnv(
227+
engine: RunEngine,
228+
environment: AuthenticatedEnvironment,
229+
itemsToAdd: number = 1
230+
) {
231+
const maximumSize = getMaximumSizeForEnvironment(environment);
232+
233+
if (typeof maximumSize === "undefined") {
234+
return { isWithinLimits: true };
235+
}
236+
237+
const queueSize = await engine.lengthOfEnvQueue(environment);
238+
const projectedSize = queueSize + itemsToAdd;
239+
240+
return {
241+
isWithinLimits: projectedSize <= maximumSize,
242+
maximumSize,
243+
queueSize,
244+
};
245+
}

apps/webapp/app/runEngine/services/batchTrigger.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ import { logger } from "~/services/logger.server";
1616
import { getEntitlement } from "~/services/platform.v3.server";
1717
import { workerQueue } from "~/services/worker.server";
1818
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/r2.server";
19-
import { startActiveSpan } from "../../v3/tracer.server";
2019
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
2120
import { OutOfEntitlementError, TriggerTaskService } from "../../v3/services/triggerTask.server";
21+
import { startActiveSpan } from "../../v3/tracer.server";
2222

2323
const PROCESSING_BATCH_SIZE = 50;
2424
const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20;

0 commit comments

Comments
 (0)