Skip to content

Commit b43043a

Browse files
committed
Remove orphaned queue items when dequeuing
1 parent 65cc779 commit b43043a

File tree

2 files changed

+65
-0
lines changed

2 files changed

+65
-0
lines changed

packages/redis-worker/src/queue.test.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { expect } from "vitest";
44
import { z } from "zod";
55
import { SimpleQueue } from "./queue.js";
66
import { Logger } from "@trigger.dev/core/logger";
7+
import { createRedisClient } from "@internal/redis";
78

89
describe("SimpleQueue", () => {
910
redisTest("enqueue/dequeue", { timeout: 20_000 }, async ({ redisContainer }) => {
@@ -328,6 +329,7 @@ describe("SimpleQueue", () => {
328329

329330
// Redrive item from DLQ
330331
await queue.redriveFromDeadLetterQueue("1");
332+
await new Promise((resolve) => setTimeout(resolve, 200));
331333
expect(await queue.size()).toBe(1);
332334
expect(await queue.sizeOfDeadLetterQueue()).toBe(0);
333335

@@ -357,4 +359,64 @@ describe("SimpleQueue", () => {
357359
await queue.close();
358360
}
359361
});
362+
363+
redisTest("cleanup orphaned queue entries", { timeout: 20_000 }, async ({ redisContainer }) => {
364+
const queue = new SimpleQueue({
365+
name: "test-orphaned",
366+
schema: {
367+
test: z.object({
368+
value: z.number(),
369+
}),
370+
},
371+
redisOptions: {
372+
host: redisContainer.getHost(),
373+
port: redisContainer.getPort(),
374+
password: redisContainer.getPassword(),
375+
},
376+
logger: new Logger("test", "log"),
377+
});
378+
379+
try {
380+
// First, add a normal item
381+
await queue.enqueue({ id: "1", job: "test", item: { value: 1 }, visibilityTimeoutMs: 2000 });
382+
383+
const redisClient = createRedisClient({
384+
host: redisContainer.getHost(),
385+
port: redisContainer.getPort(),
386+
password: redisContainer.getPassword(),
387+
});
388+
389+
// Manually add an orphaned item to the queue (without corresponding hash entry)
390+
await redisClient.zadd(`{queue:test-orphaned:}queue`, Date.now(), "orphaned-id");
391+
392+
// Verify both items are in the queue
393+
expect(await queue.size()).toBe(2);
394+
395+
// Dequeue should process both items, but only return the valid one
396+
// and clean up the orphaned entry
397+
const dequeued = await queue.dequeue(2);
398+
399+
// Should only get the valid item
400+
expect(dequeued).toHaveLength(1);
401+
expect(dequeued[0]).toEqual(
402+
expect.objectContaining({
403+
id: "1",
404+
job: "test",
405+
item: { value: 1 },
406+
visibilityTimeoutMs: 2000,
407+
attempt: 0,
408+
timestamp: expect.any(Date),
409+
})
410+
);
411+
412+
// The orphaned item should have been removed
413+
expect(await queue.size({ includeFuture: true })).toBe(1);
414+
415+
// Verify the orphaned ID is no longer in the queue
416+
const orphanedScore = await redisClient.zscore(`{queue:test-orphaned:}queue`, "orphaned-id");
417+
expect(orphanedScore).toBeNull();
418+
} finally {
419+
await queue.close();
420+
}
421+
});
360422
});

packages/redis-worker/src/queue.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,9 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
372372
373373
redis.call('ZADD', queue, invisibleUntil, id)
374374
table.insert(dequeued, {id, serializedItem, score})
375+
else
376+
-- Remove the orphaned queue entry if no corresponding item exists
377+
redis.call('ZREM', queue, id)
375378
end
376379
end
377380

0 commit comments

Comments
 (0)