Skip to content

Commit 470cb2f

Browse files
committed
Added a deduplicationKey to prevent acking when items are queued
1 parent 48f19f0 commit 470cb2f

File tree

1 file changed

+39
-8
lines changed

1 file changed

+39
-8
lines changed

packages/redis-worker/src/queue.ts

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export type QueueItem<TMessageCatalog extends MessageCatalogSchema> = {
2727
visibilityTimeoutMs: number;
2828
attempt: number;
2929
timestamp: Date;
30+
deduplicationKey?: string;
3031
};
3132

3233
export type AnyQueueItem = {
@@ -36,6 +37,7 @@ export type AnyQueueItem = {
3637
visibilityTimeoutMs: number;
3738
attempt: number;
3839
timestamp: Date;
40+
deduplicationKey?: string;
3941
};
4042

4143
export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
@@ -98,11 +100,13 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
98100
}): Promise<void> {
99101
try {
100102
const score = availableAt ? availableAt.getTime() : Date.now();
103+
const deduplicationKey = nanoid();
101104
const serializedItem = JSON.stringify({
102105
job,
103106
item,
104107
visibilityTimeoutMs,
105108
attempt,
109+
deduplicationKey,
106110
});
107111

108112
const result = await this.redis.enqueueItem(
@@ -136,7 +140,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
136140
return [];
137141
}
138142

139-
const dequeuedItems = [];
143+
const dequeuedItems: Array<QueueItem<TMessageCatalog>> = [];
140144

141145
for (const [id, serializedItem, score] of results) {
142146
const parsedItem = JSON.parse(serializedItem) as any;
@@ -186,6 +190,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
186190
visibilityTimeoutMs,
187191
attempt: parsedItem.attempt ?? 0,
188192
timestamp,
193+
deduplicationKey: parsedItem.deduplicationKey,
189194
});
190195
}
191196

@@ -200,14 +205,22 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
200205
}
201206
}
202207

203-
async ack(id: string): Promise<void> {
208+
async ack(id: string, deduplicationKey?: string): Promise<void> {
204209
try {
205-
await this.redis.ackItem(`queue`, `items`, id);
210+
const result = await this.redis.ackItem(`queue`, `items`, id, deduplicationKey ?? "");
211+
if (result === 0) {
212+
this.logger.error(`SimpleQueue ${this.name}.ack(): ack operation returned 0`, {
213+
queue: this.name,
214+
id,
215+
deduplicationKey,
216+
});
217+
}
206218
} catch (e) {
207219
this.logger.error(`SimpleQueue ${this.name}.ack(): error acknowledging item`, {
208220
queue: this.name,
209221
error: e,
210222
id,
223+
deduplicationKey,
211224
});
212225
throw e;
213226
}
@@ -367,15 +380,32 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
367380
this.redis.defineCommand("ackItem", {
368381
numberOfKeys: 2,
369382
lua: `
370-
local queue = KEYS[1]
371-
local items = KEYS[2]
383+
local queueKey = KEYS[1]
384+
local itemsKey = KEYS[2]
372385
local id = ARGV[1]
386+
local deduplicationKey = ARGV[2]
373387
374-
redis.call('ZREM', queue, id)
375-
redis.call('HDEL', items, id)
388+
-- Get the item from the hash
389+
local item = redis.call('HGET', itemsKey, id)
390+
if not item then
391+
return -1
392+
end
376393
394+
-- Only check deduplicationKey if a non-empty one was passed in
395+
if deduplicationKey and deduplicationKey ~= "" then
396+
local success, parsed = pcall(cjson.decode, item)
397+
if success then
398+
if parsed.deduplicationKey and parsed.deduplicationKey ~= deduplicationKey then
399+
return 0
400+
end
401+
end
402+
end
403+
404+
-- Remove from sorted set and hash
405+
redis.call('ZREM', queueKey, id)
406+
redis.call('HDEL', itemsKey, id)
377407
return 1
378-
`,
408+
`,
379409
});
380410

381411
this.redis.defineCommand("moveToDeadLetterQueue", {
@@ -468,6 +498,7 @@ declare module "@internal/redis" {
468498
queue: string,
469499
items: string,
470500
id: string,
501+
deduplicationKey: string,
471502
callback?: Callback<number>
472503
): Result<number, Context>;
473504

0 commit comments

Comments
 (0)