Skip to content

Commit 72cbabe

Browse files
committed
The worker passes the deduplicationKey back in for acking
1 parent 470cb2f commit 72cbabe

File tree

1 file changed

+4
-3
lines changed

1 file changed

+4
-3
lines changed

packages/redis-worker/src/worker.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type JobHandler<Catalog extends WorkerCatalog, K extends keyof Catalog> = (param
2828
payload: z.infer<Catalog[K]["schema"]>;
2929
visibilityTimeoutMs: number;
3030
attempt: number;
31+
deduplicationKey?: string;
3132
}) => Promise<void>;
3233

3334
export type WorkerConcurrencyOptions = {
@@ -345,7 +346,7 @@ class Worker<TCatalog extends WorkerCatalog> {
345346
* Processes a single item.
346347
*/
347348
private async processItem(
348-
{ id, job, item, visibilityTimeoutMs, attempt, timestamp }: AnyQueueItem,
349+
{ id, job, item, visibilityTimeoutMs, attempt, timestamp, deduplicationKey }: AnyQueueItem,
349350
batchSize: number,
350351
workerId: string
351352
): Promise<void> {
@@ -362,7 +363,7 @@ class Worker<TCatalog extends WorkerCatalog> {
362363
async () => {
363364
await this.withHistogram(
364365
this.metrics.jobDuration,
365-
handler({ id, payload: item, visibilityTimeoutMs, attempt }),
366+
handler({ id, payload: item, visibilityTimeoutMs, attempt, deduplicationKey }),
366367
{
367368
worker_id: workerId,
368369
batch_size: batchSize,
@@ -372,7 +373,7 @@ class Worker<TCatalog extends WorkerCatalog> {
372373
);
373374

374375
// On success, acknowledge the item.
375-
await this.queue.ack(id);
376+
await this.queue.ack(id, deduplicationKey);
376377
},
377378
{
378379
kind: SpanKind.CONSUMER,

0 commit comments

Comments
 (0)