Skip to content

Commit da02da9

Browse files
committed
Added DLQ tests
1 parent 62c8b2c commit da02da9

File tree

2 files changed

+126
-42
lines changed

2 files changed

+126
-42
lines changed

internal-packages/redis-worker/src/worker.test.ts

Lines changed: 107 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import { redisTest } from "@internal/testcontainers";
2-
import { describe, it } from "node:test";
2+
import { Logger } from "@trigger.dev/core/logger";
3+
import { describe } from "node:test";
34
import { expect } from "vitest";
45
import { z } from "zod";
56
import { Worker } from "./worker.js";
6-
import { Logger } from "@trigger.dev/core/logger";
7-
import { SimpleQueue } from "./queue.js";
87

98
describe("Worker", () => {
109
redisTest("Process items that don't throw", { timeout: 30_000 }, async ({ redisContainer }) => {
@@ -35,26 +34,27 @@ describe("Worker", () => {
3534
},
3635
logger: new Logger("test", "log"),
3736
});
37+
try {
38+
// Enqueue 10 items
39+
for (let i = 0; i < 10; i++) {
40+
await worker.enqueue({
41+
id: `item-${i}`,
42+
job: "testJob",
43+
payload: { value: i },
44+
visibilityTimeoutMs: 5000,
45+
});
46+
}
3847

39-
// Enqueue 10 items
40-
for (let i = 0; i < 10; i++) {
41-
await worker.enqueue({
42-
id: `item-${i}`,
43-
job: "testJob",
44-
payload: { value: i },
45-
visibilityTimeoutMs: 5000,
46-
});
47-
}
48-
49-
worker.start();
50-
51-
// Wait for items to be processed
52-
await new Promise((resolve) => setTimeout(resolve, 600));
48+
worker.start();
5349

54-
worker.stop();
50+
// Wait for items to be processed
51+
await new Promise((resolve) => setTimeout(resolve, 600));
5552

56-
expect(processedItems.length).toBe(10);
57-
expect(new Set(processedItems).size).toBe(10); // Ensure all items were processed uniquely
53+
expect(processedItems.length).toBe(10);
54+
expect(new Set(processedItems).size).toBe(10); // Ensure all items were processed uniquely
55+
} finally {
56+
worker.stop();
57+
}
5858
});
5959

6060
redisTest(
@@ -97,28 +97,103 @@ describe("Worker", () => {
9797
logger: new Logger("test", "error"),
9898
});
9999

100-
// Enqueue 10 items
101-
for (let i = 0; i < 10; i++) {
100+
try {
101+
// Enqueue 10 items
102+
for (let i = 0; i < 10; i++) {
103+
await worker.enqueue({
104+
id: `item-${i}`,
105+
job: "testJob",
106+
payload: { value: i },
107+
visibilityTimeoutMs: 5000,
108+
});
109+
}
110+
111+
worker.start();
112+
113+
// Wait for items to be processed
114+
await new Promise((resolve) => setTimeout(resolve, 500));
115+
116+
expect(processedItems.length).toBe(10);
117+
expect(new Set(processedItems).size).toBe(10); // Ensure all items were processed uniquely
118+
} finally {
119+
worker.stop();
120+
}
121+
}
122+
);
123+
124+
redisTest(
125+
"Process an item that permanently fails and ends up in DLQ",
126+
{ timeout: 30_000 },
127+
async ({ redisContainer }) => {
128+
const processedItems: number[] = [];
129+
const failedItemId = "permanent-fail-item";
130+
131+
const worker = new Worker({
132+
name: "test-worker",
133+
redisOptions: {
134+
host: redisContainer.getHost(),
135+
port: redisContainer.getPort(),
136+
password: redisContainer.getPassword(),
137+
},
138+
catalog: {
139+
testJob: {
140+
schema: z.object({ value: z.number() }),
141+
visibilityTimeoutMs: 1000,
142+
retry: { maxAttempts: 3, minTimeoutInMs: 10, maxTimeoutInMs: 50 },
143+
},
144+
},
145+
jobs: {
146+
testJob: async ({ id, payload }) => {
147+
if (id === failedItemId) {
148+
throw new Error("Permanent failure");
149+
}
150+
processedItems.push(payload.value);
151+
},
152+
},
153+
concurrency: {
154+
workers: 1,
155+
tasksPerWorker: 1,
156+
},
157+
pollIntervalMs: 50,
158+
logger: new Logger("test", "error"),
159+
});
160+
161+
try {
162+
// Enqueue the item that will permanently fail
102163
await worker.enqueue({
103-
id: `item-${i}`,
164+
id: failedItemId,
104165
job: "testJob",
105-
payload: { value: i },
106-
visibilityTimeoutMs: 5000,
166+
payload: { value: 999 },
107167
});
108-
}
109168

110-
worker.start();
169+
// Enqueue a normal item
170+
await worker.enqueue({
171+
id: "normal-item",
172+
job: "testJob",
173+
payload: { value: 1 },
174+
});
111175

112-
// Wait for items to be processed
113-
await new Promise((resolve) => setTimeout(resolve, 500));
176+
worker.start();
114177

115-
worker.stop();
178+
// Wait for items to be processed and retried
179+
await new Promise((resolve) => setTimeout(resolve, 1000));
116180

117-
expect(processedItems.length).toBe(10);
118-
expect(new Set(processedItems).size).toBe(10); // Ensure all items were processed uniquely
181+
// Check that the normal item was processed
182+
expect(processedItems).toEqual([1]);
183+
184+
// Check that the failed item is in the DLQ
185+
const dlqSize = await worker.queue.sizeOfDeadLetterQueue();
186+
expect(dlqSize).toBe(1);
187+
} finally {
188+
worker.stop();
189+
}
119190
}
120191
);
121192

122193
//todo test that throwing an error doesn't screw up the other items
123194
//todo process more items when finished
195+
196+
//todo add a Dead Letter Queue when items are failed, with the error
197+
//todo add a function on the worker to redrive them
198+
//todo add an API endpoint to redrive with an ID
124199
});

internal-packages/redis-worker/src/worker.ts

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type WorkerOptions<TCatalog extends WorkerCatalog> = {
4242
};
4343

4444
class Worker<TCatalog extends WorkerCatalog> {
45-
private queue: SimpleQueue<QueueCatalogFromWorkerCatalog<TCatalog>>;
45+
queue: SimpleQueue<QueueCatalogFromWorkerCatalog<TCatalog>>;
4646
private jobs: WorkerOptions<TCatalog>["jobs"];
4747
private logger: Logger;
4848
private workers: NodeWorker[] = [];
@@ -157,15 +157,19 @@ class Worker<TCatalog extends WorkerCatalog> {
157157

158158
try {
159159
await handler({ id, payload: item, visibilityTimeoutMs, attempt });
160+
161+
//succeeded, acking the item
160162
await this.queue.ack(id);
161163
} catch (error) {
164+
const errorMessage = error instanceof Error ? error.message : String(error);
162165
this.logger.error(`Error processing item, it threw an error:`, {
163166
name: this.options.name,
164167
id,
165168
job,
166169
item,
167170
visibilityTimeoutMs,
168171
error,
172+
errorMessage,
169173
});
170174
// Requeue the failed item with a delay
171175
try {
@@ -174,15 +178,20 @@ class Worker<TCatalog extends WorkerCatalog> {
174178
const retryDelay = calculateNextRetryDelay(catalogItem.retry, attempt);
175179

176180
if (!retryDelay) {
177-
this.logger.error(`Failed item ${id} has reached max attempts, acking.`, {
178-
name: this.options.name,
179-
id,
180-
job,
181-
item,
182-
visibilityTimeoutMs,
183-
attempt,
184-
});
185-
await this.queue.ack(id);
181+
this.logger.error(
182+
`Failed item ${id} has reached max attempts, moving to the DLQ.`,
183+
{
184+
name: this.options.name,
185+
id,
186+
job,
187+
item,
188+
visibilityTimeoutMs,
189+
attempt,
190+
errorMessage,
191+
}
192+
);
193+
194+
await this.queue.moveToDeadLetterQueue(id, errorMessage);
186195
return;
187196
}
188197

0 commit comments

Comments
 (0)