@@ -438,4 +438,129 @@ describe("Worker", () => {
438
438
await worker . stop ( ) ;
439
439
}
440
440
) ;
441
+
442
+ redisTest (
443
+ "Should properly remove future-scheduled job after completion" ,
444
+ { timeout : 30_000 } ,
445
+ async ( { redisContainer } ) => {
446
+ const processedPayloads : string [ ] = [ ] ;
447
+
448
+ const worker = new Worker ( {
449
+ name : "test-worker" ,
450
+ redisOptions : {
451
+ host : redisContainer . getHost ( ) ,
452
+ port : redisContainer . getPort ( ) ,
453
+ password : redisContainer . getPassword ( ) ,
454
+ } ,
455
+ catalog : {
456
+ testJob : {
457
+ schema : z . object ( { value : z . string ( ) } ) ,
458
+ visibilityTimeoutMs : 5000 ,
459
+ retry : { maxAttempts : 3 } ,
460
+ } ,
461
+ } ,
462
+ jobs : {
463
+ testJob : async ( { payload } ) => {
464
+ processedPayloads . push ( payload . value ) ;
465
+ } ,
466
+ } ,
467
+ concurrency : {
468
+ workers : 1 ,
469
+ tasksPerWorker : 1 ,
470
+ } ,
471
+ pollIntervalMs : 10 ,
472
+ logger : new Logger ( "test" , "debug" ) , // Use debug to see all logs
473
+ } ) . start ( ) ;
474
+
475
+ // Schedule a job 500ms in the future
476
+ await worker . enqueue ( {
477
+ id : "future-job" ,
478
+ job : "testJob" ,
479
+ payload : { value : "test" } ,
480
+ availableAt : new Date ( Date . now ( ) + 500 ) ,
481
+ } ) ;
482
+
483
+ // Verify it's in the future queue
484
+ const initialSize = await worker . queue . size ( ) ;
485
+ const initialSizeWithFuture = await worker . queue . size ( { includeFuture : true } ) ;
486
+ expect ( initialSize ) . toBe ( 0 ) ;
487
+ expect ( initialSizeWithFuture ) . toBe ( 1 ) ;
488
+
489
+ // Wait for job to be processed
490
+ await new Promise ( ( resolve ) => setTimeout ( resolve , 1000 ) ) ;
491
+
492
+ // Verify job was processed
493
+ expect ( processedPayloads ) . toContain ( "test" ) ;
494
+
495
+ // Verify queue is completely empty
496
+ const finalSize = await worker . queue . size ( ) ;
497
+ const finalSizeWithFuture = await worker . queue . size ( { includeFuture : true } ) ;
498
+ expect ( finalSize ) . toBe ( 0 ) ;
499
+ expect ( finalSizeWithFuture ) . toBe ( 0 ) ;
500
+
501
+ await worker . stop ( ) ;
502
+ }
503
+ ) ;
504
+
505
+ redisTest (
506
+ "Should properly remove immediate job after completion" ,
507
+ { timeout : 30_000 } ,
508
+ async ( { redisContainer } ) => {
509
+ const processedPayloads : string [ ] = [ ] ;
510
+
511
+ const worker = new Worker ( {
512
+ name : "test-worker" ,
513
+ redisOptions : {
514
+ host : redisContainer . getHost ( ) ,
515
+ port : redisContainer . getPort ( ) ,
516
+ password : redisContainer . getPassword ( ) ,
517
+ } ,
518
+ catalog : {
519
+ testJob : {
520
+ schema : z . object ( { value : z . string ( ) } ) ,
521
+ visibilityTimeoutMs : 5000 ,
522
+ retry : { maxAttempts : 3 } ,
523
+ } ,
524
+ } ,
525
+ jobs : {
526
+ testJob : async ( { payload } ) => {
527
+ processedPayloads . push ( payload . value ) ;
528
+ } ,
529
+ } ,
530
+ concurrency : {
531
+ workers : 1 ,
532
+ tasksPerWorker : 1 ,
533
+ } ,
534
+ pollIntervalMs : 10 ,
535
+ logger : new Logger ( "test" , "debug" ) , // Use debug to see all logs
536
+ } ) . start ( ) ;
537
+
538
+ // Enqueue a job to run immediately
539
+ await worker . enqueue ( {
540
+ id : "immediate-job" ,
541
+ job : "testJob" ,
542
+ payload : { value : "test" } ,
543
+ } ) ;
544
+
545
+ // Verify it's in the present queue
546
+ const initialSize = await worker . queue . size ( ) ;
547
+ const initialSizeWithFuture = await worker . queue . size ( { includeFuture : true } ) ;
548
+ expect ( initialSize ) . toBe ( 1 ) ;
549
+ expect ( initialSizeWithFuture ) . toBe ( 1 ) ;
550
+
551
+ // Wait for job to be processed
552
+ await new Promise ( ( resolve ) => setTimeout ( resolve , 1000 ) ) ;
553
+
554
+ // Verify job was processed
555
+ expect ( processedPayloads ) . toContain ( "test" ) ;
556
+
557
+ // Verify queue is completely empty
558
+ const finalSize = await worker . queue . size ( ) ;
559
+ const finalSizeWithFuture = await worker . queue . size ( { includeFuture : true } ) ;
560
+ expect ( finalSize ) . toBe ( 0 ) ;
561
+ expect ( finalSizeWithFuture ) . toBe ( 0 ) ;
562
+
563
+ await worker . stop ( ) ;
564
+ }
565
+ ) ;
441
566
} ) ;
0 commit comments