Skip to content

Commit 0dd60ea

Browse files
committed
Add queue and locked version tests
1 parent 2b1d9d9 commit 0dd60ea

File tree

1 file changed

+274
-1
lines changed

1 file changed

+274
-1
lines changed

apps/webapp/test/engine/triggerTask.test.ts

Lines changed: 274 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import {
2020
ValidationResult,
2121
} from "~/runEngine/types";
2222
import { RunEngineTriggerTaskService } from "../../app/runEngine/services/triggerTask.server";
23-
import { TaskRun } from "@trigger.dev/database";
23+
import { TaskRun, TaskRunStatus } from "@trigger.dev/database";
2424

2525
vi.setConfig({ testTimeout: 30_000 }); // 30 seconds timeout
2626

@@ -183,4 +183,277 @@ describe("RunEngineTriggerTaskService", () => {
183183

184184
await engine.quit();
185185
});
186+
187+
containerTest("should handle idempotency keys correctly", async ({ prisma, redisOptions }) => {
188+
const engine = new RunEngine({
189+
prisma,
190+
worker: {
191+
redis: redisOptions,
192+
workers: 1,
193+
tasksPerWorker: 10,
194+
pollIntervalMs: 100,
195+
},
196+
queue: {
197+
redis: redisOptions,
198+
},
199+
runLock: {
200+
redis: redisOptions,
201+
},
202+
machines: {
203+
defaultMachine: "small-1x",
204+
machines: {
205+
"small-1x": {
206+
name: "small-1x" as const,
207+
cpu: 0.5,
208+
memory: 0.5,
209+
centsPerMs: 0.0001,
210+
},
211+
},
212+
baseCostInCents: 0.0005,
213+
},
214+
tracer: trace.getTracer("test", "0.0.0"),
215+
});
216+
217+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
218+
219+
const taskIdentifier = "test-task";
220+
221+
//create background worker
222+
await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);
223+
224+
const queuesManager = new DefaultQueueManager(prisma, engine);
225+
226+
const idempotencyKeyConcern = new IdempotencyKeyConcern(
227+
prisma,
228+
engine,
229+
new MockTraceEventConcern()
230+
);
231+
232+
const triggerTaskService = new RunEngineTriggerTaskService({
233+
engine,
234+
prisma,
235+
runNumberIncrementer: new MockRunNumberIncrementer(),
236+
payloadProcessor: new MockPayloadProcessor(),
237+
queueConcern: queuesManager,
238+
idempotencyKeyConcern,
239+
validator: new MockTriggerTaskValidator(),
240+
traceEventConcern: new MockTraceEventConcern(),
241+
tracer: trace.getTracer("test", "0.0.0"),
242+
});
243+
244+
const result = await triggerTaskService.call({
245+
taskId: taskIdentifier,
246+
environment: authenticatedEnvironment,
247+
body: {
248+
payload: { test: "test" },
249+
options: {
250+
idempotencyKey: "test-idempotency-key",
251+
},
252+
},
253+
});
254+
255+
expect(result).toBeDefined();
256+
expect(result?.run.friendlyId).toBeDefined();
257+
expect(result?.run.status).toBe("PENDING");
258+
expect(result?.isCached).toBe(false);
259+
260+
const run = await prisma.taskRun.findUnique({
261+
where: {
262+
id: result?.run.id,
263+
},
264+
});
265+
266+
expect(run).toBeDefined();
267+
expect(run?.friendlyId).toBe(result?.run.friendlyId);
268+
expect(run?.engine).toBe("V2");
269+
expect(run?.queuedAt).toBeDefined();
270+
expect(run?.queue).toBe(`task/${taskIdentifier}`);
271+
272+
// Lets make sure the task is in the queue
273+
const queueLength = await engine.runQueue.lengthOfQueue(
274+
authenticatedEnvironment,
275+
`task/${taskIdentifier}`
276+
);
277+
expect(queueLength).toBe(1);
278+
279+
// Now lets try to trigger the same task with the same idempotency key
280+
const cachedResult = await triggerTaskService.call({
281+
taskId: taskIdentifier,
282+
environment: authenticatedEnvironment,
283+
body: {
284+
payload: { test: "test" },
285+
options: {
286+
idempotencyKey: "test-idempotency-key",
287+
},
288+
},
289+
});
290+
291+
expect(cachedResult).toBeDefined();
292+
expect(cachedResult?.run.friendlyId).toBe(result?.run.friendlyId);
293+
expect(cachedResult?.isCached).toBe(true);
294+
295+
await engine.quit();
296+
});
297+
298+
containerTest(
299+
"should resolve queue names correctly when locked to version",
300+
async ({ prisma, redisOptions }) => {
301+
const engine = new RunEngine({
302+
prisma,
303+
worker: {
304+
redis: redisOptions,
305+
workers: 1,
306+
tasksPerWorker: 10,
307+
pollIntervalMs: 100,
308+
},
309+
queue: {
310+
redis: redisOptions,
311+
},
312+
runLock: {
313+
redis: redisOptions,
314+
},
315+
machines: {
316+
defaultMachine: "small-1x",
317+
machines: {
318+
"small-1x": {
319+
name: "small-1x" as const,
320+
cpu: 0.5,
321+
memory: 0.5,
322+
centsPerMs: 0.0001,
323+
},
324+
},
325+
baseCostInCents: 0.0005,
326+
},
327+
tracer: trace.getTracer("test", "0.0.0"),
328+
});
329+
330+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
331+
const taskIdentifier = "test-task";
332+
333+
// Create a background worker with a specific version
334+
const worker = await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier, {
335+
preset: "small-1x",
336+
});
337+
338+
// Create a specific queue for this worker
339+
const specificQueue = await prisma.taskQueue.create({
340+
data: {
341+
name: "specific-queue",
342+
friendlyId: "specific-queue",
343+
projectId: authenticatedEnvironment.projectId,
344+
runtimeEnvironmentId: authenticatedEnvironment.id,
345+
workers: {
346+
connect: {
347+
id: worker.worker.id,
348+
},
349+
},
350+
},
351+
});
352+
353+
// Associate the task with the queue
354+
await prisma.backgroundWorkerTask.update({
355+
where: {
356+
workerId_slug: {
357+
workerId: worker.worker.id,
358+
slug: taskIdentifier,
359+
},
360+
},
361+
data: {
362+
queueId: specificQueue.id,
363+
},
364+
});
365+
366+
const queuesManager = new DefaultQueueManager(prisma, engine);
367+
const idempotencyKeyConcern = new IdempotencyKeyConcern(
368+
prisma,
369+
engine,
370+
new MockTraceEventConcern()
371+
);
372+
373+
const triggerTaskService = new RunEngineTriggerTaskService({
374+
engine,
375+
prisma,
376+
runNumberIncrementer: new MockRunNumberIncrementer(),
377+
payloadProcessor: new MockPayloadProcessor(),
378+
queueConcern: queuesManager,
379+
idempotencyKeyConcern,
380+
validator: new MockTriggerTaskValidator(),
381+
traceEventConcern: new MockTraceEventConcern(),
382+
tracer: trace.getTracer("test", "0.0.0"),
383+
});
384+
385+
// Test case 1: Trigger with lockToVersion but no specific queue
386+
const result1 = await triggerTaskService.call({
387+
taskId: taskIdentifier,
388+
environment: authenticatedEnvironment,
389+
body: {
390+
payload: { test: "test" },
391+
options: {
392+
lockToVersion: worker.worker.version,
393+
},
394+
},
395+
});
396+
397+
expect(result1).toBeDefined();
398+
expect(result1?.run.queue).toBe("specific-queue");
399+
400+
// Test case 2: Trigger with lockToVersion and specific queue
401+
const result2 = await triggerTaskService.call({
402+
taskId: taskIdentifier,
403+
environment: authenticatedEnvironment,
404+
body: {
405+
payload: { test: "test" },
406+
options: {
407+
lockToVersion: worker.worker.version,
408+
queue: {
409+
name: "specific-queue",
410+
},
411+
},
412+
},
413+
});
414+
415+
expect(result2).toBeDefined();
416+
expect(result2?.run.queue).toBe("specific-queue");
417+
expect(result2?.run.lockedQueueId).toBe(specificQueue.id);
418+
419+
// Test case 3: Try to use non-existent queue with locked version (should throw)
420+
await expect(
421+
triggerTaskService.call({
422+
taskId: taskIdentifier,
423+
environment: authenticatedEnvironment,
424+
body: {
425+
payload: { test: "test" },
426+
options: {
427+
lockToVersion: worker.worker.version,
428+
queue: {
429+
name: "non-existent-queue",
430+
},
431+
},
432+
},
433+
})
434+
).rejects.toThrow(
435+
`Specified queue 'non-existent-queue' not found or not associated with locked version '${worker.worker.version}'`
436+
);
437+
438+
// Test case 4: Trigger with a non-existent queue without a locked version
439+
const result4 = await triggerTaskService.call({
440+
taskId: taskIdentifier,
441+
environment: authenticatedEnvironment,
442+
body: {
443+
payload: { test: "test" },
444+
options: {
445+
queue: {
446+
name: "non-existent-queue",
447+
},
448+
},
449+
},
450+
});
451+
452+
expect(result4).toBeDefined();
453+
expect(result4?.run.queue).toBe("non-existent-queue");
454+
expect(result4?.run.status).toBe("PENDING");
455+
456+
await engine.quit();
457+
}
458+
);
186459
});

0 commit comments

Comments
 (0)