Skip to content

Commit 9e2b9e0

Browse files
committed
Worker setup and processing items in a simple test
1 parent fff3a35 commit 9e2b9e0

File tree

5 files changed

+129
-297
lines changed

5 files changed

+129
-297
lines changed

internal-packages/redis-worker/src/processor.test.ts

Lines changed: 0 additions & 91 deletions
This file was deleted.

internal-packages/redis-worker/src/processor.ts

Lines changed: 0 additions & 185 deletions
This file was deleted.

internal-packages/redis-worker/src/queue.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
2929
schema: TMessageCatalog;
3030
redisOptions: RedisOptions;
3131
logger?: Logger;
32-
shutdownTimeMs?: number;
3332
}) {
3433
this.name = name;
3534
this.redis = new Redis({
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import { redisTest } from "@internal/testcontainers";
2+
import { describe, it } from "node:test";
3+
import { expect } from "vitest";
4+
import { z } from "zod";
5+
import { Worker } from "./worker.js";
6+
import { Logger } from "@trigger.dev/core/logger";
7+
import { SimpleQueue } from "./queue.js";
8+
9+
describe("Worker", () => {
10+
// Tests will be added here
11+
});
12+
13+
redisTest("concurrency settings", { timeout: 30_000 }, async ({ redisContainer }) => {
14+
const processedItems: number[] = [];
15+
const worker = new Worker({
16+
name: "test-worker",
17+
redisOptions: {
18+
host: redisContainer.getHost(),
19+
port: redisContainer.getPort(),
20+
password: redisContainer.getPassword(),
21+
},
22+
catalog: {
23+
testJob: {
24+
schema: z.object({ value: z.number() }),
25+
visibilityTimeoutMs: 5000,
26+
retry: { maxAttempts: 3 },
27+
},
28+
},
29+
jobs: {
30+
testJob: async ({ payload }) => {
31+
await new Promise((resolve) => setTimeout(resolve, 30)); // Simulate work
32+
processedItems.push(payload.value);
33+
},
34+
},
35+
concurrency: {
36+
workers: 2,
37+
tasksPerWorker: 3,
38+
},
39+
logger: new Logger("test", "log"),
40+
});
41+
42+
// Enqueue 10 items
43+
for (let i = 0; i < 10; i++) {
44+
await worker.enqueue({
45+
id: `item-${i}`,
46+
job: "testJob",
47+
payload: { value: i },
48+
visibilityTimeoutMs: 5000,
49+
});
50+
}
51+
52+
worker.start();
53+
54+
// Wait for items to be processed
55+
await new Promise((resolve) => setTimeout(resolve, 600));
56+
57+
worker.stop();
58+
59+
expect(processedItems.length).toBe(10);
60+
expect(new Set(processedItems).size).toBe(10); // Ensure all items were processed uniquely
61+
});

0 commit comments

Comments
 (0)