@@ -17,7 +17,7 @@ import { WorkloadHttpClient } from "@trigger.dev/core/v3/workers";
17
17
import { setTimeout as sleep } from "timers/promises" ;
18
18
import { RunExecutionSnapshotPoller } from "./poller.js" ;
19
19
import { assertExhaustive , tryCatch } from "@trigger.dev/core/utils" ;
20
- import { MetadataClient } from "./overrides.js" ;
20
+ import { Metadata , MetadataClient } from "./overrides.js" ;
21
21
import { randomBytes } from "node:crypto" ;
22
22
import { SnapshotManager , SnapshotState } from "./snapshot.js" ;
23
23
import type { SupervisorSocket } from "./controller.js" ;
@@ -76,6 +76,7 @@ export class RunExecution {
76
76
77
77
private supervisorSocket : SupervisorSocket ;
78
78
private notifier ?: RunNotifier ;
79
+ private metadataClient ?: MetadataClient ;
79
80
80
81
constructor ( opts : RunExecutionOptions ) {
81
82
this . id = randomBytes ( 4 ) . toString ( "hex" ) ;
@@ -87,6 +88,10 @@ export class RunExecution {
87
88
88
89
this . restoreCount = 0 ;
89
90
this . executionAbortController = new AbortController ( ) ;
91
+
92
+ if ( this . env . TRIGGER_METADATA_URL ) {
93
+ this . metadataClient = new MetadataClient ( this . env . TRIGGER_METADATA_URL ) ;
94
+ }
90
95
}
91
96
92
97
/**
@@ -820,30 +825,42 @@ export class RunExecution {
820
825
/**
821
826
* Processes env overrides from the metadata service. Generally called when we're resuming from a suspended state.
822
827
*/
823
- public async processEnvOverrides ( reason ?: string ) {
824
- if ( ! this . env . TRIGGER_METADATA_URL ) {
825
- this . sendDebugLog ( "no metadata url, skipping env overrides" , { reason } ) ;
826
- return ;
828
+ public async processEnvOverrides (
829
+ reason ?: string
830
+ ) : Promise < { executionWasRestored : boolean ; overrides : Metadata } | null > {
831
+ if ( ! this . metadataClient ) {
832
+ return null ;
827
833
}
828
834
829
- const metadataClient = new MetadataClient ( this . env . TRIGGER_METADATA_URL ) ;
830
- const overrides = await metadataClient . getEnvOverrides ( ) ;
835
+ const [ error , overrides ] = await this . metadataClient . getEnvOverrides ( ) ;
831
836
832
- if ( ! overrides ) {
833
- this . sendDebugLog ( "no env overrides, skipping" , { reason } ) ;
834
- return ;
837
+ if ( error ) {
838
+ this . sendDebugLog ( "[override] failed to fetch" , { error : error . message } ) ;
839
+ return null ;
840
+ }
841
+
842
+ if ( overrides . TRIGGER_RUN_ID && overrides . TRIGGER_RUN_ID !== this . runFriendlyId ) {
843
+ this . sendDebugLog ( "[override] run ID mismatch, ignoring overrides" , {
844
+ currentRunId : this . runFriendlyId ,
845
+ overrideRunId : overrides . TRIGGER_RUN_ID ,
846
+ } ) ;
847
+ return null ;
835
848
}
836
849
837
- this . sendDebugLog ( `processing env overrides : ${ reason } ` , {
850
+ this . sendDebugLog ( `[override] processing : ${ reason } ` , {
838
851
overrides,
839
852
currentEnv : this . env . raw ,
840
853
} ) ;
841
854
855
+ let executionWasRestored = false ;
856
+
842
857
if ( this . env . TRIGGER_RUNNER_ID !== overrides . TRIGGER_RUNNER_ID ) {
843
- this . sendDebugLog ( "runner ID changed -> run was restored from a checkpoint " , {
858
+ this . sendDebugLog ( "[override] runner ID changed -> execution was restored" , {
844
859
currentRunnerId : this . env . TRIGGER_RUNNER_ID ,
845
860
newRunnerId : overrides . TRIGGER_RUNNER_ID ,
846
861
} ) ;
862
+
863
+ executionWasRestored = true ;
847
864
}
848
865
849
866
// Override the env with the new values
@@ -863,6 +880,11 @@ export class RunExecution {
863
880
if ( overrides . TRIGGER_RUNNER_ID ) {
864
881
this . httpClient . updateRunnerId ( this . env . TRIGGER_RUNNER_ID ) ;
865
882
}
883
+
884
+ return {
885
+ executionWasRestored,
886
+ overrides,
887
+ } ;
866
888
}
867
889
868
890
private async onHeartbeat ( ) {
@@ -1125,21 +1147,34 @@ export class RunExecution {
1125
1147
1126
1148
// If any previous snapshot is QUEUED or SUSPENDED, deprecate this worker
1127
1149
const deprecatedStatus : TaskRunExecutionStatus [ ] = [ "QUEUED" , "SUSPENDED" ] ;
1128
- const foundDeprecated = previousSnapshots . find ( ( snap ) =>
1150
+ const deprecatedSnapshots = previousSnapshots . filter ( ( snap ) =>
1129
1151
deprecatedStatus . includes ( snap . snapshot . executionStatus )
1130
1152
) ;
1131
1153
1132
- if ( foundDeprecated ) {
1133
- this . sendDebugLog (
1134
- `fetchAndProcessSnapshotChanges: found deprecation marker in previous snapshots, exiting` ,
1135
- {
1136
- source,
1137
- status : foundDeprecated . snapshot . executionStatus ,
1138
- snapshotId : foundDeprecated . snapshot . friendlyId ,
1139
- }
1154
+ if ( deprecatedSnapshots . length ) {
1155
+ const result = await this . processEnvOverrides (
1156
+ "found deprecation marker in previous snapshots"
1140
1157
) ;
1141
- await this . exitTaskRunProcessWithoutFailingRun ( { flush : false } ) ;
1142
- return ;
1158
+
1159
+ if ( ! result ) {
1160
+ return ;
1161
+ }
1162
+
1163
+ const { executionWasRestored } = result ;
1164
+
1165
+ if ( executionWasRestored ) {
1166
+ // It's normal for a restored run to have deprecation markers, e.g. it will have been SUSPENDED
1167
+ } else {
1168
+ this . sendDebugLog (
1169
+ `fetchAndProcessSnapshotChanges: found deprecation marker in previous snapshots, exiting` ,
1170
+ {
1171
+ source,
1172
+ deprecatedSnapshots : deprecatedSnapshots . map ( ( s ) => s . snapshot ) ,
1173
+ }
1174
+ ) ;
1175
+ await this . exitTaskRunProcessWithoutFailingRun ( { flush : false } ) ;
1176
+ return ;
1177
+ }
1143
1178
}
1144
1179
1145
1180
const [ error ] = await tryCatch ( this . enqueueSnapshotChangeAndWait ( lastSnapshot ) ) ;
0 commit comments