@@ -419,22 +419,28 @@ export class BackgroundWorker {
419
419
}
420
420
}
421
421
422
- async #initializeTaskRunProcess(
422
/**
 * Builds a log message prefixed with the run id and attempt number read
 * from the execution payload.
 *
 * @param payload - Execution payload; `execution.run.id` and
 *   `execution.attempt.number` are read to form the prefix.
 * @param message - Text placed after the prefix. Defaults to an empty string.
 * @returns The prefix followed by `message`.
 */
#prefixedMessage(payload: TaskRunExecutionPayload, message: string = "") {
  const { run, attempt } = payload.execution;
  const prefix = `[${run.id} .${attempt.number} ] `;

  return `${prefix}${message} `;
}
425
+
426
+ async #getFreshTaskRunProcess(
423
427
payload : TaskRunExecutionPayload ,
424
428
messageId ?: string
425
429
) : Promise < TaskRunProcess > {
430
+ logger . debug ( this . #prefixedMessage( payload , "getFreshTaskRunProcess()" ) ) ;
431
+
426
432
if ( ! this . metadata ) {
427
433
throw new Error ( "Worker not registered" ) ;
428
434
}
429
435
430
436
this . _closed = false ;
431
437
432
- if ( this . _taskRunProcesses . has ( payload . execution . run . id ) ) {
433
- return this . _taskRunProcesses . get ( payload . execution . run . id ) as TaskRunProcess ;
434
- }
438
+ logger . debug ( this . #prefixedMessage( payload , "killing current task run process before attempt" ) ) ;
435
439
436
440
await this . #killCurrentTaskRunProcessBeforeAttempt( payload . execution . run . id ) ;
437
441
442
+ logger . debug ( this . #prefixedMessage( payload , "creating new task run process" ) ) ;
443
+
438
444
const taskRunProcess = new TaskRunProcess (
439
445
payload . execution . run . id ,
440
446
payload . execution . run . isTest ,
@@ -450,7 +456,15 @@ export class BackgroundWorker {
450
456
) ;
451
457
452
458
taskRunProcess . onExit . attach ( ( { pid } ) => {
453
- this . _taskRunProcesses . delete ( payload . execution . run . id ) ;
459
+ logger . debug ( this . #prefixedMessage( payload , "onExit()" ) , { pid } ) ;
460
+
461
+ const taskRunProcess = this . _taskRunProcesses . get ( payload . execution . run . id ) ;
462
+
463
+ // Only delete the task run process if the pid matches
464
+ if ( taskRunProcess ?. pid === pid ) {
465
+ this . _taskRunProcesses . delete ( payload . execution . run . id ) ;
466
+ }
467
+
454
468
if ( pid ) {
455
469
this . _taskRunProcessesBeingKilled . delete ( pid ) ;
456
470
}
@@ -481,54 +495,62 @@ export class BackgroundWorker {
481
495
const taskRunProcess = this . _taskRunProcesses . get ( runId ) ;
482
496
483
497
if ( ! taskRunProcess ) {
498
+ logger . debug ( `[${ runId } ] no current task process to kill` ) ;
484
499
return ;
485
500
}
486
501
502
+ logger . debug ( `[${ runId } ] killing current task process` , {
503
+ pid : taskRunProcess . pid ,
504
+ } ) ;
505
+
487
506
if ( taskRunProcess . isBeingKilled ) {
488
507
if ( this . _taskRunProcessesBeingKilled . size > 1 ) {
489
- // If there's more than one being killed, wait for graceful exit
490
- try {
491
- await taskRunProcess . onExit . waitFor ( 5_000 ) ;
492
- } catch ( error ) {
493
- console . error ( "TaskRunProcess graceful kill timeout exceeded" , error ) ;
494
-
495
- try {
496
- const forcedKill = taskRunProcess . onExit . waitFor ( 5_000 ) ;
497
- taskRunProcess . kill ( "SIGKILL" ) ;
498
- await forcedKill ;
499
- } catch ( error ) {
500
- console . error ( "TaskRunProcess forced kill timeout exceeded" , error ) ;
501
- throw new SigKillTimeoutProcessError ( ) ;
502
- }
503
- }
508
+ await this . #tryGracefulExit( taskRunProcess ) ;
504
509
} else {
505
510
// If there's only one or none being killed, don't do anything so we can create a fresh one in parallel
506
511
}
507
512
} else {
508
513
// It's not being killed, so kill it
509
514
if ( this . _taskRunProcessesBeingKilled . size > 0 ) {
510
- // If there's one being killed already, wait for graceful exit
511
- try {
512
- await taskRunProcess . onExit . waitFor ( 5_000 ) ;
513
- } catch ( error ) {
514
- console . error ( "TaskRunProcess graceful kill timeout exceeded" , error ) ;
515
-
516
- try {
517
- const forcedKill = taskRunProcess . onExit . waitFor ( 5_000 ) ;
518
- taskRunProcess . kill ( "SIGKILL" ) ;
519
- await forcedKill ;
520
- } catch ( error ) {
521
- console . error ( "TaskRunProcess forced kill timeout exceeded" , error ) ;
522
- throw new SigKillTimeoutProcessError ( ) ;
523
- }
524
- }
515
+ await this . #tryGracefulExit( taskRunProcess ) ;
525
516
} else {
526
517
// There's none being killed yet, so we can kill it without waiting. We still set a timeout to kill it forcefully just in case it sticks around.
527
518
taskRunProcess . kill ( "SIGTERM" , 5_000 ) . catch ( ( ) => { } ) ;
528
519
}
529
520
}
530
521
}
531
522
523
/**
 * Waits up to 5 seconds for the given task run process to exit and escalates
 * to a forced kill when that wait fails.
 *
 * @param taskRunProcess - The process that is expected to exit.
 * @param kill - When `true`, `initialSignal` is sent to the process after the
 *   exit wait has been registered. Defaults to `false` (wait only).
 * @param initialSignal - Signal sent when `kill` is `true`. Defaults to
 *   `"SIGTERM"`.
 * @throws SigKillTimeoutProcessError Propagated from `#tryForcefulExit` when
 *   the forced kill fails as well.
 */
async #tryGracefulExit(
  taskRunProcess: TaskRunProcess,
  kill = false,
  initialSignal: number | NodeJS.Signals = "SIGTERM"
) {
  try {
    // Register the exit wait before (optionally) sending the signal.
    const initialExit = taskRunProcess.onExit.waitFor(5_000);

    if (kill) {
      taskRunProcess.kill(initialSignal);
    }

    await initialExit;
  } catch (error) {
    logger.error("TaskRunProcess graceful kill timeout exceeded", error);

    // Await the escalation: without it the caller would resume before the
    // forced kill has finished, and a SigKillTimeoutProcessError would surface
    // as an unhandled promise rejection instead of reaching the caller.
    await this.#tryForcefulExit(taskRunProcess);
  }
}
542
+
543
/**
 * Last-resort termination: sends `SIGKILL` to the task run process and waits
 * up to 5 seconds for it to exit.
 *
 * @param taskRunProcess - The process to kill.
 * @throws SigKillTimeoutProcessError When any step of the kill-and-wait
 *   sequence fails; the underlying error is logged first.
 */
async #tryForcefulExit(taskRunProcess: TaskRunProcess) {
  try {
    // The exit wait is registered first, then the signal is sent.
    const exitAfterSigkill = taskRunProcess.onExit.waitFor(5_000);
    taskRunProcess.kill("SIGKILL");

    await exitAfterSigkill;
  } catch (cause) {
    logger.error("TaskRunProcess forced kill timeout exceeded", cause);

    throw new SigKillTimeoutProcessError();
  }
}
553
+
532
554
async cancelRun ( taskRunId : string ) {
533
555
const taskRunProcess = this . _taskRunProcesses . get ( taskRunId ) ;
534
556
@@ -636,7 +658,12 @@ export class BackgroundWorker {
636
658
messageId ?: string
637
659
) : Promise < TaskRunExecutionResult > {
638
660
try {
639
- const taskRunProcess = await this . #initializeTaskRunProcess( payload , messageId ) ;
661
+ const taskRunProcess = await this . #getFreshTaskRunProcess( payload , messageId ) ;
662
+
663
+ logger . debug ( this . #prefixedMessage( payload , "executing task run" ) , {
664
+ pid : taskRunProcess . pid ,
665
+ } ) ;
666
+
640
667
const result = await taskRunProcess . executeTaskRun ( payload ) ;
641
668
642
669
// Always kill the worker
@@ -829,7 +856,7 @@ class TaskRunProcess {
829
856
this . onIsBeingKilled . post ( this . _child ?. pid ) ;
830
857
}
831
858
832
- logger . debug ( `[${ this . runId } ] cleaning up task run process` , { kill } ) ;
859
+ logger . debug ( `[${ this . runId } ] cleaning up task run process` , { kill, pid : this . pid } ) ;
833
860
834
861
await this . _sender . send ( "CLEANUP" , {
835
862
flush : true ,
@@ -841,7 +868,7 @@ class TaskRunProcess {
841
868
// Set a timeout to kill the child process if it hasn't been killed within 5 seconds
842
869
setTimeout ( ( ) => {
843
870
if ( this . _child && ! this . _child . killed ) {
844
- logger . debug ( `[${ this . runId } ] killing task run process after timeout` ) ;
871
+ logger . debug ( `[${ this . runId } ] killing task run process after timeout` , { pid : this . pid } ) ;
845
872
846
873
this . _child . kill ( ) ;
847
874
}
@@ -949,7 +976,7 @@ class TaskRunProcess {
949
976
}
950
977
951
978
async #handleExit( code : number | null , signal : NodeJS . Signals | null ) {
952
- logger . debug ( `[${ this . runId } ] task run process exiting ` , { code, signal } ) ;
979
+ logger . debug ( `[${ this . runId } ] handle task run process exit ` , { code, signal, pid : this . pid } ) ;
953
980
954
981
// Go through all the attempts currently pending and reject them
955
982
for ( const [ id , status ] of this . _attemptStatuses . entries ( ) ) {
@@ -1014,9 +1041,9 @@ class TaskRunProcess {
1014
1041
}
1015
1042
1016
1043
// Kills the child process of this task run process.
// In the added (+) version the call is always logged together with the pid,
// and the child is then killed only when one exists and its `killed` flag is
// not already set. The removed (-) version logged only inside that guard.
#kill( ) {
1017
- if ( this . _child && ! this . _child . killed ) {
1018
- logger . debug ( `[${ this . runId } ] killing task run process` ) ;
1044
+ logger . debug ( `[${ this . runId } ] #kill()` , { pid : this . pid } ) ;
1019
1045
1046
+ if ( this . _child && ! this . _child . killed ) {
1020
1047
this . _child ?. kill ( ) ;
1021
1048
}
1022
1049
}
0 commit comments