Skip to content

Fix redis worker debounce #1966

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 43 additions & 8 deletions packages/redis-worker/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export type QueueItem<TMessageCatalog extends MessageCatalogSchema> = {
visibilityTimeoutMs: number;
attempt: number;
timestamp: Date;
deduplicationKey?: string;
};

export type AnyQueueItem = {
Expand All @@ -36,6 +37,7 @@ export type AnyQueueItem = {
visibilityTimeoutMs: number;
attempt: number;
timestamp: Date;
deduplicationKey?: string;
};

export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
Expand Down Expand Up @@ -98,11 +100,13 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
}): Promise<void> {
try {
const score = availableAt ? availableAt.getTime() : Date.now();
const deduplicationKey = nanoid();
const serializedItem = JSON.stringify({
job,
item,
visibilityTimeoutMs,
attempt,
deduplicationKey,
});

const result = await this.redis.enqueueItem(
Expand Down Expand Up @@ -136,7 +140,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
return [];
}

const dequeuedItems = [];
const dequeuedItems: Array<QueueItem<TMessageCatalog>> = [];

for (const [id, serializedItem, score] of results) {
const parsedItem = JSON.parse(serializedItem) as any;
Expand Down Expand Up @@ -186,6 +190,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
visibilityTimeoutMs,
attempt: parsedItem.attempt ?? 0,
timestamp,
deduplicationKey: parsedItem.deduplicationKey,
});
}

Expand All @@ -200,14 +205,26 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
}
}

async ack(id: string): Promise<void> {
async ack(id: string, deduplicationKey?: string): Promise<void> {
try {
await this.redis.ackItem(`queue`, `items`, id);
const result = await this.redis.ackItem(`queue`, `items`, id, deduplicationKey ?? "");
if (result !== 1) {
this.logger.debug(
`SimpleQueue ${this.name}.ack(): ack operation returned ${result}. This means it was not removed from the queue.`,
{
queue: this.name,
id,
deduplicationKey,
result,
}
);
}
} catch (e) {
this.logger.error(`SimpleQueue ${this.name}.ack(): error acknowledging item`, {
queue: this.name,
error: e,
id,
deduplicationKey,
});
throw e;
}
Expand Down Expand Up @@ -367,15 +384,32 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
this.redis.defineCommand("ackItem", {
numberOfKeys: 2,
lua: `
local queue = KEYS[1]
local items = KEYS[2]
local queueKey = KEYS[1]
local itemsKey = KEYS[2]
local id = ARGV[1]
local deduplicationKey = ARGV[2]

redis.call('ZREM', queue, id)
redis.call('HDEL', items, id)
-- Get the item from the hash
local item = redis.call('HGET', itemsKey, id)
if not item then
return -1
end

-- Only check deduplicationKey if a non-empty one was passed in
if deduplicationKey and deduplicationKey ~= "" then
local success, parsed = pcall(cjson.decode, item)
if success then
if parsed.deduplicationKey and parsed.deduplicationKey ~= deduplicationKey then
return 0
end
end
end

-- Remove from sorted set and hash
redis.call('ZREM', queueKey, id)
redis.call('HDEL', itemsKey, id)
return 1
`,
`,
Comment on lines 384 to +412
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Lua script: avoid false‑positives when no deduplicationKey is stored

If the stored item predates this feature it will have no deduplicationKey.
When an old item is replaced by a new one with a key while another worker still holds the old key (""), the current logic bypasses the check (deduplicationKey argument is empty) and deletes the new item – re‑introducing the race.

Guard by returning 0 whenever the stored item has a key but none is supplied:

- if deduplicationKey and deduplicationKey ~= "" then
+ if deduplicationKey == nil or deduplicationKey == "" then
+   local parsed = cjson.decode(item)
+   if parsed.deduplicationKey then
+     return 0
+   end
+ else
   -- existing path

This keeps backward‑compatibility (old items without a key are still ACKed) while protecting new ones.

Committable suggestion skipped: line range outside the PR's diff.

});

this.redis.defineCommand("moveToDeadLetterQueue", {
Expand Down Expand Up @@ -468,6 +502,7 @@ declare module "@internal/redis" {
queue: string,
items: string,
id: string,
deduplicationKey: string,
callback?: Callback<number>
): Result<number, Context>;

Expand Down
Loading