@@ -216,7 +216,7 @@ export class RunExecution {
216
216
*
217
217
* This is the main entry point for snapshot changes, but processing is deferred to the snapshot manager.
218
218
*/
219
- public async enqueueSnapshotChangeAndWait ( runData : RunExecutionData ) : Promise < void > {
219
+ private async enqueueSnapshotChangesAndWait ( snapshots : RunExecutionData [ ] ) : Promise < void > {
220
220
if ( this . isShuttingDown ) {
221
221
this . sendDebugLog ( "enqueueSnapshotChangeAndWait: shutting down, skipping" ) ;
222
222
return ;
@@ -227,10 +227,13 @@ export class RunExecution {
227
227
return ;
228
228
}
229
229
230
- await this . snapshotManager . handleSnapshotChange ( runData ) ;
230
+ await this . snapshotManager . handleSnapshotChanges ( snapshots ) ;
231
231
}
232
232
233
- private async processSnapshotChange ( runData : RunExecutionData ) : Promise < void > {
233
+ private async processSnapshotChange (
234
+ runData : RunExecutionData ,
235
+ deprecated : boolean
236
+ ) : Promise < void > {
234
237
const { run, snapshot, completedWaitpoints } = runData ;
235
238
236
239
const snapshotMetadata = {
@@ -257,6 +260,13 @@ export class RunExecution {
257
260
this . snapshotPoller ?. updateSnapshotId ( snapshot . friendlyId ) ;
258
261
this . snapshotPoller ?. resetCurrentInterval ( ) ;
259
262
263
+ if ( deprecated ) {
264
+ this . sendDebugLog ( "run execution is deprecated" , { incomingSnapshot : snapshot } ) ;
265
+
266
+ await this . exitTaskRunProcessWithoutFailingRun ( { flush : false } ) ;
267
+ return ;
268
+ }
269
+
260
270
switch ( snapshot . executionStatus ) {
261
271
case "PENDING_CANCEL" : {
262
272
this . sendDebugLog ( "run was cancelled" , snapshotMetadata ) ;
@@ -276,14 +286,12 @@ export class RunExecution {
276
286
case "QUEUED" : {
277
287
this . sendDebugLog ( "run was re-queued" , snapshotMetadata ) ;
278
288
279
- // Pretend we've just suspended the run. This will kill the process without failing the run.
280
289
await this . exitTaskRunProcessWithoutFailingRun ( { flush : true } ) ;
281
290
return ;
282
291
}
283
292
case "FINISHED" : {
284
293
this . sendDebugLog ( "run is finished" , snapshotMetadata ) ;
285
294
286
- // Pretend we've just suspended the run. This will kill the process without failing the run.
287
295
await this . exitTaskRunProcessWithoutFailingRun ( { flush : true } ) ;
288
296
return ;
289
297
}
@@ -434,10 +442,12 @@ export class RunExecution {
434
442
// Create snapshot manager
435
443
this . snapshotManager = new SnapshotManager ( {
436
444
runFriendlyId : runOpts . runFriendlyId ,
445
+ runnerId : this . env . TRIGGER_RUNNER_ID ,
437
446
initialSnapshotId : runOpts . snapshotFriendlyId ,
438
447
// We're just guessing here, but "PENDING_EXECUTING" is probably fine
439
448
initialStatus : "PENDING_EXECUTING" ,
440
449
logger : this . logger ,
450
+ metadataClient : this . metadataClient ,
441
451
onSnapshotChange : this . processSnapshotChange . bind ( this ) ,
442
452
onSuspendable : this . handleSuspendable . bind ( this ) ,
443
453
} ) ;
@@ -835,14 +845,18 @@ export class RunExecution {
835
845
const [ error , overrides ] = await this . metadataClient . getEnvOverrides ( ) ;
836
846
837
847
if ( error ) {
838
- this . sendDebugLog ( "[override] failed to fetch" , { error : error . message } ) ;
848
+ this . sendDebugLog ( "[override] failed to fetch" , {
849
+ reason,
850
+ error : error . message ,
851
+ } ) ;
839
852
return null ;
840
853
}
841
854
842
855
if ( overrides . TRIGGER_RUN_ID && overrides . TRIGGER_RUN_ID !== this . runFriendlyId ) {
843
856
this . sendDebugLog ( "[override] run ID mismatch, ignoring overrides" , {
857
+ reason,
844
858
currentRunId : this . runFriendlyId ,
845
- overrideRunId : overrides . TRIGGER_RUN_ID ,
859
+ incomingRunId : overrides . TRIGGER_RUN_ID ,
846
860
} ) ;
847
861
return null ;
848
862
}
@@ -855,11 +869,14 @@ export class RunExecution {
855
869
let executionWasRestored = false ;
856
870
857
871
if ( this . env . TRIGGER_RUNNER_ID !== overrides . TRIGGER_RUNNER_ID ) {
858
- this . sendDebugLog ( "[override] runner ID changed -> execution was restored" , {
872
+ this . sendDebugLog ( "[override] runner ID mismatch, execution was restored" , {
873
+ reason,
859
874
currentRunnerId : this . env . TRIGGER_RUNNER_ID ,
860
- newRunnerId : overrides . TRIGGER_RUNNER_ID ,
875
+ incomingRunnerId : overrides . TRIGGER_RUNNER_ID ,
861
876
} ) ;
862
877
878
+ // we should keep a list of restored snapshots
879
+
863
880
executionWasRestored = true ;
864
881
}
865
882
@@ -1124,7 +1141,6 @@ export class RunExecution {
1124
1141
} ) ;
1125
1142
1126
1143
await this . processEnvOverrides ( "snapshots since error" ) ;
1127
-
1128
1144
return ;
1129
1145
}
1130
1146
@@ -1135,49 +1151,7 @@ export class RunExecution {
1135
1151
return ;
1136
1152
}
1137
1153
1138
- // Only act on the last snapshot
1139
- const lastSnapshot = snapshots [ snapshots . length - 1 ] ;
1140
-
1141
- if ( ! lastSnapshot ) {
1142
- this . sendDebugLog ( `fetchAndProcessSnapshotChanges: no last snapshot` , { source } ) ;
1143
- return ;
1144
- }
1145
-
1146
- const previousSnapshots = snapshots . slice ( 0 , - 1 ) ;
1147
-
1148
- // If any previous snapshot is QUEUED or SUSPENDED, deprecate this worker
1149
- const deprecatedStatus : TaskRunExecutionStatus [ ] = [ "QUEUED" , "SUSPENDED" ] ;
1150
- const deprecatedSnapshots = previousSnapshots . filter ( ( snap ) =>
1151
- deprecatedStatus . includes ( snap . snapshot . executionStatus )
1152
- ) ;
1153
-
1154
- if ( deprecatedSnapshots . length ) {
1155
- const result = await this . processEnvOverrides (
1156
- "found deprecation marker in previous snapshots"
1157
- ) ;
1158
-
1159
- if ( ! result ) {
1160
- return ;
1161
- }
1162
-
1163
- const { executionWasRestored } = result ;
1164
-
1165
- if ( executionWasRestored ) {
1166
- // It's normal for a restored run to have deprecation markers, e.g. it will have been SUSPENDED
1167
- } else {
1168
- this . sendDebugLog (
1169
- `fetchAndProcessSnapshotChanges: found deprecation marker in previous snapshots, exiting` ,
1170
- {
1171
- source,
1172
- deprecatedSnapshots : deprecatedSnapshots . map ( ( s ) => s . snapshot ) ,
1173
- }
1174
- ) ;
1175
- await this . exitTaskRunProcessWithoutFailingRun ( { flush : false } ) ;
1176
- return ;
1177
- }
1178
- }
1179
-
1180
- const [ error ] = await tryCatch ( this . enqueueSnapshotChangeAndWait ( lastSnapshot ) ) ;
1154
+ const [ error ] = await tryCatch ( this . enqueueSnapshotChangesAndWait ( snapshots ) ) ;
1181
1155
1182
1156
if ( error ) {
1183
1157
this . sendDebugLog (
0 commit comments