@@ -61,6 +61,7 @@ class ProdWorker {
61
61
62
62
private nextResumeAfter ?: WaitReason ;
63
63
private waitForPostStart = false ;
64
+ private connectionCount = 0 ;
64
65
65
66
private waitForTaskReplay :
66
67
| {
@@ -160,6 +161,7 @@ class ProdWorker {
160
161
this . waitForPostStart = false ;
161
162
162
163
this . #coordinatorSocket. close ( ) ;
164
+ this . connectionCount = 0 ;
163
165
164
166
let coordinatorHost = COORDINATOR_HOST ;
165
167
@@ -187,6 +189,7 @@ class ProdWorker {
187
189
}
188
190
}
189
191
192
+ // MARK: TASK WAIT
190
193
async #waitForTaskHandler( message : OnWaitForTaskMessage , replayIdempotencyKey ?: string ) {
191
194
const waitForTask = await defaultBackoff . execute ( async ( { retry } ) => {
192
195
logger . log ( "Wait for task with backoff" , { retry } ) ;
@@ -223,6 +226,7 @@ class ProdWorker {
223
226
await this . #prepareForWait( "WAIT_FOR_TASK" , willCheckpointAndRestore ) ;
224
227
225
228
if ( willCheckpointAndRestore ) {
229
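+ // TODO: no timeout appears to be armed here - the "reasonable time" mentioned below seems to be enforced only by the delay in #handleReplays, which runs after a reconnect; confirm whether that is sufficient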
226
230
// We need to replay this on next connection if we don't receive RESUME_AFTER_DEPENDENCY within a reasonable time
227
231
if ( ! this . waitForTaskReplay ) {
228
232
this . waitForTaskReplay = {
@@ -246,6 +250,7 @@ class ProdWorker {
246
250
}
247
251
}
248
252
253
+ // MARK: BATCH WAIT
249
254
async #waitForBatchHandler( message : OnWaitForBatchMessage , replayIdempotencyKey ?: string ) {
250
255
const waitForBatch = await defaultBackoff . execute ( async ( { retry } ) => {
251
256
logger . log ( "Wait for batch with backoff" , { retry } ) ;
@@ -306,6 +311,7 @@ class ProdWorker {
306
311
}
307
312
}
308
313
314
+ // MARK: WORKER CREATION
309
315
#createBackgroundWorker( ) {
310
316
const backgroundWorker = new ProdBackgroundWorker ( "worker.js" , {
311
317
projectConfig : __PROJECT_CONFIG__ ,
@@ -520,6 +526,7 @@ class ProdWorker {
520
526
await this . #prepareForCheckpoint( ) ;
521
527
}
522
528
529
+ // MARK: RETRY PREP
523
530
async #prepareForRetry(
524
531
willCheckpointAndRestore : boolean ,
525
532
shouldExit : boolean ,
@@ -553,6 +560,7 @@ class ProdWorker {
553
560
await this . #prepareForCheckpoint( false ) ;
554
561
}
555
562
563
+ // MARK: CHECKPOINT PREP
556
564
async #prepareForCheckpoint( flush = true ) {
557
565
if ( flush ) {
558
566
// Flush before checkpointing so we don't flush the same spans again after restore
@@ -664,6 +672,7 @@ class ProdWorker {
664
672
} ) ;
665
673
}
666
674
675
+ // MARK: ATTEMPT COMPLETION
667
676
async #submitAttemptCompletion(
668
677
execution : ProdTaskRunExecution ,
669
678
completion : TaskRunExecutionResult ,
@@ -742,6 +751,7 @@ class ProdWorker {
742
751
return headers ;
743
752
}
744
753
754
+ // MARK: COORDINATOR SOCKET
745
755
#createCoordinatorSocket( host : string ) {
746
756
const extraHeaders = this . #returnValidatedExtraHeaders( {
747
757
"x-machine-name" : MACHINE_NAME ,
@@ -943,8 +953,12 @@ class ProdWorker {
943
953
await this . #readyForLazyAttempt( ) ;
944
954
} ,
945
955
} ,
956
+ // MARK: ON CONNECTION
946
957
onConnection : async ( socket , handler , sender , logger ) => {
947
- logger . log ( "connected to coordinator" , { status : this . #status } ) ;
958
+ logger . log ( "connected to coordinator" , {
959
+ status : this . #status,
960
+ connectionCount : ++ this . connectionCount ,
961
+ } ) ;
948
962
949
963
// We need to send our current state to the coordinator
950
964
socket . emit ( "SET_STATE" , { version : "v1" , attemptFriendlyId : this . attemptFriendlyId } ) ;
@@ -1101,153 +1115,161 @@ class ProdWorker {
1101
1115
} catch ( error ) {
1102
1116
logger . error ( "connection handler error" , { error } ) ;
1103
1117
} finally {
1104
- const backoff = new ExponentialBackoff ( ) . type ( "FullJitter" ) . maxRetries ( 3 ) ;
1105
- const cancellationDelay = 20_000 ;
1118
+ if ( this . connectionCount === 1 ) {
1119
+ // Skip replays on the first connection since connectionCount was last reset to 0 (it is reset right after the coordinator socket is closed, e.g. post start)
1120
+ return ;
1121
+ }
1106
1122
1107
- if ( this . waitForTaskReplay ) {
1108
- logger . log ( "replaying wait for task" , { ...this . waitForTaskReplay } ) ;
1123
+ // This is a reconnect, so handle replays
1124
+ this . #handleReplays( ) ;
1125
+ }
1126
+ } ,
1127
+ onError : async ( socket , err , logger ) => {
1128
+ logger . error ( "onError" , {
1129
+ error : {
1130
+ name : err . name ,
1131
+ message : err . message ,
1132
+ } ,
1133
+ } ) ;
1134
+ } ,
1135
+ } ) ;
1109
1136
1110
- const { idempotencyKey, message, attempt } = this . waitForTaskReplay ;
1137
+ return coordinatorConnection ;
1138
+ }
1111
1139
1112
- // Give the platform some time to send RESUME_AFTER_DEPENDENCY
1113
- await timeout ( cancellationDelay ) ;
1140
+ // MARK: REPLAYS
1141
+ async #handleReplays( ) {
1142
+ const backoff = new ExponentialBackoff ( ) . type ( "FullJitter" ) . maxRetries ( 3 ) ;
1143
+ const replayCancellationDelay = 20_000 ;
1114
1144
1115
- if ( ! this . waitForTaskReplay ) {
1116
- logger . error ( "wait for task replay cancelled, discarding" , {
1117
- originalMessage : { idempotencyKey, message, attempt } ,
1118
- } ) ;
1145
+ if ( this . waitForTaskReplay ) {
1146
+ logger . log ( "replaying wait for task" , { ...this . waitForTaskReplay } ) ;
1119
1147
1120
- return ;
1121
- }
1148
+ const { idempotencyKey, message, attempt } = this . waitForTaskReplay ;
1122
1149
1123
- if ( idempotencyKey !== this . waitForTaskReplay . idempotencyKey ) {
1124
- logger . error ( "wait for task replay idempotency key mismatch, discarding" , {
1125
- originalMessage : { idempotencyKey, message, attempt } ,
1126
- newMessage : this . waitForTaskReplay ,
1127
- } ) ;
1150
+ // Give the platform some time to send RESUME_AFTER_DEPENDENCY
1151
+ await timeout ( replayCancellationDelay ) ;
1128
1152
1129
- return ;
1130
- }
1153
+ if ( ! this . waitForTaskReplay ) {
1154
+ logger . error ( "wait for task replay cancelled, discarding" , {
1155
+ originalMessage : { idempotencyKey, message, attempt } ,
1156
+ } ) ;
1131
1157
1132
- try {
1133
- await backoff . wait ( attempt + 1 ) ;
1158
+ return ;
1159
+ }
1134
1160
1135
- await this . #waitForTaskHandler( message ) ;
1136
- } catch ( error ) {
1137
- if ( error instanceof ExponentialBackoff . RetryLimitExceeded ) {
1138
- logger . error ( "wait for task replay retry limit exceeded" , { error } ) ;
1139
- } else {
1140
- logger . error ( "wait for task replay error" , { error } ) ;
1141
- }
1142
- }
1161
+ if ( idempotencyKey !== this . waitForTaskReplay . idempotencyKey ) {
1162
+ logger . error ( "wait for task replay idempotency key mismatch, discarding" , {
1163
+ originalMessage : { idempotencyKey, message, attempt } ,
1164
+ newMessage : this . waitForTaskReplay ,
1165
+ } ) ;
1143
1166
1144
- return ;
1145
- }
1167
+ return ;
1168
+ }
1146
1169
1147
- if ( this . waitForBatchReplay ) {
1148
- logger . log ( "replaying wait for batch" , {
1149
- ...this . waitForBatchReplay ,
1150
- cancellationDelay,
1151
- } ) ;
1170
+ try {
1171
+ await backoff . wait ( attempt + 1 ) ;
1152
1172
1153
- const { idempotencyKey, message, attempt } = this . waitForBatchReplay ;
1173
+ await this . #waitForTaskHandler( message ) ;
1174
+ } catch ( error ) {
1175
+ if ( error instanceof ExponentialBackoff . RetryLimitExceeded ) {
1176
+ logger . error ( "wait for task replay retry limit exceeded" , { error } ) ;
1177
+ } else {
1178
+ logger . error ( "wait for task replay error" , { error } ) ;
1179
+ }
1180
+ }
1154
1181
1155
- // Give the platform some time to send RESUME_AFTER_DEPENDENCY
1156
- await timeout ( cancellationDelay ) ;
1182
+ return ;
1183
+ }
1157
1184
1158
- if ( ! this . waitForBatchReplay ) {
1159
- logger . error ( "wait for batch replay cancelled, discarding" , {
1160
- originalMessage : { idempotencyKey, message, attempt } ,
1161
- } ) ;
1185
+ if ( this . waitForBatchReplay ) {
1186
+ logger . log ( "replaying wait for batch" , {
1187
+ ...this . waitForBatchReplay ,
1188
+ cancellationDelay : replayCancellationDelay ,
1189
+ } ) ;
1162
1190
1163
- return ;
1164
- }
1191
+ const { idempotencyKey, message, attempt } = this . waitForBatchReplay ;
1165
1192
1166
- if ( idempotencyKey !== this . waitForBatchReplay . idempotencyKey ) {
1167
- logger . error ( "wait for batch replay idempotency key mismatch, discarding" , {
1168
- originalMessage : { idempotencyKey, message, attempt } ,
1169
- newMessage : this . waitForBatchReplay ,
1170
- } ) ;
1193
+ // Give the platform some time to send RESUME_AFTER_DEPENDENCY
1194
+ await timeout ( replayCancellationDelay ) ;
1171
1195
1172
- return ;
1173
- }
1196
+ if ( ! this . waitForBatchReplay ) {
1197
+ logger . error ( "wait for batch replay cancelled, discarding" , {
1198
+ originalMessage : { idempotencyKey, message, attempt } ,
1199
+ } ) ;
1174
1200
1175
- try {
1176
- await backoff . wait ( attempt + 1 ) ;
1201
+ return ;
1202
+ }
1177
1203
1178
- await this . #waitForBatchHandler( message ) ;
1179
- } catch ( error ) {
1180
- if ( error instanceof ExponentialBackoff . RetryLimitExceeded ) {
1181
- logger . error ( "wait for batch replay retry limit exceeded" , { error } ) ;
1182
- } else {
1183
- logger . error ( "wait for batch replay error" , { error } ) ;
1184
- }
1185
- }
1204
+ if ( idempotencyKey !== this . waitForBatchReplay . idempotencyKey ) {
1205
+ logger . error ( "wait for batch replay idempotency key mismatch, discarding" , {
1206
+ originalMessage : { idempotencyKey, message, attempt } ,
1207
+ newMessage : this . waitForBatchReplay ,
1208
+ } ) ;
1186
1209
1187
- return ;
1188
- }
1210
+ return ;
1211
+ }
1189
1212
1190
- if ( this . submitAttemptCompletionReplay ) {
1191
- logger . log ( "replaying attempt completion" , {
1192
- ...this . submitAttemptCompletionReplay ,
1193
- cancellationDelay,
1194
- } ) ;
1213
+ try {
1214
+ await backoff . wait ( attempt + 1 ) ;
1195
1215
1196
- const { idempotencyKey, message, attempt } = this . submitAttemptCompletionReplay ;
1216
+ await this . #waitForBatchHandler( message ) ;
1217
+ } catch ( error ) {
1218
+ if ( error instanceof ExponentialBackoff . RetryLimitExceeded ) {
1219
+ logger . error ( "wait for batch replay retry limit exceeded" , { error } ) ;
1220
+ } else {
1221
+ logger . error ( "wait for batch replay error" , { error } ) ;
1222
+ }
1223
+ }
1197
1224
1198
- // Give the platform some time to send READY_FOR_RETRY
1199
- await timeout ( cancellationDelay ) ;
1225
+ return ;
1226
+ }
1200
1227
1201
- if ( ! this . submitAttemptCompletionReplay ) {
1202
- logger . error ( "attempt completion replay cancelled, discarding" , {
1203
- originalMessage : { idempotencyKey, message, attempt } ,
1204
- } ) ;
1228
+ if ( this . submitAttemptCompletionReplay ) {
1229
+ logger . log ( "replaying attempt completion" , {
1230
+ ...this . submitAttemptCompletionReplay ,
1231
+ cancellationDelay : replayCancellationDelay ,
1232
+ } ) ;
1205
1233
1206
- return ;
1207
- }
1234
+ const { idempotencyKey, message, attempt } = this . submitAttemptCompletionReplay ;
1208
1235
1209
- if ( idempotencyKey !== this . submitAttemptCompletionReplay . idempotencyKey ) {
1210
- logger . error ( "attempt completion replay idempotency key mismatch, discarding" , {
1211
- originalMessage : { idempotencyKey, message, attempt } ,
1212
- newMessage : this . submitAttemptCompletionReplay ,
1213
- } ) ;
1236
+ // Give the platform some time to send READY_FOR_RETRY
1237
+ await timeout ( replayCancellationDelay ) ;
1214
1238
1215
- return ;
1216
- }
1239
+ if ( ! this . submitAttemptCompletionReplay ) {
1240
+ logger . error ( "attempt completion replay cancelled, discarding" , {
1241
+ originalMessage : { idempotencyKey, message, attempt } ,
1242
+ } ) ;
1217
1243
1218
- try {
1219
- await backoff . wait ( attempt + 1 ) ;
1244
+ return ;
1245
+ }
1220
1246
1221
- await this . #submitAttemptCompletion(
1222
- message . execution ,
1223
- message . completion ,
1224
- idempotencyKey
1225
- ) ;
1226
- } catch ( error ) {
1227
- if ( error instanceof ExponentialBackoff . RetryLimitExceeded ) {
1228
- logger . error ( "attempt completion replay retry limit exceeded" , { error } ) ;
1229
- } else {
1230
- logger . error ( "attempt completion replay error" , { error } ) ;
1231
- }
1232
- }
1247
+ if ( idempotencyKey !== this . submitAttemptCompletionReplay . idempotencyKey ) {
1248
+ logger . error ( "attempt completion replay idempotency key mismatch, discarding" , {
1249
+ originalMessage : { idempotencyKey, message, attempt } ,
1250
+ newMessage : this . submitAttemptCompletionReplay ,
1251
+ } ) ;
1233
1252
1234
- return ;
1235
- }
1253
+ return ;
1254
+ }
1255
+
1256
+ try {
1257
+ await backoff . wait ( attempt + 1 ) ;
1258
+
1259
+ await this . #submitAttemptCompletion( message . execution , message . completion , idempotencyKey ) ;
1260
+ } catch ( error ) {
1261
+ if ( error instanceof ExponentialBackoff . RetryLimitExceeded ) {
1262
+ logger . error ( "attempt completion replay retry limit exceeded" , { error } ) ;
1263
+ } else {
1264
+ logger . error ( "attempt completion replay error" , { error } ) ;
1236
1265
}
1237
- } ,
1238
- onError : async ( socket , err , logger ) => {
1239
- logger . error ( "onError" , {
1240
- error : {
1241
- name : err . name ,
1242
- message : err . message ,
1243
- } ,
1244
- } ) ;
1245
- } ,
1246
- } ) ;
1266
+ }
1247
1267
1248
- return coordinatorConnection ;
1268
+ return ;
1269
+ }
1249
1270
}
1250
1271
1272
+ // MARK: HTTP SERVER
1251
1273
#createHttpServer( ) {
1252
1274
const httpServer = createServer ( async ( req , res ) => {
1253
1275
logger . log ( `[${ req . method } ]` , req . url ) ;
@@ -1273,6 +1295,7 @@ class ProdWorker {
1273
1295
1274
1296
case "/close" : {
1275
1297
this . #coordinatorSocket. close ( ) ;
1298
+ this . connectionCount = 0 ;
1276
1299
1277
1300
return reply . text ( "Disconnected from coordinator" ) ;
1278
1301
}
0 commit comments