Skip to content

Commit 3c753ec

Browse files
committed
Added some Redis worker debounce tests (one failing that reproduces a bug)
1 parent 2c19693 commit 3c753ec

File tree

1 file changed

+188
-0
lines changed

1 file changed

+188
-0
lines changed

packages/redis-worker/src/worker.test.ts

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,4 +250,192 @@ describe("Worker", () => {
250250
await redisClient.quit();
251251
}
252252
);
253+
254+
redisTest(
255+
"Should process a job with the same ID only once when rescheduled",
256+
{ timeout: 30_000 },
257+
async ({ redisContainer }) => {
258+
const processedPayloads: string[] = [];
259+
260+
const worker = new Worker({
261+
name: "test-worker",
262+
redisOptions: {
263+
host: redisContainer.getHost(),
264+
port: redisContainer.getPort(),
265+
password: redisContainer.getPassword(),
266+
},
267+
catalog: {
268+
testJob: {
269+
schema: z.object({ value: z.string() }),
270+
visibilityTimeoutMs: 5000,
271+
retry: { maxAttempts: 3 },
272+
},
273+
},
274+
jobs: {
275+
testJob: async ({ payload }) => {
276+
await new Promise((resolve) => setTimeout(resolve, 30)); // Simulate work
277+
processedPayloads.push(payload.value);
278+
},
279+
},
280+
concurrency: {
281+
workers: 1,
282+
tasksPerWorker: 1,
283+
},
284+
pollIntervalMs: 10, // Ensure quick polling to detect the scheduled item
285+
logger: new Logger("test", "log"),
286+
}).start();
287+
288+
// Unique ID to use for both enqueues
289+
const testJobId = "duplicate-job-id";
290+
291+
// Enqueue the first item immediately
292+
await worker.enqueue({
293+
id: testJobId,
294+
job: "testJob",
295+
payload: { value: "first-attempt" },
296+
availableAt: new Date(Date.now() + 50),
297+
});
298+
299+
// Enqueue another item with the same ID but scheduled 50ms in the future
300+
await worker.enqueue({
301+
id: testJobId,
302+
job: "testJob",
303+
payload: { value: "second-attempt" },
304+
availableAt: new Date(Date.now() + 50),
305+
});
306+
307+
// Wait enough time for both jobs to be processed if they were going to be
308+
await new Promise((resolve) => setTimeout(resolve, 300));
309+
310+
// Verify that only one job was processed (the second one should have replaced the first)
311+
expect(processedPayloads.length).toBe(1);
312+
313+
// Verify that the second job's payload was the one processed
314+
expect(processedPayloads[0]).toBe("second-attempt");
315+
316+
await worker.stop();
317+
}
318+
);
319+
320+
redisTest(
321+
"Should process second job with same ID when enqueued during first job execution with future availableAt",
322+
{ timeout: 30_000 },
323+
async ({ redisContainer }) => {
324+
const processedPayloads: string[] = [];
325+
const jobStarted: string[] = [];
326+
let firstJobCompleted = false;
327+
const events: string[] = [];
328+
329+
const worker = new Worker({
330+
name: "test-worker",
331+
redisOptions: {
332+
host: redisContainer.getHost(),
333+
port: redisContainer.getPort(),
334+
password: redisContainer.getPassword(),
335+
},
336+
catalog: {
337+
testJob: {
338+
schema: z.object({ value: z.string() }),
339+
visibilityTimeoutMs: 5000,
340+
retry: { maxAttempts: 3 },
341+
},
342+
},
343+
jobs: {
344+
testJob: async ({ payload }) => {
345+
// Record when the job starts processing
346+
jobStarted.push(payload.value);
347+
events.push(`Job started: ${payload.value}`);
348+
349+
if (payload.value === "first-attempt") {
350+
// First job takes a long time to process
351+
await new Promise((resolve) => setTimeout(resolve, 1_000));
352+
firstJobCompleted = true;
353+
}
354+
355+
// Record when the job completes
356+
processedPayloads.push(payload.value);
357+
events.push(`Job completed: ${payload.value}`);
358+
},
359+
},
360+
concurrency: {
361+
workers: 1,
362+
tasksPerWorker: 1,
363+
},
364+
pollIntervalMs: 10,
365+
logger: new Logger("test", "log"),
366+
}).start();
367+
368+
const testJobId = "long-running-job-id";
369+
370+
// Queue the first job
371+
await worker.enqueue({
372+
id: testJobId,
373+
job: "testJob",
374+
payload: { value: "first-attempt" },
375+
});
376+
events.push("First job enqueued");
377+
378+
// Verify initial queue size
379+
const size1 = await worker.queue.size({ includeFuture: true });
380+
events.push(`Queue size after first enqueue: ${size1}`);
381+
expect(size1).toBe(1);
382+
383+
// Wait until we know the first job has started processing
384+
while (jobStarted.length === 0) {
385+
await new Promise((resolve) => setTimeout(resolve, 10));
386+
}
387+
388+
// Now that first job is running, queue second job with same ID
389+
// Set availableAt to be 1.5 seconds in the future (after first job completes)
390+
await worker.enqueue({
391+
id: testJobId,
392+
job: "testJob",
393+
payload: { value: "second-attempt" },
394+
availableAt: new Date(Date.now() + 1500),
395+
});
396+
events.push("Second job enqueued with future availableAt");
397+
398+
// Verify queue size after second enqueue
399+
const size2 = await worker.queue.size({ includeFuture: true });
400+
const size2Present = await worker.queue.size({ includeFuture: false });
401+
events.push(`Queue size after second enqueue (including future): ${size2}`);
402+
events.push(`Queue size after second enqueue (present only): ${size2Present}`);
403+
expect(size2).toBe(1); // Should still be 1 as it's the same ID
404+
405+
// Wait for the first job to complete
406+
while (!firstJobCompleted) {
407+
await new Promise((resolve) => setTimeout(resolve, 10));
408+
}
409+
events.push("First job completed");
410+
411+
// Check queue size right after first job completes
412+
const size3 = await worker.queue.size({ includeFuture: true });
413+
const size3Present = await worker.queue.size({ includeFuture: false });
414+
events.push(`Queue size after first job completes (including future): ${size3}`);
415+
events.push(`Queue size after first job completes (present only): ${size3Present}`);
416+
417+
// Wait long enough for the second job to become available and potentially run
418+
await new Promise((resolve) => setTimeout(resolve, 2000));
419+
420+
// Final queue size
421+
const size4 = await worker.queue.size({ includeFuture: true });
422+
const size4Present = await worker.queue.size({ includeFuture: false });
423+
events.push(`Final queue size (including future): ${size4}`);
424+
events.push(`Final queue size (present only): ${size4Present}`);
425+
426+
console.log("Event sequence:", events);
427+
428+
// First job should have run
429+
expect(processedPayloads).toContain("first-attempt");
430+
431+
// These assertions should fail - demonstrating the bug
432+
// The second job should run after its availableAt time, but doesn't because
433+
// the ack from the first job removed it from Redis entirely
434+
expect(jobStarted).toContain("second-attempt");
435+
expect(processedPayloads).toContain("second-attempt");
436+
expect(processedPayloads.length).toBe(2);
437+
438+
await worker.stop();
439+
}
440+
);
253441
});

0 commit comments

Comments
 (0)