Skip to content

Commit d5fdf7f

Browse files
committed
Redis pubsub to redrive from the worker
1 parent decf826 commit d5fdf7f

File tree

2 files changed

+121
-2
lines changed

2 files changed

+121
-2
lines changed

internal-packages/redis-worker/src/worker.test.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { describe } from "node:test";
44
import { expect } from "vitest";
55
import { z } from "zod";
66
import { Worker } from "./worker.js";
7+
import Redis from "ioredis";
78

89
describe("Worker", () => {
910
redisTest("Process items that don't throw", { timeout: 30_000 }, async ({ redisContainer }) => {
@@ -191,6 +192,89 @@ describe("Worker", () => {
191192
}
192193
);
193194

195+
redisTest(
196+
"Redrive an item from DLQ and process it successfully",
197+
{ timeout: 30_000 },
198+
async ({ redisContainer }) => {
199+
const processedItems: number[] = [];
200+
const failedItemId = "fail-then-redrive-item";
201+
let attemptCount = 0;
202+
203+
const worker = new Worker({
204+
name: "test-worker",
205+
redisOptions: {
206+
host: redisContainer.getHost(),
207+
port: redisContainer.getPort(),
208+
password: redisContainer.getPassword(),
209+
},
210+
catalog: {
211+
testJob: {
212+
schema: z.object({ value: z.number() }),
213+
visibilityTimeoutMs: 1000,
214+
retry: { maxAttempts: 3, minTimeoutInMs: 10, maxTimeoutInMs: 50 },
215+
},
216+
},
217+
jobs: {
218+
testJob: async ({ id, payload }) => {
219+
if (id === failedItemId && attemptCount < 3) {
220+
attemptCount++;
221+
throw new Error("Temporary failure");
222+
}
223+
processedItems.push(payload.value);
224+
},
225+
},
226+
concurrency: {
227+
workers: 1,
228+
tasksPerWorker: 1,
229+
},
230+
pollIntervalMs: 50,
231+
logger: new Logger("test", "error"),
232+
});
233+
234+
try {
235+
// Enqueue the item that will fail 3 times
236+
await worker.enqueue({
237+
id: failedItemId,
238+
job: "testJob",
239+
payload: { value: 999 },
240+
});
241+
242+
worker.start();
243+
244+
// Wait for the item to be processed and moved to DLQ
245+
await new Promise((resolve) => setTimeout(resolve, 1000));
246+
247+
// Check that the item is in the DLQ
248+
let dlqSize = await worker.queue.sizeOfDeadLetterQueue();
249+
expect(dlqSize).toBe(1);
250+
251+
// Create a Redis client to publish the redrive message
252+
const redisClient = new Redis({
253+
host: redisContainer.getHost(),
254+
port: redisContainer.getPort(),
255+
password: redisContainer.getPassword(),
256+
});
257+
258+
// Publish redrive message
259+
await redisClient.publish("test-worker:redrive", JSON.stringify({ id: failedItemId }));
260+
261+
// Wait for the item to be redrived and processed
262+
await new Promise((resolve) => setTimeout(resolve, 1000));
263+
264+
// Check that the item was processed successfully
265+
expect(processedItems).toEqual([999]);
266+
267+
// Check that the DLQ is now empty
268+
dlqSize = await worker.queue.sizeOfDeadLetterQueue();
269+
expect(dlqSize).toBe(0);
270+
271+
await redisClient.quit();
272+
} finally {
273+
worker.stop();
274+
}
275+
}
276+
);
277+
194278
//todo test that throwing an error doesn't screw up the other items
195279
//todo process more items when finished
196280

internal-packages/redis-worker/src/worker.ts

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import { Worker as NodeWorker } from "worker_threads";
77
import { z } from "zod";
88
import { SimpleQueue } from "./queue.js";
99

10+
import Redis from "ioredis";
11+
1012
type WorkerCatalog = {
1113
[key: string]: {
1214
schema: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any>;
@@ -42,6 +44,8 @@ type WorkerOptions<TCatalog extends WorkerCatalog> = {
4244
};
4345

4446
class Worker<TCatalog extends WorkerCatalog> {
47+
private subscriber: Redis;
48+
4549
queue: SimpleQueue<QueueCatalogFromWorkerCatalog<TCatalog>>;
4650
private jobs: WorkerOptions<TCatalog>["jobs"];
4751
private logger: Logger;
@@ -55,7 +59,7 @@ class Worker<TCatalog extends WorkerCatalog> {
5559
const schema: QueueCatalogFromWorkerCatalog<TCatalog> = Object.fromEntries(
5660
Object.entries(this.options.catalog).map(([key, value]) => [key, value.schema])
5761
) as QueueCatalogFromWorkerCatalog<TCatalog>;
58-
62+
//
5963
this.queue = new SimpleQueue({
6064
name: options.name,
6165
redisOptions: options.redisOptions,
@@ -74,6 +78,9 @@ class Worker<TCatalog extends WorkerCatalog> {
7478
}
7579

7680
this.setupShutdownHandlers();
81+
82+
this.subscriber = new Redis(options.redisOptions);
83+
this.setupSubscriber();
7784
}
7885

7986
enqueue<K extends keyof TCatalog>({
@@ -240,6 +247,32 @@ class Worker<TCatalog extends WorkerCatalog> {
240247
this.processItems(worker, count);
241248
}
242249

250+
private setupSubscriber() {
251+
const channel = `${this.options.name}:redrive`;
252+
this.subscriber.subscribe(channel, (err) => {
253+
if (err) {
254+
this.logger.error(`Failed to subscribe to ${channel}`, { error: err });
255+
} else {
256+
this.logger.log(`Subscribed to ${channel}`);
257+
}
258+
});
259+
260+
this.subscriber.on("message", this.handleRedriveMessage.bind(this));
261+
}
262+
263+
private async handleRedriveMessage(channel: string, message: string) {
264+
try {
265+
const { id } = JSON.parse(message);
266+
if (typeof id !== "string") {
267+
throw new Error("Invalid message format: id must be a string");
268+
}
269+
await this.queue.redriveFromDeadLetterQueue(id);
270+
this.logger.log(`Redrived item ${id} from Dead Letter Queue`);
271+
} catch (error) {
272+
this.logger.error("Error processing redrive message", { error, message });
273+
}
274+
}
275+
243276
private setupShutdownHandlers() {
244277
process.on("SIGTERM", this.shutdown.bind(this));
245278
process.on("SIGINT", this.shutdown.bind(this));
@@ -254,8 +287,10 @@ class Worker<TCatalog extends WorkerCatalog> {
254287
worker.terminate();
255288
}
256289

290+
await this.subscriber.unsubscribe();
291+
await this.subscriber.quit();
257292
await this.queue.close();
258-
this.logger.log("All workers shut down.");
293+
this.logger.log("All workers and subscribers shut down.");
259294
}
260295

261296
public start() {

0 commit comments

Comments
 (0)