
Commit b8d7380

Process jobs in parallel with retrying
1 parent 9e2b9e0 commit b8d7380

File tree

2 files changed: +175 -68 lines changed

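Taken together, the commit moves processItems from a sequential for-loop to concurrent handling of each dequeued batch, and failed items are now re-enqueued with a delay instead of being dropped. Below is a simplified, standalone sketch of that pattern; the QueueLike shape, the processBatch name, and the fixed retryDelayMs are illustrative assumptions, not the package's actual API (the real implementation is in the worker.ts diff further down).

// Illustrative sketch only (not the package's API): handle a dequeued batch
// concurrently, acking successes and re-enqueueing failures with a delay.
type QueueItem = { id: string; job: string; item: unknown; visibilityTimeoutMs: number };

interface QueueLike {
  dequeue(count: number): Promise<QueueItem[]>;
  ack(id: string): Promise<void>;
  enqueue(opts: QueueItem & { availableAt?: Date }): Promise<void>;
}

async function processBatch(
  queue: QueueLike,
  handlers: Record<string, (payload: unknown) => Promise<void>>,
  retryDelayMs = 1_000
): Promise<void> {
  const items = await queue.dequeue(10);

  // Each item runs in its own async closure; a throw is caught per item,
  // so one failure never rejects the whole Promise.all.
  await Promise.all(
    items.map(async ({ id, job, item, visibilityTimeoutMs }) => {
      const handler = handlers[job];
      if (!handler) return; // unknown job type: skip (the real worker logs this)

      try {
        await handler(item);
        await queue.ack(id); // success: remove the item from the queue
      } catch {
        // failure: put the item back with a delay so it is retried later
        await queue.enqueue({
          id,
          job,
          item,
          visibilityTimeoutMs,
          availableAt: new Date(Date.now() + retryDelayMs),
        });
      }
    })
  );
}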

internal-packages/redis-worker/src/worker.test.ts

Lines changed: 108 additions & 43 deletions
@@ -7,55 +7,120 @@ import { Logger } from "@trigger.dev/core/logger";
 import { SimpleQueue } from "./queue.js";
 
 describe("Worker", () => {
-  // Tests will be added here
-});
-
-redisTest("concurrency settings", { timeout: 30_000 }, async ({ redisContainer }) => {
-  const processedItems: number[] = [];
-  const worker = new Worker({
-    name: "test-worker",
-    redisOptions: {
-      host: redisContainer.getHost(),
-      port: redisContainer.getPort(),
-      password: redisContainer.getPassword(),
-    },
-    catalog: {
-      testJob: {
-        schema: z.object({ value: z.number() }),
-        visibilityTimeoutMs: 5000,
-        retry: { maxAttempts: 3 },
+  redisTest("Process items that don't throw", { timeout: 30_000 }, async ({ redisContainer }) => {
+    const processedItems: number[] = [];
+    const worker = new Worker({
+      name: "test-worker",
+      redisOptions: {
+        host: redisContainer.getHost(),
+        port: redisContainer.getPort(),
+        password: redisContainer.getPassword(),
+      },
+      catalog: {
+        testJob: {
+          schema: z.object({ value: z.number() }),
+          visibilityTimeoutMs: 5000,
+          retry: { maxAttempts: 3 },
+        },
+      },
+      jobs: {
+        testJob: async ({ payload }) => {
+          await new Promise((resolve) => setTimeout(resolve, 30)); // Simulate work
+          processedItems.push(payload.value);
+        },
       },
-    },
-    jobs: {
-      testJob: async ({ payload }) => {
-        await new Promise((resolve) => setTimeout(resolve, 30)); // Simulate work
-        processedItems.push(payload.value);
+      concurrency: {
+        workers: 2,
+        tasksPerWorker: 3,
       },
-    },
-    concurrency: {
-      workers: 2,
-      tasksPerWorker: 3,
-    },
-    logger: new Logger("test", "log"),
+      logger: new Logger("test", "log"),
+    });
+
+    // Enqueue 10 items
+    for (let i = 0; i < 10; i++) {
+      await worker.enqueue({
+        id: `item-${i}`,
+        job: "testJob",
+        payload: { value: i },
+        visibilityTimeoutMs: 5000,
+      });
+    }
+
+    worker.start();
+
+    // Wait for items to be processed
+    await new Promise((resolve) => setTimeout(resolve, 600));
+
+    worker.stop();
+
+    expect(processedItems.length).toBe(10);
+    expect(new Set(processedItems).size).toBe(10); // Ensure all items were processed uniquely
   });
 
-  // Enqueue 10 items
-  for (let i = 0; i < 10; i++) {
-    await worker.enqueue({
-      id: `item-${i}`,
-      job: "testJob",
-      payload: { value: i },
-      visibilityTimeoutMs: 5000,
-    });
-  }
+  redisTest(
+    "Process items that throw an error",
+    { timeout: 30_000 },
+    async ({ redisContainer }) => {
+      const processedItems: number[] = [];
+      const hadAttempt = new Set<string>();
 
-  worker.start();
+      const worker = new Worker({
+        name: "test-worker",
+        redisOptions: {
+          host: redisContainer.getHost(),
+          port: redisContainer.getPort(),
+          password: redisContainer.getPassword(),
+        },
+        catalog: {
+          testJob: {
+            schema: z.object({ value: z.number() }),
+            visibilityTimeoutMs: 5000,
+            retry: { maxAttempts: 3, minDelayMs: 10 },
+          },
+        },
+        jobs: {
+          testJob: async ({ id, payload }) => {
+            if (!hadAttempt.has(id)) {
+              hadAttempt.add(id);
+              throw new Error("Test error");
+            }
 
-  // Wait for items to be processed
-  await new Promise((resolve) => setTimeout(resolve, 600));
+            await new Promise((resolve) => setTimeout(resolve, 30)); // Simulate work
+            processedItems.push(payload.value);
+          },
+        },
+        concurrency: {
+          workers: 2,
+          tasksPerWorker: 3,
+        },
+        pollIntervalMs: 50,
+        logger: new Logger("test", "error"),
+      });
 
-  worker.stop();
+      // Enqueue 10 items
+      for (let i = 0; i < 10; i++) {
+        await worker.enqueue({
+          id: `item-${i}`,
+          job: "testJob",
+          payload: { value: i },
+          visibilityTimeoutMs: 5000,
+        });
+      }
 
-  expect(processedItems.length).toBe(10);
-  expect(new Set(processedItems).size).toBe(10); // Ensure all items were processed uniquely
+      worker.start();
+
+      // Wait for items to be processed
+      await new Promise((resolve) => setTimeout(resolve, 500));
+
+      worker.stop();
+
+      expect(processedItems.length).toBe(10);
+      expect(new Set(processedItems).size).toBe(10); // Ensure all items were processed uniquely
+    }
+  );
 });
+
+//todo test throwing an error and that retrying works
+//todo test that throwing an error doesn't screw up the other items
+//todo change the processItems to be in parallel using Promise.allResolved
+//process more items when finished
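The trailing TODOs left in the test file mention "Promise.allResolved"; there is no such API, so this presumably refers to Promise.allSettled. In the worker.ts change below, each mapped closure catches its own error, so Promise.all cannot reject; Promise.allSettled is the usual alternative when the per-task callbacks are allowed to throw. A minimal, standalone illustration (not from the repo):

// Promise.allSettled never rejects: every task resolves to a status record,
// so a single failing task cannot take down the rest of the batch.
async function settledExample(): Promise<void> {
  const tasks: Array<Promise<number>> = [
    Promise.resolve(1),
    Promise.reject(new Error("boom")),
  ];

  const results = await Promise.allSettled(tasks);
  for (const result of results) {
    if (result.status === "fulfilled") {
      console.log("ok:", result.value);
    } else {
      console.error("failed:", result.reason);
    }
  }
}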

internal-packages/redis-worker/src/worker.ts

Lines changed: 67 additions & 25 deletions
@@ -38,6 +38,7 @@ type WorkerOptions<TCatalog extends WorkerCatalog> = {
     workers?: number;
     tasksPerWorker?: number;
   };
+  pollIntervalMs?: number;
   logger?: Logger;
 };
 
@@ -48,14 +49,12 @@ class Worker<TCatalog extends WorkerCatalog> {
   private workers: NodeWorker[] = [];
   private isShuttingDown = false;
   private concurrency: Required<NonNullable<WorkerOptions<TCatalog>["concurrency"]>>;
-  private catalog: TCatalog;
 
-  constructor(options: WorkerOptions<TCatalog>) {
-    this.catalog = options.catalog;
+  constructor(private options: WorkerOptions<TCatalog>) {
     this.logger = options.logger ?? new Logger("Worker", "debug");
 
     const schema: QueueCatalogFromWorkerCatalog<TCatalog> = Object.fromEntries(
-      Object.entries(options.catalog).map(([key, value]) => [key, value.schema])
+      Object.entries(this.options.catalog).map(([key, value]) => [key, value.schema])
     ) as QueueCatalogFromWorkerCatalog<TCatalog>;
 
     this.queue = new SimpleQueue({
@@ -89,7 +88,7 @@ class Worker<TCatalog extends WorkerCatalog> {
     payload: z.infer<TCatalog[K]["schema"]>;
     visibilityTimeoutMs?: number;
   }) {
-    const timeout = visibilityTimeoutMs ?? this.catalog[job].visibilityTimeoutMs;
+    const timeout = visibilityTimeoutMs ?? this.options.catalog[job].visibilityTimeoutMs;
     return this.queue.enqueue({
       id,
       job,
@@ -139,34 +138,77 @@ class Worker<TCatalog extends WorkerCatalog> {
   private async processItems(worker: NodeWorker, count: number) {
     if (this.isShuttingDown) return;
 
+    const pollIntervalMs = this.options.pollIntervalMs ?? 1000;
+
     try {
       const items = await this.queue.dequeue(count);
       if (items.length === 0) {
-        setTimeout(() => this.processItems(worker, count), 1000); // Wait before trying again
+        setTimeout(() => this.processItems(worker, count), pollIntervalMs);
         return;
       }
 
-      worker.postMessage({ type: "process", items });
-
-      for (const { id, job, item, visibilityTimeoutMs } of items) {
-        const handler = this.jobs[job as any];
-        if (!handler) {
-          this.logger.error(`No handler found for job type: ${job as string}`);
-          continue;
-        }
-
-        try {
-          await handler({ id, payload: item, visibilityTimeoutMs });
-          await this.queue.ack(id);
-        } catch (error) {
-          this.logger.error(`Error processing item ${id}:`, { error });
-          // Here you might want to implement a retry mechanism or dead-letter queue
-        }
-      }
+      await Promise.all(
+        items.map(async ({ id, job, item, visibilityTimeoutMs }) => {
+          const catalogItem = this.options.catalog[job as any];
+          const handler = this.jobs[job as any];
+          if (!handler) {
+            this.logger.error(`No handler found for job type: ${job as string}`);
+            return;
+          }
+
+          try {
+            await handler({ id, payload: item, visibilityTimeoutMs });
+            await this.queue.ack(id);
+          } catch (error) {
+            this.logger.error(`Error processing item, it threw an error:`, {
+              name: this.options.name,
+              id,
+              job,
+              item,
+              visibilityTimeoutMs,
+              error,
+            });
+            // Requeue the failed item with a delay
+            try {
+              const retryDelay = catalogItem.retry.minDelayMs ?? 1_000;
+              const retryDate = new Date(Date.now() + retryDelay);
+              this.logger.info(`Requeued failed item ${id} with delay`, {
+                name: this.options.name,
+                id,
+                job,
+                item,
+                retryDate,
+                retryDelay,
+                visibilityTimeoutMs,
+              });
+              await this.queue.enqueue({
+                id,
+                job,
+                item,
+                availableAt: retryDate,
+                visibilityTimeoutMs,
+              });
+            } catch (requeueError) {
+              this.logger.error(`Failed to requeue item, threw error:`, {
+                name: this.options.name,
+                id,
+                job,
+                item,
+                visibilityTimeoutMs,
+                error: requeueError,
+              });
+            }
+          }
+        })
+      );
     } catch (error) {
-      this.logger.error("Error dequeuing items:", { error });
-      setTimeout(() => this.processItems(worker, count), 1000); // Wait before trying again
+      this.logger.error("Error dequeuing items:", { name: this.options.name, error });
+      setTimeout(() => this.processItems(worker, count), pollIntervalMs);
+      return;
     }
+
+    // Immediately process next batch because there were items in the queue
+    this.processItems(worker, count);
   }
 
   private setupShutdownHandlers() {
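Note that the requeue path above always re-enqueues after retry.minDelayMs and does not yet consult retry.maxAttempts, which matches the remaining TODOs in the test file. A hypothetical helper, not part of this commit, showing how an attempt counter could bound retries and add exponential backoff (the RetrySettings shape mirrors the catalog's retry field; the name nextRetryDate is made up):

// Hypothetical sketch: compute the next retry time from the catalog's retry
// settings, or return undefined once the attempt budget is exhausted.
type RetrySettings = { maxAttempts: number; minDelayMs?: number };

function nextRetryDate(retry: RetrySettings, attempt: number): Date | undefined {
  if (attempt >= retry.maxAttempts) {
    return undefined; // out of attempts: caller could ack and dead-letter the item
  }
  const base = retry.minDelayMs ?? 1_000;
  const delayMs = base * 2 ** (attempt - 1); // attempt 1 => base, 2 => 2x, 3 => 4x, ...
  return new Date(Date.now() + delayMs);
}

A caller would track the attempt count per item id (for example in the queued payload) and skip the re-enqueue when nextRetryDate returns undefined.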
