-
-
Notifications
You must be signed in to change notification settings - Fork 742
Fix redis worker debounce #1966
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
3c753ec
48f19f0
470cb2f
72cbabe
be02fe6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ export type QueueItem<TMessageCatalog extends MessageCatalogSchema> = { | |
visibilityTimeoutMs: number; | ||
attempt: number; | ||
timestamp: Date; | ||
deduplicationKey?: string; | ||
}; | ||
|
||
export type AnyQueueItem = { | ||
|
@@ -36,6 +37,7 @@ export type AnyQueueItem = { | |
visibilityTimeoutMs: number; | ||
attempt: number; | ||
timestamp: Date; | ||
deduplicationKey?: string; | ||
}; | ||
|
||
export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> { | ||
|
@@ -98,11 +100,13 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> { | |
}): Promise<void> { | ||
try { | ||
const score = availableAt ? availableAt.getTime() : Date.now(); | ||
const deduplicationKey = nanoid(); | ||
const serializedItem = JSON.stringify({ | ||
job, | ||
item, | ||
visibilityTimeoutMs, | ||
attempt, | ||
deduplicationKey, | ||
}); | ||
|
||
const result = await this.redis.enqueueItem( | ||
|
@@ -136,7 +140,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> { | |
return []; | ||
} | ||
|
||
const dequeuedItems = []; | ||
const dequeuedItems: Array<QueueItem<TMessageCatalog>> = []; | ||
|
||
for (const [id, serializedItem, score] of results) { | ||
const parsedItem = JSON.parse(serializedItem) as any; | ||
|
@@ -186,6 +190,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> { | |
visibilityTimeoutMs, | ||
attempt: parsedItem.attempt ?? 0, | ||
timestamp, | ||
deduplicationKey: parsedItem.deduplicationKey, | ||
}); | ||
} | ||
|
||
|
@@ -200,14 +205,22 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> { | |
} | ||
} | ||
|
||
async ack(id: string): Promise<void> { | ||
async ack(id: string, deduplicationKey?: string): Promise<void> { | ||
try { | ||
await this.redis.ackItem(`queue`, `items`, id); | ||
const result = await this.redis.ackItem(`queue`, `items`, id, deduplicationKey ?? ""); | ||
if (result === 0) { | ||
this.logger.error(`SimpleQueue ${this.name}.ack(): ack operation returned 0`, { | ||
queue: this.name, | ||
id, | ||
deduplicationKey, | ||
}); | ||
} | ||
} catch (e) { | ||
this.logger.error(`SimpleQueue ${this.name}.ack(): error acknowledging item`, { | ||
queue: this.name, | ||
error: e, | ||
id, | ||
deduplicationKey, | ||
}); | ||
throw e; | ||
} | ||
|
@@ -367,15 +380,32 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> { | |
this.redis.defineCommand("ackItem", { | ||
numberOfKeys: 2, | ||
lua: ` | ||
local queue = KEYS[1] | ||
local items = KEYS[2] | ||
local queueKey = KEYS[1] | ||
local itemsKey = KEYS[2] | ||
local id = ARGV[1] | ||
local deduplicationKey = ARGV[2] | ||
|
||
redis.call('ZREM', queue, id) | ||
redis.call('HDEL', items, id) | ||
-- Get the item from the hash | ||
local item = redis.call('HGET', itemsKey, id) | ||
if not item then | ||
return -1 | ||
end | ||
|
||
-- Only check deduplicationKey if a non-empty one was passed in | ||
if deduplicationKey and deduplicationKey ~= "" then | ||
local success, parsed = pcall(cjson.decode, item) | ||
if success then | ||
if parsed.deduplicationKey and parsed.deduplicationKey ~= deduplicationKey then | ||
return 0 | ||
end | ||
end | ||
end | ||
|
||
-- Remove from sorted set and hash | ||
redis.call('ZREM', queueKey, id) | ||
redis.call('HDEL', itemsKey, id) | ||
return 1 | ||
`, | ||
`, | ||
Comment on lines
384
to
+412
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Lua script: avoid false‑positives when no deduplicationKey is stored If the stored item predates this feature it will have no Guard by returning - if deduplicationKey and deduplicationKey ~= "" then
+ if deduplicationKey == nil or deduplicationKey == "" then
+ local parsed = cjson.decode(item)
+ if parsed.deduplicationKey then
+ return 0
+ end
+ else
-- existing path This keeps backward‑compatibility (old items without a key are still ACKed) while protecting new ones.
|
||
}); | ||
|
||
this.redis.defineCommand("moveToDeadLetterQueue", { | ||
|
@@ -468,6 +498,7 @@ declare module "@internal/redis" { | |
queue: string, | ||
items: string, | ||
id: string, | ||
deduplicationKey: string, | ||
callback?: Callback<number> | ||
): Result<number, Context>; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Escalate ACK mismatches instead of silently logging them
ack()
only logs when the Lua script returns0
(deduplication‑key mismatch) or-1
(item missing) but still resolvesvoid
.Call‑sites (e.g.
Worker.processItem
) will therefore treat the job as successfully acknowledged even though it remains in Redis, leading to:Consider propagating the return value (or throwing) so that the caller can retry, reschedule or surface an error.
📝 Committable suggestion