Skip to content

Commit 0c25754

Browse files
committed
DLQ functionality
1 parent eee5649 commit 0c25754

File tree

1 file changed

+141
-0
lines changed
  • internal-packages/redis-worker/src

1 file changed

+141
-0
lines changed

internal-packages/redis-worker/src/queue.ts

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,72 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
217217
}
218218
}
219219

220+
async moveToDeadLetterQueue(id: string, errorMessage: string): Promise<void> {
221+
try {
222+
const result = await this.redis.moveToDeadLetterQueue(
223+
`queue`,
224+
`items`,
225+
`dlq`,
226+
`dlq:items`,
227+
id,
228+
errorMessage
229+
);
230+
231+
if (result !== 1) {
232+
throw new Error("Move to Dead Letter Queue operation failed");
233+
}
234+
} catch (e) {
235+
this.logger.error(
236+
`SimpleQueue ${this.name}.moveToDeadLetterQueue(): error moving item to DLQ`,
237+
{
238+
queue: this.name,
239+
error: e,
240+
id,
241+
errorMessage,
242+
}
243+
);
244+
throw e;
245+
}
246+
}
247+
248+
async sizeOfDeadLetterQueue(): Promise<number> {
249+
try {
250+
return await this.redis.zcard(`dlq`);
251+
} catch (e) {
252+
this.logger.error(`SimpleQueue ${this.name}.dlqSize(): error getting DLQ size`, {
253+
queue: this.name,
254+
error: e,
255+
});
256+
throw e;
257+
}
258+
}
259+
260+
async redriveFromDeadLetterQueue(id: string): Promise<void> {
261+
try {
262+
const result = await this.redis.redriveFromDeadLetterQueue(
263+
`queue`,
264+
`items`,
265+
`dlq`,
266+
`dlq:items`,
267+
id
268+
);
269+
270+
if (result !== 1) {
271+
throw new Error("Redrive from Dead Letter Queue operation failed");
272+
}
273+
} catch (e) {
274+
this.logger.error(
275+
`SimpleQueue ${this.name}.redriveFromDeadLetterQueue(): error redriving item from DLQ`,
276+
{
277+
queue: this.name,
278+
error: e,
279+
id,
280+
}
281+
);
282+
throw e;
283+
}
284+
}
285+
220286
async close(): Promise<void> {
221287
await this.redis.quit();
222288
}
@@ -291,6 +357,61 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
291357
return 1
292358
`,
293359
});
360+
361+
this.redis.defineCommand("moveToDeadLetterQueue", {
362+
numberOfKeys: 4,
363+
lua: `
364+
local queue = KEYS[1]
365+
local items = KEYS[2]
366+
local dlq = KEYS[3]
367+
local dlqItems = KEYS[4]
368+
local id = ARGV[1]
369+
local errorMessage = ARGV[2]
370+
371+
local item = redis.call('HGET', items, id)
372+
if not item then
373+
return 0
374+
end
375+
376+
local parsedItem = cjson.decode(item)
377+
parsedItem.errorMessage = errorMessage
378+
379+
redis.call('ZREM', queue, id)
380+
redis.call('HDEL', items, id)
381+
382+
redis.call('ZADD', dlq, redis.call('TIME')[1], id)
383+
redis.call('HSET', dlqItems, id, cjson.encode(parsedItem))
384+
385+
return 1
386+
`,
387+
});
388+
389+
this.redis.defineCommand("redriveFromDeadLetterQueue", {
390+
numberOfKeys: 4,
391+
lua: `
392+
local queue = KEYS[1]
393+
local items = KEYS[2]
394+
local dlq = KEYS[3]
395+
local dlqItems = KEYS[4]
396+
local id = ARGV[1]
397+
398+
local item = redis.call('HGET', dlqItems, id)
399+
if not item then
400+
return 0
401+
end
402+
403+
local parsedItem = cjson.decode(item)
404+
parsedItem.errorMessage = nil
405+
406+
redis.call('ZREM', dlq, id)
407+
redis.call('HDEL', dlqItems, id)
408+
409+
redis.call('ZADD', queue, redis.call('TIME')[1], id)
410+
redis.call('HSET', items, id, cjson.encode(parsedItem))
411+
412+
return 1
413+
`,
414+
});
294415
}
295416
}
296417

@@ -306,6 +427,7 @@ declare module "ioredis" {
306427
serializedItem: string,
307428
callback?: Callback<number>
308429
): Result<number, Context>;
430+
309431
dequeueItems(
310432
//keys
311433
queue: string,
@@ -322,5 +444,24 @@ declare module "ioredis" {
322444
id: string,
323445
callback?: Callback<number>
324446
): Result<number, Context>;
447+
448+
redriveFromDeadLetterQueue(
449+
queue: string,
450+
items: string,
451+
dlq: string,
452+
dlqItems: string,
453+
id: string,
454+
callback?: Callback<number>
455+
): Result<number, Context>;
456+
457+
moveToDeadLetterQueue(
458+
queue: string,
459+
items: string,
460+
dlq: string,
461+
dlqItems: string,
462+
id: string,
463+
errorMessage: string,
464+
callback?: Callback<number>
465+
): Result<number, Context>;
325466
}
326467
}

0 commit comments

Comments
 (0)