@@ -448,13 +448,13 @@ export class RunExecution {
448
448
snapshotFriendlyId : this . snapshotManager . snapshotId ,
449
449
logger : this . logger ,
450
450
snapshotPollIntervalSeconds : this . env . TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS ,
451
- onPoll : this . fetchAndEnqueueSnapshotChange . bind ( this ) ,
451
+ onPoll : this . fetchAndProcessSnapshotChanges . bind ( this ) ,
452
452
} ) . start ( ) ;
453
453
454
454
this . notifier = new RunNotifier ( {
455
455
runFriendlyId : this . runFriendlyId ,
456
456
supervisorSocket : this . supervisorSocket ,
457
- onNotify : this . fetchAndEnqueueSnapshotChange . bind ( this ) ,
457
+ onNotify : this . fetchAndProcessSnapshotChanges . bind ( this ) ,
458
458
logger : this . logger ,
459
459
} ) . start ( ) ;
460
460
@@ -1081,29 +1081,71 @@ export class RunExecution {
1081
1081
* Fetches the latest execution data and enqueues snapshot changes. Used by both poller and notification handlers.
1082
1082
* @param source string - where this call originated (e.g. 'poller', 'notification')
1083
1083
*/
1084
- public async fetchAndEnqueueSnapshotChange ( source : string ) : Promise < void > {
1084
+ public async fetchAndProcessSnapshotChanges ( source : string ) : Promise < void > {
1085
1085
if ( ! this . runFriendlyId ) {
1086
- this . sendDebugLog ( `fetchAndEnqueueSnapshotChange : missing runFriendlyId` , { source } ) ;
1086
+ this . sendDebugLog ( `fetchAndProcessSnapshotChanges : missing runFriendlyId` , { source } ) ;
1087
1087
return ;
1088
1088
}
1089
1089
1090
- const latestSnapshot = await this . httpClient . getRunExecutionData ( this . runFriendlyId ) ;
1090
+ // Use the last processed snapshot as the since parameter
1091
+ const sinceSnapshotId = this . currentSnapshotFriendlyId ;
1091
1092
1092
- if ( ! latestSnapshot . success ) {
1093
- this . sendDebugLog ( `fetchAndEnqueueSnapshotChange: failed to get latest snapshot data` , {
1093
+ if ( ! sinceSnapshotId ) {
1094
+ this . sendDebugLog ( `fetchAndProcessSnapshotChanges: missing sinceSnapshotId` , { source } ) ;
1095
+ return ;
1096
+ }
1097
+
1098
+ const response = await this . httpClient . getSnapshotsSince ( this . runFriendlyId , sinceSnapshotId ) ;
1099
+
1100
+ if ( ! response . success ) {
1101
+ this . sendDebugLog ( `fetchAndProcessSnapshotChanges: failed to get snapshots since` , {
1094
1102
source,
1095
- error : latestSnapshot . error ,
1103
+ error : response . error ,
1096
1104
} ) ;
1097
1105
return ;
1098
1106
}
1099
1107
1100
- const [ error ] = await tryCatch (
1101
- this . enqueueSnapshotChangeAndWait ( latestSnapshot . data . execution )
1108
+ const { executions } = response . data ;
1109
+
1110
+ if ( ! executions . length ) {
1111
+ this . sendDebugLog ( `fetchAndProcessSnapshotChanges: no new snapshots` , { source } ) ;
1112
+ return ;
1113
+ }
1114
+
1115
+ // Only act on the last snapshot
1116
+ const lastSnapshot = executions [ executions . length - 1 ] ;
1117
+
1118
+ if ( ! lastSnapshot ) {
1119
+ this . sendDebugLog ( `fetchAndProcessSnapshotChanges: no last snapshot` , { source } ) ;
1120
+ return ;
1121
+ }
1122
+
1123
+ const previousSnapshots = executions . slice ( 0 , - 1 ) ;
1124
+
1125
+ // If any previous snapshot is QUEUED or SUSPENDED, deprecate this worker
1126
+ const deprecatedStatus : TaskRunExecutionStatus [ ] = [ "QUEUED" , "SUSPENDED" ] ;
1127
+ const foundDeprecated = previousSnapshots . find ( ( snap ) =>
1128
+ deprecatedStatus . includes ( snap . snapshot . executionStatus )
1102
1129
) ;
1103
1130
1131
+ if ( foundDeprecated ) {
1132
+ this . sendDebugLog (
1133
+ `fetchAndProcessSnapshotChanges: found deprecation marker in previous snapshots, exiting` ,
1134
+ {
1135
+ source,
1136
+ status : foundDeprecated . snapshot . executionStatus ,
1137
+ snapshotId : foundDeprecated . snapshot . friendlyId ,
1138
+ }
1139
+ ) ;
1140
+ await this . exitTaskRunProcessWithoutFailingRun ( { flush : false } ) ;
1141
+ return ;
1142
+ }
1143
+
1144
+ const [ error ] = await tryCatch ( this . enqueueSnapshotChangeAndWait ( lastSnapshot ) ) ;
1145
+
1104
1146
if ( error ) {
1105
1147
this . sendDebugLog (
1106
- `fetchAndEnqueueSnapshotChange : failed to enqueue and process snapshot change` ,
1148
+ `fetchAndProcessSnapshotChanges : failed to enqueue and process snapshot change` ,
1107
1149
{
1108
1150
source,
1109
1151
error : error . message ,
0 commit comments