Skip to content

Commit 672a6b8

Browse files
authored
RedisWorker clean up orphaned items when dequeuing (#1975)
* Remove setting the invisibility timeout because it’s already done in the dequeue Lua script * Remove orphaned queue items when dequeuing * Added an ack to the visibility timeout test
1 parent 56ec052 commit 672a6b8

File tree

2 files changed

+70
-3
lines changed

2 files changed

+70
-3
lines changed

packages/redis-worker/src/queue.test.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { expect } from "vitest";
44
import { z } from "zod";
55
import { SimpleQueue } from "./queue.js";
66
import { Logger } from "@trigger.dev/core/logger";
7+
import { createRedisClient } from "@internal/redis";
78

89
describe("SimpleQueue", () => {
910
redisTest("enqueue/dequeue", { timeout: 20_000 }, async ({ redisContainer }) => {
@@ -209,6 +210,10 @@ describe("SimpleQueue", () => {
209210
timestamp: expect.any(Date),
210211
})
211212
);
213+
214+
// Acknowledge the item and verify it's removed
215+
await queue.ack(second!.id);
216+
expect(await queue.size({ includeFuture: true })).toBe(0);
212217
} finally {
213218
await queue.close();
214219
}
@@ -328,6 +333,7 @@ describe("SimpleQueue", () => {
328333

329334
// Redrive item from DLQ
330335
await queue.redriveFromDeadLetterQueue("1");
336+
await new Promise((resolve) => setTimeout(resolve, 200));
331337
expect(await queue.size()).toBe(1);
332338
expect(await queue.sizeOfDeadLetterQueue()).toBe(0);
333339

@@ -357,4 +363,64 @@ describe("SimpleQueue", () => {
357363
await queue.close();
358364
}
359365
});
366+
367+
redisTest("cleanup orphaned queue entries", { timeout: 20_000 }, async ({ redisContainer }) => {
368+
const queue = new SimpleQueue({
369+
name: "test-orphaned",
370+
schema: {
371+
test: z.object({
372+
value: z.number(),
373+
}),
374+
},
375+
redisOptions: {
376+
host: redisContainer.getHost(),
377+
port: redisContainer.getPort(),
378+
password: redisContainer.getPassword(),
379+
},
380+
logger: new Logger("test", "log"),
381+
});
382+
383+
try {
384+
// First, add a normal item
385+
await queue.enqueue({ id: "1", job: "test", item: { value: 1 }, visibilityTimeoutMs: 2000 });
386+
387+
const redisClient = createRedisClient({
388+
host: redisContainer.getHost(),
389+
port: redisContainer.getPort(),
390+
password: redisContainer.getPassword(),
391+
});
392+
393+
// Manually add an orphaned item to the queue (without corresponding hash entry)
394+
await redisClient.zadd(`{queue:test-orphaned:}queue`, Date.now(), "orphaned-id");
395+
396+
// Verify both items are in the queue
397+
expect(await queue.size()).toBe(2);
398+
399+
// Dequeue should process both items, but only return the valid one
400+
// and clean up the orphaned entry
401+
const dequeued = await queue.dequeue(2);
402+
403+
// Should only get the valid item
404+
expect(dequeued).toHaveLength(1);
405+
expect(dequeued[0]).toEqual(
406+
expect.objectContaining({
407+
id: "1",
408+
job: "test",
409+
item: { value: 1 },
410+
visibilityTimeoutMs: 2000,
411+
attempt: 0,
412+
timestamp: expect.any(Date),
413+
})
414+
);
415+
416+
// The orphaned item should have been removed
417+
expect(await queue.size({ includeFuture: true })).toBe(1);
418+
419+
// Verify the orphaned ID is no longer in the queue
420+
const orphanedScore = await redisClient.zscore(`{queue:test-orphaned:}queue`, "orphaned-id");
421+
expect(orphanedScore).toBeNull();
422+
} finally {
423+
await queue.close();
424+
}
425+
});
360426
});

packages/redis-worker/src/queue.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
130130
throw e;
131131
}
132132
}
133+
133134
async dequeue(count: number = 1): Promise<Array<QueueItem<TMessageCatalog>>> {
134135
const now = Date.now();
135136

@@ -179,9 +180,6 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
179180
}
180181

181182
const visibilityTimeoutMs = parsedItem.visibilityTimeoutMs as number;
182-
const invisibleUntil = now + visibilityTimeoutMs;
183-
184-
await this.redis.zadd(`queue`, invisibleUntil, id);
185183

186184
dequeuedItems.push({
187185
id,
@@ -374,6 +372,9 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
374372
375373
redis.call('ZADD', queue, invisibleUntil, id)
376374
table.insert(dequeued, {id, serializedItem, score})
375+
else
376+
-- Remove the orphaned queue entry if no corresponding item exists
377+
redis.call('ZREM', queue, id)
377378
end
378379
end
379380

0 commit comments

Comments
 (0)