@@ -97,6 +97,12 @@ class ProdWorker {
97
97
idempotencyKey : string ;
98
98
}
99
99
| undefined ;
100
+ private readyForResumeReplay :
101
+ | {
102
+ idempotencyKey : string ;
103
+ type : WaitReason ;
104
+ }
105
+ | undefined ;
100
106
101
107
#httpPort: number ;
102
108
#httpServer: ReturnType < typeof createServer > ;
@@ -405,13 +411,16 @@ class ProdWorker {
405
411
this . waitForPostStart = false ;
406
412
407
413
this . durationResumeFallback = undefined ;
414
+ this . readyForResumeReplay = undefined ;
408
415
409
416
this . _taskRunProcess ?. waitCompletedNotification ( ) ;
410
417
}
411
418
412
419
async #readyForLazyAttempt( ) {
413
420
const idempotencyKey = randomUUID ( ) ;
414
421
422
+ logger . log ( "ready for lazy attempt" , { idempotencyKey } ) ;
423
+
415
424
this . readyForLazyAttemptReplay = {
416
425
idempotencyKey,
417
426
} ;
@@ -420,7 +429,7 @@ class ProdWorker {
420
429
// ..but we also have to be fast to avoid failing the task due to missing heartbeat
421
430
for await ( const { delay, retry } of defaultBackoff . min ( 10 ) . maxRetries ( 7 ) ) {
422
431
if ( retry > 0 ) {
423
- logger . log ( "retrying ready for lazy attempt" , { retry } ) ;
432
+ logger . log ( "retrying ready for lazy attempt" , { retry, idempotencyKey } ) ;
424
433
}
425
434
426
435
this . #coordinatorSocket. socket . emit ( "READY_FOR_LAZY_ATTEMPT" , {
@@ -453,6 +462,93 @@ class ProdWorker {
453
462
this . #failRun( this . runId , "Failed to receive execute request in a reasonable time" ) ;
454
463
}
455
464
465
+ async #readyForResume( ) {
466
+ const idempotencyKey = randomUUID ( ) ;
467
+
468
+ logger . log ( "readyForResume()" , {
469
+ nextResumeAfter : this . nextResumeAfter ,
470
+ attemptFriendlyId : this . attemptFriendlyId ,
471
+ attemptNumber : this . attemptNumber ,
472
+ idempotencyKey,
473
+ } ) ;
474
+
475
+ if ( ! this . nextResumeAfter ) {
476
+ logger . error ( "Missing next resume reason" , { status : this . #status } ) ;
477
+
478
+ this . #emitUnrecoverableError(
479
+ "NoNextResume" ,
480
+ "Next resume reason not set while resuming from paused state"
481
+ ) ;
482
+
483
+ return ;
484
+ }
485
+
486
+ if ( ! this . attemptFriendlyId ) {
487
+ logger . error ( "Missing attempt friendly ID" , { status : this . #status } ) ;
488
+
489
+ this . #emitUnrecoverableError(
490
+ "NoAttemptId" ,
491
+ "Attempt ID not set while resuming from paused state"
492
+ ) ;
493
+
494
+ return ;
495
+ }
496
+
497
+ if ( ! this . attemptNumber ) {
498
+ logger . error ( "Missing attempt number" , { status : this . #status } ) ;
499
+
500
+ this . #emitUnrecoverableError(
501
+ "NoAttemptNumber" ,
502
+ "Attempt number not set while resuming from paused state"
503
+ ) ;
504
+
505
+ return ;
506
+ }
507
+
508
+ this . readyForResumeReplay = {
509
+ idempotencyKey,
510
+ type : this . nextResumeAfter ,
511
+ } ;
512
+
513
+ const lockedMetadata = {
514
+ attemptFriendlyId : this . attemptFriendlyId ,
515
+ attemptNumber : this . attemptNumber ,
516
+ type : this . nextResumeAfter ,
517
+ } ;
518
+
519
+ // Retry if we don't receive RESUME_AFTER_DEPENDENCY or RESUME_AFTER_DURATION in a reasonable time
520
+ // ..but we also have to be fast to avoid failing the task due to missing heartbeat
521
+ for await ( const { delay, retry } of defaultBackoff . min ( 10 ) . maxRetries ( 7 ) ) {
522
+ if ( retry > 0 ) {
523
+ logger . log ( "retrying ready for resume" , { retry, idempotencyKey } ) ;
524
+ }
525
+
526
+ this . #coordinatorSocket. socket . emit ( "READY_FOR_RESUME" , {
527
+ version : "v2" ,
528
+ ...lockedMetadata ,
529
+ } ) ;
530
+
531
+ await timeout ( delay . milliseconds ) ;
532
+
533
+ if ( ! this . readyForResumeReplay ) {
534
+ logger . log ( "replay ready for resume cancelled, discarding" , {
535
+ idempotencyKey,
536
+ } ) ;
537
+
538
+ return ;
539
+ }
540
+
541
+ if ( idempotencyKey !== this . readyForResumeReplay . idempotencyKey ) {
542
+ logger . log ( "replay ready for resume idempotency key mismatch, discarding" , {
543
+ idempotencyKey,
544
+ newIdempotencyKey : this . readyForResumeReplay . idempotencyKey ,
545
+ } ) ;
546
+
547
+ return ;
548
+ }
549
+ }
550
+ }
551
+
456
552
#readyForCheckpoint( ) {
457
553
this . #coordinatorSocket. socket . emit ( "READY_FOR_CHECKPOINT" , { version : "v1" } ) ;
458
554
}
@@ -630,6 +726,7 @@ class ProdWorker {
630
726
this . paused = false ;
631
727
this . nextResumeAfter = undefined ;
632
728
this . waitForPostStart = false ;
729
+ this . readyForResumeReplay = undefined ;
633
730
634
731
for ( let i = 0 ; i < completions . length ; i ++ ) {
635
732
const completion = completions [ i ] ;
@@ -845,46 +942,7 @@ class ProdWorker {
845
942
}
846
943
847
944
if ( this . paused ) {
848
- if ( ! this . nextResumeAfter ) {
849
- logger . error ( "Missing next resume reason" , { status : this . #status } ) ;
850
-
851
- this . #emitUnrecoverableError(
852
- "NoNextResume" ,
853
- "Next resume reason not set while resuming from paused state"
854
- ) ;
855
-
856
- return ;
857
- }
858
-
859
- if ( ! this . attemptFriendlyId ) {
860
- logger . error ( "Missing attempt friendly ID" , { status : this . #status } ) ;
861
-
862
- this . #emitUnrecoverableError(
863
- "NoAttemptId" ,
864
- "Attempt ID not set while resuming from paused state"
865
- ) ;
866
-
867
- return ;
868
- }
869
-
870
- if ( ! this . attemptNumber ) {
871
- logger . error ( "Missing attempt number" , { status : this . #status } ) ;
872
-
873
- this . #emitUnrecoverableError(
874
- "NoAttemptNumber" ,
875
- "Attempt number not set while resuming from paused state"
876
- ) ;
877
-
878
- return ;
879
- }
880
-
881
- socket . emit ( "READY_FOR_RESUME" , {
882
- version : "v2" ,
883
- attemptFriendlyId : this . attemptFriendlyId ,
884
- attemptNumber : this . attemptNumber ,
885
- type : this . nextResumeAfter ,
886
- } ) ;
887
-
945
+ await this . #readyForResume( ) ;
888
946
return ;
889
947
}
890
948
@@ -1293,6 +1351,9 @@ class ProdWorker {
1293
1351
attemptNumber : this . attemptNumber ,
1294
1352
waitForTaskReplay : this . waitForTaskReplay ,
1295
1353
waitForBatchReplay : this . waitForBatchReplay ,
1354
+ readyForLazyAttemptReplay : this . readyForLazyAttemptReplay ,
1355
+ durationResumeFallback : this . durationResumeFallback ,
1356
+ readyForResumeReplay : this . readyForResumeReplay ,
1296
1357
} ;
1297
1358
}
1298
1359
0 commit comments