Skip to content

Commit aa84a69

Browse files
committed
DLQ tests
1 parent 0c25754 commit aa84a69

File tree

1 file changed

+110
-55
lines changed

1 file changed

+110
-55
lines changed

internal-packages/redis-worker/src/queue.test.ts

Lines changed: 110 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -185,68 +185,123 @@ describe("SimpleQueue", () => {
185185
await queue.close();
186186
}
187187
});
188-
});
189188

190-
redisTest("dequeue multiple items", { timeout: 20_000 }, async ({ redisContainer }) => {
191-
const queue = new SimpleQueue({
192-
name: "test-1",
193-
schema: {
194-
test: z.object({
195-
value: z.number(),
196-
}),
197-
},
198-
redisOptions: {
199-
host: redisContainer.getHost(),
200-
port: redisContainer.getPort(),
201-
password: redisContainer.getPassword(),
202-
},
203-
logger: new Logger("test", "log"),
189+
redisTest("dequeue multiple items", { timeout: 20_000 }, async ({ redisContainer }) => {
190+
const queue = new SimpleQueue({
191+
name: "test-1",
192+
schema: {
193+
test: z.object({
194+
value: z.number(),
195+
}),
196+
},
197+
redisOptions: {
198+
host: redisContainer.getHost(),
199+
port: redisContainer.getPort(),
200+
password: redisContainer.getPassword(),
201+
},
202+
logger: new Logger("test", "log"),
203+
});
204+
205+
try {
206+
await queue.enqueue({ id: "1", job: "test", item: { value: 1 }, visibilityTimeoutMs: 2000 });
207+
await queue.enqueue({ id: "2", job: "test", item: { value: 2 }, visibilityTimeoutMs: 2000 });
208+
await queue.enqueue({ id: "3", job: "test", item: { value: 3 }, visibilityTimeoutMs: 2000 });
209+
210+
expect(await queue.size()).toBe(3);
211+
212+
const dequeued = await queue.dequeue(2);
213+
expect(dequeued).toHaveLength(2);
214+
expect(dequeued[0]).toEqual({
215+
id: "1",
216+
job: "test",
217+
item: { value: 1 },
218+
visibilityTimeoutMs: 2000,
219+
attempt: 0,
220+
});
221+
expect(dequeued[1]).toEqual({
222+
id: "2",
223+
job: "test",
224+
item: { value: 2 },
225+
visibilityTimeoutMs: 2000,
226+
attempt: 0,
227+
});
228+
229+
expect(await queue.size()).toBe(1);
230+
expect(await queue.size({ includeFuture: true })).toBe(3);
231+
232+
await queue.ack(dequeued[0].id);
233+
await queue.ack(dequeued[1].id);
234+
235+
expect(await queue.size({ includeFuture: true })).toBe(1);
236+
237+
const [last] = await queue.dequeue(1);
238+
expect(last).toEqual({
239+
id: "3",
240+
job: "test",
241+
item: { value: 3 },
242+
visibilityTimeoutMs: 2000,
243+
attempt: 0,
244+
});
245+
246+
await queue.ack(last.id);
247+
expect(await queue.size({ includeFuture: true })).toBe(0);
248+
} finally {
249+
await queue.close();
250+
}
204251
});
205252

206-
try {
207-
await queue.enqueue({ id: "1", job: "test", item: { value: 1 }, visibilityTimeoutMs: 2000 });
208-
await queue.enqueue({ id: "2", job: "test", item: { value: 2 }, visibilityTimeoutMs: 2000 });
209-
await queue.enqueue({ id: "3", job: "test", item: { value: 3 }, visibilityTimeoutMs: 2000 });
210-
211-
expect(await queue.size()).toBe(3);
212-
213-
const dequeued = await queue.dequeue(2);
214-
expect(dequeued).toHaveLength(2);
215-
expect(dequeued[0]).toEqual({
216-
id: "1",
217-
job: "test",
218-
item: { value: 1 },
219-
visibilityTimeoutMs: 2000,
220-
attempt: 0,
221-
});
222-
expect(dequeued[1]).toEqual({
223-
id: "2",
224-
job: "test",
225-
item: { value: 2 },
226-
visibilityTimeoutMs: 2000,
227-
attempt: 0,
253+
redisTest("Dead Letter Queue", { timeout: 20_000 }, async ({ redisContainer }) => {
254+
const queue = new SimpleQueue({
255+
name: "test-dlq",
256+
schema: {
257+
test: z.object({
258+
value: z.number(),
259+
}),
260+
},
261+
redisOptions: {
262+
host: redisContainer.getHost(),
263+
port: redisContainer.getPort(),
264+
password: redisContainer.getPassword(),
265+
},
266+
logger: new Logger("test", "log"),
228267
});
229268

230-
expect(await queue.size()).toBe(1);
231-
expect(await queue.size({ includeFuture: true })).toBe(3);
269+
try {
270+
// Enqueue an item
271+
await queue.enqueue({ id: "1", job: "test", item: { value: 1 }, visibilityTimeoutMs: 2000 });
272+
expect(await queue.size()).toBe(1);
273+
expect(await queue.sizeOfDeadLetterQueue()).toBe(0);
232274

233-
await queue.ack(dequeued[0].id);
234-
await queue.ack(dequeued[1].id);
275+
// Move item to DLQ
276+
await queue.moveToDeadLetterQueue("1", "Test error message");
277+
expect(await queue.size()).toBe(0);
278+
expect(await queue.sizeOfDeadLetterQueue()).toBe(1);
235279

236-
expect(await queue.size({ includeFuture: true })).toBe(1);
280+
// Attempt to dequeue from the main queue should return empty
281+
const dequeued = await queue.dequeue(1);
282+
expect(dequeued).toEqual([]);
237283

238-
const [last] = await queue.dequeue(1);
239-
expect(last).toEqual({
240-
id: "3",
241-
job: "test",
242-
item: { value: 3 },
243-
visibilityTimeoutMs: 2000,
244-
attempt: 0,
245-
});
284+
// Redrive item from DLQ
285+
await queue.redriveFromDeadLetterQueue("1");
286+
expect(await queue.size()).toBe(1);
287+
expect(await queue.sizeOfDeadLetterQueue()).toBe(0);
246288

247-
await queue.ack(last.id);
248-
expect(await queue.size({ includeFuture: true })).toBe(0);
249-
} finally {
250-
await queue.close();
251-
}
289+
// Dequeue the redriven item
290+
const [redrivenItem] = await queue.dequeue(1);
291+
expect(redrivenItem).toEqual({
292+
id: "1",
293+
job: "test",
294+
item: { value: 1 },
295+
visibilityTimeoutMs: 2000,
296+
attempt: 0,
297+
});
298+
299+
// Acknowledge the item
300+
await queue.ack(redrivenItem.id);
301+
expect(await queue.size()).toBe(0);
302+
expect(await queue.sizeOfDeadLetterQueue()).toBe(0);
303+
} finally {
304+
await queue.close();
305+
}
306+
});
252307
});

0 commit comments

Comments
 (0)