@@ -12,7 +12,7 @@ import {
12
12
} from "@trigger.dev/core/v3" ;
13
13
import { ZodNamespace } from "@trigger.dev/core/v3/zodNamespace" ;
14
14
import { ZodSocketConnection } from "@trigger.dev/core/v3/zodSocket" ;
15
- import { HttpReply , getTextBody , SimpleLogger } from "@trigger.dev/core-apps" ;
15
+ import { HttpReply , getTextBody , SimpleLogger , testDockerCheckpoint } from "@trigger.dev/core-apps" ;
16
16
import { ExponentialBackoff } from "./backoff" ;
17
17
18
18
import { collectDefaultMetrics , register , Gauge } from "prom-client" ;
@@ -72,7 +72,10 @@ type CheckpointAndPushOptions = {
72
72
73
73
type CheckpointAndPushResult =
74
74
| { success : true ; checkpoint : CheckpointData }
75
- | { success : false ; reason ?: "CANCELED" | "DISABLED" | "ERROR" | "IN_PROGRESS" | "NO_SUPPORT" } ;
75
+ | {
76
+ success : false ;
77
+ reason ?: "CANCELED" | "DISABLED" | "ERROR" | "IN_PROGRESS" | "NO_SUPPORT" | "SKIP_RETRYING" ;
78
+ } ;
76
79
77
80
type CheckpointData = {
78
81
location : string ;
@@ -125,65 +128,53 @@ class Checkpointer {
125
128
126
129
constructor ( private opts = { forceSimulate : false } ) { }
127
130
128
- async initialize ( ) : Promise < CheckpointerInitializeReturn > {
131
+ async init ( ) : Promise < CheckpointerInitializeReturn > {
129
132
if ( this . #initialized) {
130
- return this . #getInitializeReturn ( ) ;
133
+ return this . #getInitReturn ( this . #canCheckpoint ) ;
131
134
}
132
135
133
136
this . #logger. log ( `${ this . #dockerMode ? "Docker" : "Kubernetes" } mode` ) ;
134
137
135
138
if ( this . #dockerMode) {
136
- try {
137
- await $ `criu --version` ;
138
- } catch ( error ) {
139
- this . #logger. error ( "No checkpoint support: Missing CRIU binary" ) ;
140
- this . #logger. error ( "Will simulate instead" ) ;
141
- this . #canCheckpoint = false ;
142
- this . #initialized = true ;
139
+ const testCheckpoint = await testDockerCheckpoint ( ) ;
143
140
144
- return this . #getInitializeReturn( ) ;
141
+ if ( testCheckpoint . ok ) {
142
+ return this . #getInitReturn( true ) ;
145
143
}
146
144
147
- try {
148
- await $ `docker checkpoint` ;
149
- } catch ( error ) {
150
- this . #logger. error (
151
- "No checkpoint support: Docker needs to have experimental features enabled"
152
- ) ;
153
- this . #logger. error ( "Will simulate instead" ) ;
154
- this . #canCheckpoint = false ;
155
- this . #initialized = true ;
156
-
157
- return this . #getInitializeReturn( ) ;
158
- }
145
+ this . #logger. error ( testCheckpoint . message , testCheckpoint . error ?? "" ) ;
146
+ return this . #getInitReturn( false ) ;
159
147
} else {
160
148
try {
161
149
await $ `buildah login --get-login ${ REGISTRY_HOST } ` ;
162
150
} catch ( error ) {
163
151
this . #logger. error ( `No checkpoint support: Not logged in to registry ${ REGISTRY_HOST } ` ) ;
164
- this . #canCheckpoint = false ;
165
- this . #initialized = true ;
166
-
167
- return this . #getInitializeReturn( ) ;
152
+ return this . #getInitReturn( false ) ;
168
153
}
169
154
}
170
155
171
- this . #logger. log (
172
- `Full checkpoint support${
173
- this . #dockerMode && this . opts . forceSimulate ? " with forced simulation enabled." : "!"
174
- } `
175
- ) ;
156
+ return this . #getInitReturn( true ) ;
157
+ }
176
158
159
+ #getInitReturn( canCheckpoint : boolean ) : CheckpointerInitializeReturn {
177
160
this . #initialized = true ;
178
- this . #canCheckpoint = true ;
161
+ this . #canCheckpoint = canCheckpoint ;
179
162
180
- return this . #getInitializeReturn( ) ;
181
- }
163
+ if ( canCheckpoint ) {
164
+ this . #logger. log ( "Full checkpoint support!" ) ;
165
+ }
166
+
167
+ const willSimulate = this . #dockerMode && ( ! this . #canCheckpoint || this . opts . forceSimulate ) ;
168
+
169
+ if ( willSimulate ) {
170
+ this . #logger. log ( "Simulation mode enabled. Containers will be paused, not checkpointed." , {
171
+ forceSimulate : this . opts . forceSimulate ,
172
+ } ) ;
173
+ }
182
174
183
- #getInitializeReturn( ) : CheckpointerInitializeReturn {
184
175
return {
185
- canCheckpoint : this . #canCheckpoint ,
186
- willSimulate : this . #dockerMode && ( ! this . #canCheckpoint || this . opts . forceSimulate ) ,
176
+ canCheckpoint,
177
+ willSimulate,
187
178
} ;
188
179
}
189
180
@@ -327,6 +318,11 @@ class Checkpointer {
327
318
return result ;
328
319
}
329
320
321
+ if ( result . reason === "SKIP_RETRYING" ) {
322
+ this . #logger. log ( "Skipping retrying" , { runId } ) ;
323
+ return result ;
324
+ }
325
+
330
326
continue ;
331
327
} catch ( error ) {
332
328
this . #logger. error ( "Checkpoint error" , {
@@ -355,7 +351,7 @@ class Checkpointer {
355
351
projectRef,
356
352
deploymentVersion,
357
353
} : CheckpointAndPushOptions ) : Promise < CheckpointAndPushResult > {
358
- await this . initialize ( ) ;
354
+ await this . init ( ) ;
359
355
360
356
const options = {
361
357
runId,
@@ -473,7 +469,8 @@ class Checkpointer {
473
469
474
470
// Create checkpoint (CRI)
475
471
if ( ! this . #canCheckpoint) {
476
- throw new Error ( "No checkpoint support in kubernetes mode." ) ;
472
+ this . #logger. error ( "No checkpoint support in kubernetes mode." ) ;
473
+ return { success : false , reason : "SKIP_RETRYING" } ;
477
474
}
478
475
479
476
const containerId = this . #logger. debug (
@@ -484,7 +481,8 @@ class Checkpointer {
484
481
) ;
485
482
486
483
if ( ! containerId . stdout ) {
487
- throw new Error ( "could not find container id" ) ;
484
+ this . #logger. error ( "could not find container id" , { options, containterName } ) ;
485
+ return { success : false , reason : "SKIP_RETRYING" } ;
488
486
}
489
487
490
488
const start = performance . now ( ) ;
@@ -617,7 +615,7 @@ class TaskCoordinator {
617
615
private host = "0.0.0.0"
618
616
) {
619
617
this . #httpServer = this . #createHttpServer( ) ;
620
- this . #checkpointer. initialize ( ) ;
618
+ this . #checkpointer. init ( ) ;
621
619
this . #delayThresholdInMs = this . #getDelayThreshold( ) ;
622
620
623
621
if ( process . env . DELAY_THRESHOLD_IN_MS ) {
@@ -1034,7 +1032,7 @@ class TaskCoordinator {
1034
1032
return ;
1035
1033
}
1036
1034
1037
- const { canCheckpoint, willSimulate } = await this . #checkpointer. initialize ( ) ;
1035
+ const { canCheckpoint, willSimulate } = await this . #checkpointer. init ( ) ;
1038
1036
1039
1037
const willCheckpointAndRestore = canCheckpoint || willSimulate ;
1040
1038
@@ -1131,7 +1129,7 @@ class TaskCoordinator {
1131
1129
return ;
1132
1130
}
1133
1131
1134
- const { canCheckpoint, willSimulate } = await this . #checkpointer. initialize ( ) ;
1132
+ const { canCheckpoint, willSimulate } = await this . #checkpointer. init ( ) ;
1135
1133
1136
1134
const willCheckpointAndRestore = canCheckpoint || willSimulate ;
1137
1135
@@ -1185,7 +1183,7 @@ class TaskCoordinator {
1185
1183
socket . on ( "WAIT_FOR_TASK" , async ( message , callback ) => {
1186
1184
logger . log ( "[WAIT_FOR_TASK]" , message ) ;
1187
1185
1188
- const { canCheckpoint, willSimulate } = await this . #checkpointer. initialize ( ) ;
1186
+ const { canCheckpoint, willSimulate } = await this . #checkpointer. init ( ) ;
1189
1187
1190
1188
const willCheckpointAndRestore = canCheckpoint || willSimulate ;
1191
1189
@@ -1227,7 +1225,7 @@ class TaskCoordinator {
1227
1225
socket . on ( "WAIT_FOR_BATCH" , async ( message , callback ) => {
1228
1226
logger . log ( "[WAIT_FOR_BATCH]" , message ) ;
1229
1227
1230
- const { canCheckpoint, willSimulate } = await this . #checkpointer. initialize ( ) ;
1228
+ const { canCheckpoint, willSimulate } = await this . #checkpointer. init ( ) ;
1231
1229
1232
1230
const willCheckpointAndRestore = canCheckpoint || willSimulate ;
1233
1231
0 commit comments