Skip to content

Commit 7a58439

Browse files
committed
WIP queue indexing
1 parent 49a3f72 commit 7a58439

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+813
-701
lines changed

apps/webapp/app/models/runtimeEnvironment.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { AuthenticatedEnvironment } from "@internal/testcontainers";
1+
import type { AuthenticatedEnvironment } from "@internal/run-engine";
22
import type { Prisma, PrismaClientOrTransaction, RuntimeEnvironment } from "@trigger.dev/database";
33
import { prisma } from "~/db.server";
44
import { getUsername } from "~/utils/username";

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 203 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,29 @@
11
import {
2+
BackgroundWorkerMetadata,
23
BackgroundWorkerSourceFileMetadata,
34
CreateBackgroundWorkerRequestBody,
5+
QueueManifest,
46
TaskResource,
57
} from "@trigger.dev/core/v3";
6-
import type { BackgroundWorker } from "@trigger.dev/database";
8+
import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic";
9+
import type { BackgroundWorker, TaskQueue, TaskQueueType } from "@trigger.dev/database";
10+
import cronstrue from "cronstrue";
711
import { Prisma, PrismaClientOrTransaction } from "~/db.server";
12+
import { sanitizeQueueName } from "~/models/taskQueue.server";
813
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
914
import { logger } from "~/services/logger.server";
10-
import { marqs } from "~/v3/marqs/index.server";
1115
import { generateFriendlyId } from "../friendlyIdentifiers";
12-
import { calculateNextBuildVersion } from "../utils/calculateNextBuildVersion";
13-
import { BaseService } from "./baseService.server";
14-
import { projectPubSub } from "./projectPubSub.server";
15-
import { RegisterNextTaskScheduleInstanceService } from "./registerNextTaskScheduleInstance.server";
16-
import cronstrue from "cronstrue";
17-
import { CheckScheduleService } from "./checkSchedule.server";
18-
import { clampMaxDuration } from "../utils/maxDuration";
1916
import {
2017
removeQueueConcurrencyLimits,
2118
updateEnvConcurrencyLimits,
2219
updateQueueConcurrencyLimits,
2320
} from "../runQueue.server";
24-
import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic";
25-
import { sanitizeQueueName } from "~/models/taskQueue.server";
21+
import { calculateNextBuildVersion } from "../utils/calculateNextBuildVersion";
22+
import { clampMaxDuration } from "../utils/maxDuration";
23+
import { BaseService } from "./baseService.server";
24+
import { CheckScheduleService } from "./checkSchedule.server";
25+
import { projectPubSub } from "./projectPubSub.server";
26+
import { RegisterNextTaskScheduleInstanceService } from "./registerNextTaskScheduleInstance.server";
2627

2728
export class CreateBackgroundWorkerService extends BaseService {
2829
public async call(
@@ -101,8 +102,8 @@ export class CreateBackgroundWorkerService extends BaseService {
101102
environment,
102103
this._prisma
103104
);
104-
await createBackgroundTasks(
105-
body.metadata.tasks,
105+
await createWorkerResources(
106+
body.metadata,
106107
backgroundWorker,
107108
environment,
108109
this._prisma,
@@ -154,135 +155,218 @@ export class CreateBackgroundWorkerService extends BaseService {
154155
}
155156
}
156157

157-
export async function createBackgroundTasks(
158-
tasks: TaskResource[],
158+
export async function createWorkerResources(
159+
metadata: BackgroundWorkerMetadata,
159160
worker: BackgroundWorker,
160161
environment: AuthenticatedEnvironment,
161162
prisma: PrismaClientOrTransaction,
162163
tasksToBackgroundFiles?: Map<string, string>
163164
) {
164-
for (const task of tasks) {
165-
try {
166-
await prisma.backgroundWorkerTask.create({
167-
data: {
168-
friendlyId: generateFriendlyId("task"),
169-
projectId: worker.projectId,
170-
runtimeEnvironmentId: worker.runtimeEnvironmentId,
171-
workerId: worker.id,
172-
slug: task.id,
173-
description: task.description,
174-
filePath: task.filePath,
175-
exportName: task.exportName,
176-
retryConfig: task.retry,
177-
queueConfig: task.queue,
178-
machineConfig: task.machine,
179-
triggerSource: task.triggerSource === "schedule" ? "SCHEDULED" : "STANDARD",
180-
fileId: tasksToBackgroundFiles?.get(task.id) ?? null,
181-
maxDurationInSeconds: task.maxDuration ? clampMaxDuration(task.maxDuration) : null,
182-
},
183-
});
165+
// Create the queues
166+
const queues = await createWorkerQueues(metadata, worker, environment, prisma);
184167

185-
let queueName = sanitizeQueueName(task.queue?.name ?? `task/${task.id}`);
168+
// Create the tasks
169+
await createWorkerTasks(metadata, queues, worker, environment, prisma, tasksToBackgroundFiles);
170+
}
186171

187-
// Check that the queuename is not an empty string
188-
if (!queueName) {
189-
queueName = sanitizeQueueName(`task/${task.id}`);
190-
}
172+
async function createWorkerTasks(
173+
metadata: BackgroundWorkerMetadata,
174+
queues: Array<TaskQueue>,
175+
worker: BackgroundWorker,
176+
environment: AuthenticatedEnvironment,
177+
prisma: PrismaClientOrTransaction,
178+
tasksToBackgroundFiles?: Map<string, string>
179+
) {
180+
// Create tasks in chunks of 20
181+
const CHUNK_SIZE = 20;
182+
for (let i = 0; i < metadata.tasks.length; i += CHUNK_SIZE) {
183+
const chunk = metadata.tasks.slice(i, i + CHUNK_SIZE);
184+
await Promise.all(
185+
chunk.map((task) =>
186+
createWorkerTask(task, queues, worker, environment, prisma, tasksToBackgroundFiles)
187+
)
188+
);
189+
}
190+
}
191191

192-
const concurrencyLimit =
193-
typeof task.queue?.concurrencyLimit === "number"
194-
? Math.max(
195-
Math.min(
196-
task.queue.concurrencyLimit,
197-
environment.maximumConcurrencyLimit,
198-
environment.organization.maximumConcurrencyLimit
199-
),
200-
0
201-
)
202-
: task.queue?.concurrencyLimit;
203-
204-
let taskQueue = await prisma.taskQueue.findFirst({
205-
where: {
206-
runtimeEnvironmentId: worker.runtimeEnvironmentId,
207-
name: queueName,
192+
async function createWorkerTask(
193+
task: TaskResource,
194+
queues: Array<TaskQueue>,
195+
worker: BackgroundWorker,
196+
environment: AuthenticatedEnvironment,
197+
prisma: PrismaClientOrTransaction,
198+
tasksToBackgroundFiles?: Map<string, string>
199+
) {
200+
try {
201+
let queue = queues.find((queue) => queue.name === task.queue?.name);
202+
203+
if (!queue) {
204+
// Create a TaskQueue
205+
queue = await createWorkerQueue(
206+
{
207+
name: `task/${task.id}`,
208208
},
209-
});
210-
211-
if (!taskQueue) {
212-
taskQueue = await prisma.taskQueue.create({
213-
data: {
214-
friendlyId: generateFriendlyId("queue"),
215-
name: queueName,
216-
concurrencyLimit,
217-
runtimeEnvironmentId: worker.runtimeEnvironmentId,
218-
projectId: worker.projectId,
219-
type: task.queue?.name ? "NAMED" : "VIRTUAL",
220-
},
221-
});
222-
}
209+
task.id,
210+
"VIRTUAL",
211+
worker,
212+
environment,
213+
prisma
214+
);
215+
}
223216

224-
if (typeof concurrencyLimit === "number") {
225-
logger.debug("CreateBackgroundWorkerService: updating concurrency limit", {
226-
workerId: worker.id,
227-
taskQueue,
228-
orgId: environment.organizationId,
229-
projectId: environment.projectId,
230-
environmentId: environment.id,
231-
concurrencyLimit,
232-
taskidentifier: task.id,
217+
await prisma.backgroundWorkerTask.create({
218+
data: {
219+
friendlyId: generateFriendlyId("task"),
220+
projectId: worker.projectId,
221+
runtimeEnvironmentId: worker.runtimeEnvironmentId,
222+
workerId: worker.id,
223+
slug: task.id,
224+
description: task.description,
225+
filePath: task.filePath,
226+
exportName: task.exportName,
227+
retryConfig: task.retry,
228+
queueConfig: task.queue,
229+
machineConfig: task.machine,
230+
triggerSource: task.triggerSource === "schedule" ? "SCHEDULED" : "STANDARD",
231+
fileId: tasksToBackgroundFiles?.get(task.id) ?? null,
232+
maxDurationInSeconds: task.maxDuration ? clampMaxDuration(task.maxDuration) : null,
233+
queueId: queue.id,
234+
},
235+
});
236+
} catch (error) {
237+
if (error instanceof Prisma.PrismaClientKnownRequestError) {
238+
// The error code for unique constraint violation in Prisma is P2002
239+
if (error.code === "P2002") {
240+
logger.warn("Task already exists", {
241+
task,
242+
worker,
233243
});
234-
await updateQueueConcurrencyLimits(environment, taskQueue.name, concurrencyLimit);
235244
} else {
236-
logger.debug("CreateBackgroundWorkerService: removing concurrency limit", {
237-
workerId: worker.id,
238-
taskQueue,
239-
orgId: environment.organizationId,
240-
projectId: environment.projectId,
241-
environmentId: environment.id,
242-
concurrencyLimit,
243-
taskidentifier: task.id,
244-
});
245-
await removeQueueConcurrencyLimits(environment, taskQueue.name);
246-
}
247-
} catch (error) {
248-
if (error instanceof Prisma.PrismaClientKnownRequestError) {
249-
// The error code for unique constraint violation in Prisma is P2002
250-
if (error.code === "P2002") {
251-
logger.warn("Task already exists", {
252-
task,
253-
worker,
254-
});
255-
} else {
256-
logger.error("Prisma Error creating background worker task", {
257-
error: {
258-
code: error.code,
259-
message: error.message,
260-
},
261-
task,
262-
worker,
263-
});
264-
}
265-
} else if (error instanceof Error) {
266-
logger.error("Error creating background worker task", {
245+
logger.error("Prisma Error creating background worker task", {
267246
error: {
268-
name: error.name,
247+
code: error.code,
269248
message: error.message,
270-
stack: error.stack,
271249
},
272250
task,
273251
worker,
274252
});
275-
} else {
276-
logger.error("Unknown error creating background worker task", {
277-
error,
278-
task,
279-
worker,
280-
});
281253
}
254+
} else if (error instanceof Error) {
255+
logger.error("Error creating background worker task", {
256+
error: {
257+
name: error.name,
258+
message: error.message,
259+
stack: error.stack,
260+
},
261+
task,
262+
worker,
263+
});
264+
} else {
265+
logger.error("Unknown error creating background worker task", {
266+
error,
267+
task,
268+
worker,
269+
});
282270
}
283271
}
284272
}
285273

274+
async function createWorkerQueues(
275+
metadata: BackgroundWorkerMetadata,
276+
worker: BackgroundWorker,
277+
environment: AuthenticatedEnvironment,
278+
prisma: PrismaClientOrTransaction
279+
) {
280+
if (!metadata.queues) {
281+
return [];
282+
}
283+
284+
const CHUNK_SIZE = 20;
285+
const allQueues: Awaited<ReturnType<typeof createWorkerQueue>>[] = [];
286+
287+
// Process queues in chunks
288+
for (let i = 0; i < metadata.queues.length; i += CHUNK_SIZE) {
289+
const chunk = metadata.queues.slice(i, i + CHUNK_SIZE);
290+
const queueChunk = await Promise.all(
291+
chunk.map(async (queue) => {
292+
return createWorkerQueue(queue, queue.name, "NAMED", worker, environment, prisma);
293+
})
294+
);
295+
allQueues.push(...queueChunk.filter(Boolean));
296+
}
297+
298+
return allQueues;
299+
}
300+
301+
async function createWorkerQueue(
302+
queue: QueueManifest,
303+
orderableName: string,
304+
queueType: TaskQueueType,
305+
worker: BackgroundWorker,
306+
environment: AuthenticatedEnvironment,
307+
prisma: PrismaClientOrTransaction
308+
) {
309+
let queueName = sanitizeQueueName(queue.name);
310+
311+
const concurrencyLimit =
312+
typeof queue.concurrencyLimit === "number"
313+
? Math.max(
314+
Math.min(
315+
queue.concurrencyLimit,
316+
environment.maximumConcurrencyLimit,
317+
environment.organization.maximumConcurrencyLimit
318+
),
319+
0
320+
)
321+
: queue.concurrencyLimit;
322+
323+
let taskQueue = await prisma.taskQueue.findFirst({
324+
where: {
325+
runtimeEnvironmentId: worker.runtimeEnvironmentId,
326+
name: queueName,
327+
},
328+
});
329+
330+
if (!taskQueue) {
331+
taskQueue = await prisma.taskQueue.create({
332+
data: {
333+
friendlyId: generateFriendlyId("queue"),
334+
version: "V2",
335+
name: queueName,
336+
orderableName,
337+
concurrencyLimit,
338+
runtimeEnvironmentId: worker.runtimeEnvironmentId,
339+
projectId: worker.projectId,
340+
type: queueType,
341+
},
342+
});
343+
}
344+
345+
if (typeof concurrencyLimit === "number") {
346+
logger.debug("createWorkerQueue: updating concurrency limit", {
347+
workerId: worker.id,
348+
taskQueue,
349+
orgId: environment.organizationId,
350+
projectId: environment.projectId,
351+
environmentId: environment.id,
352+
concurrencyLimit,
353+
});
354+
await updateQueueConcurrencyLimits(environment, taskQueue.name, concurrencyLimit);
355+
} else {
356+
logger.debug("createWorkerQueue: removing concurrency limit", {
357+
workerId: worker.id,
358+
taskQueue,
359+
orgId: environment.organizationId,
360+
projectId: environment.projectId,
361+
environmentId: environment.id,
362+
concurrencyLimit,
363+
});
364+
await removeQueueConcurrencyLimits(environment, taskQueue.name);
365+
}
366+
367+
return taskQueue;
368+
}
369+
286370
//CreateDeclarativeScheduleError with a message
287371
export class CreateDeclarativeScheduleError extends Error {
288372
constructor(message: string) {

0 commit comments

Comments
 (0)