@@ -190,128 +190,156 @@ class ProdWorker {
190
190
}
191
191
192
192
// MARK: TASK WAIT
193
/**
 * Builds the handler invoked when the background worker asks to wait for a task.
 *
 * @param workerId Identifier included in the handler's log output only.
 * @returns An async handler taking the wait message and an optional replay
 *          idempotency key. The handler resolves to `undefined` on every path.
 */
#waitForTaskHandlerFactory(workerId?: string) {
  return async (message: OnWaitForTaskMessage, replayIdempotencyKey?: string) => {
    logger.log("onWaitForTask ", { workerId, message });

    // A resume is already pending, so do not start another wait.
    if (this.nextResumeAfter) {
      logger.error("Already waiting for resume, skipping wait for task", {
        nextResumeAfter: this.nextResumeAfter,
      });

      return;
    }

    // Ask the coordinator (with backoff) whether it will checkpoint and restore us.
    const ack = await defaultBackoff.execute(async ({ retry }) => {
      logger.log("Wait for task with backoff", { retry });

      if (!this.attemptFriendlyId) {
        logger.error("Failed to send wait message, attempt friendly ID not set", { message });

        // Presumably retrying cannot help without an attempt ID, so stop the backoff.
        throw new ExponentialBackoff.StopRetrying("No attempt ID");
      }

      const payload = {
        version: "v2",
        friendlyId: message.friendlyId,
        attemptFriendlyId: this.attemptFriendlyId,
      };

      // Each emit waits up to 20 seconds for the acknowledgement.
      return await this.#coordinatorSocket.socket
        .timeout(20_000)
        .emitWithAck("WAIT_FOR_TASK", payload);
    });

    if (!ack.success) {
      const { cause, error } = ack;

      logger.error("Failed to wait for task with backoff", { cause, error });

      this.#emitUnrecoverableError("WaitForTaskFailed", `${cause}: ${error}`);

      return;
    }

    const willCheckpointAndRestore = ack.result.willCheckpointAndRestore;

    await this.#prepareForWait("WAIT_FOR_TASK", willCheckpointAndRestore);

    // Replay bookkeeping is only needed when a checkpoint and restore will happen.
    if (!willCheckpointAndRestore) {
      return;
    }

    // We need to replay this on next connection if we don't receive RESUME_AFTER_DEPENDENCY within a reasonable time
    const pendingReplay = this.waitForTaskReplay;

    if (!pendingReplay) {
      // First time through: record the request under a fresh idempotency key.
      this.waitForTaskReplay = {
        message,
        attempt: 1,
        idempotencyKey: randomUUID(),
      };

      return;
    }

    // A replay is already recorded; a caller-supplied key must match it.
    if (replayIdempotencyKey && replayIdempotencyKey !== pendingReplay.idempotencyKey) {
      logger.error(
        "wait for task handler called with mismatched idempotency key, won't overwrite replay request"
      );

      return;
    }

    pendingReplay.attempt++;
  };
}
251
263
252
264
// MARK: BATCH WAIT
253
/**
 * Builds the handler invoked when the background worker asks to wait for a batch.
 *
 * The returned handler:
 *  1. skips the wait if `nextResumeAfter` is already set,
 *  2. emits WAIT_FOR_BATCH to the coordinator socket (with backoff, 20s ack timeout),
 *  3. emits an unrecoverable error if that ultimately fails,
 *  4. calls `#prepareForWait`, and
 *  5. records or updates `waitForBatchReplay` when a checkpoint and restore will happen.
 *
 * @param workerId Identifier included in the handler's log output only.
 * @returns An async handler taking the wait message and an optional replay
 *          idempotency key. The handler resolves to `undefined` on every path.
 */
#waitForBatchHandlerFactory(workerId?: string) {
  return async (message: OnWaitForBatchMessage, replayIdempotencyKey?: string) => {
    logger.log("onWaitForBatch ", { workerId, message });

    // A resume is already pending, so do not start another wait.
    if (this.nextResumeAfter) {
      logger.error("Already waiting for resume, skipping wait for batch", {
        nextResumeAfter: this.nextResumeAfter,
      });

      return;
    }

    const waitForBatch = await defaultBackoff.execute(async ({ retry }) => {
      logger.log("Wait for batch with backoff", { retry });

      if (!this.attemptFriendlyId) {
        logger.error("Failed to send wait message, attempt friendly ID not set", { message });

        // Presumably retrying cannot help without an attempt ID, so stop the backoff.
        throw new ExponentialBackoff.StopRetrying("No attempt ID");
      }

      // Each emit waits up to 20 seconds for the acknowledgement.
      return await this.#coordinatorSocket.socket.timeout(20_000).emitWithAck("WAIT_FOR_BATCH", {
        version: "v2",
        batchFriendlyId: message.batchFriendlyId,
        runFriendlyIds: message.runFriendlyIds,
        attemptFriendlyId: this.attemptFriendlyId,
      });
    });

    if (!waitForBatch.success) {
      logger.error("Failed to wait for batch with backoff", {
        cause: waitForBatch.cause,
        error: waitForBatch.error,
      });

      this.#emitUnrecoverableError(
        "WaitForBatchFailed",
        `${waitForBatch.cause}: ${waitForBatch.error}`
      );

      return;
    }

    const { willCheckpointAndRestore } = waitForBatch.result;

    await this.#prepareForWait("WAIT_FOR_BATCH", willCheckpointAndRestore);

    if (willCheckpointAndRestore) {
      // We need to replay this on next connection if we don't receive RESUME_AFTER_DEPENDENCY within a reasonable time
      if (!this.waitForBatchReplay) {
        // First time through: record the request under a fresh idempotency key.
        this.waitForBatchReplay = {
          message,
          attempt: 1,
          idempotencyKey: randomUUID(),
        };
      } else {
        // A replay is already recorded; a caller-supplied key must match it.
        if (
          replayIdempotencyKey &&
          replayIdempotencyKey !== this.waitForBatchReplay.idempotencyKey
        ) {
          // Name the batch handler here so this log is not confused with the task handler's.
          logger.error(
            "wait for batch handler called with mismatched idempotency key, won't overwrite replay request"
          );
          return;
        }

        this.waitForBatchReplay.attempt++;
      }
    }
  };
}
312
336
313
337
// MARK: WORKER CREATION
314
338
#createBackgroundWorker( ) {
339
+ const workerId = randomUUID ( ) ;
340
+
341
+ logger . log ( "Creating background worker" , { workerId } ) ;
342
+
315
343
const backgroundWorker = new ProdBackgroundWorker ( "worker.js" , {
316
344
projectConfig : __PROJECT_CONFIG__ ,
317
345
env : {
@@ -325,7 +353,10 @@ class ProdWorker {
325
353
} ) ;
326
354
327
355
backgroundWorker . onTaskHeartbeat . attach ( ( attemptFriendlyId ) => {
328
- logger . log ( "onTaskHeartbeat" , { attemptFriendlyId } ) ;
356
+ logger . log ( "onTaskHeartbeat" , {
357
+ workerId,
358
+ attemptFriendlyId,
359
+ } ) ;
329
360
330
361
this . #coordinatorSocket. socket . volatile . emit ( "TASK_HEARTBEAT" , {
331
362
version : "v1" ,
@@ -334,13 +365,19 @@ class ProdWorker {
334
365
} ) ;
335
366
336
367
backgroundWorker . onTaskRunHeartbeat . attach ( ( runId ) => {
337
- logger . log ( "onTaskRunHeartbeat" , { runId } ) ;
368
+ logger . log ( "onTaskRunHeartbeat" , {
369
+ workerId,
370
+ runId,
371
+ } ) ;
338
372
339
373
this . #coordinatorSocket. socket . volatile . emit ( "TASK_RUN_HEARTBEAT" , { version : "v1" , runId } ) ;
340
374
} ) ;
341
375
342
376
backgroundWorker . onCreateTaskRunAttempt . attach ( async ( message ) => {
343
- logger . log ( "onCreateTaskRunAttempt()" , { message } ) ;
377
+ logger . log ( "onCreateTaskRunAttempt()" , {
378
+ workerId,
379
+ message,
380
+ } ) ;
344
381
345
382
const createAttempt = await defaultBackoff . execute ( async ( { retry } ) => {
346
383
logger . log ( "Create task run attempt with backoff" , { retry } ) ;
@@ -377,6 +414,7 @@ class ProdWorker {
377
414
378
415
backgroundWorker . attemptCreatedNotification . attach ( ( message ) => {
379
416
logger . log ( "attemptCreatedNotification" , {
417
+ workerId,
380
418
success : message . success ,
381
419
...( message . success
382
420
? {
@@ -398,8 +436,21 @@ class ProdWorker {
398
436
this . attemptFriendlyId = message . execution . attempt . id ;
399
437
} ) ;
400
438
439
+ // MARK: WAIT_FOR_DURATION
401
440
backgroundWorker . onWaitForDuration . attach ( async ( message ) => {
402
- logger . log ( "onWaitForDuration" , { ...message , drift : Date . now ( ) - message . now } ) ;
441
+ logger . log ( "onWaitForDuration" , {
442
+ workerId,
443
+ ...message ,
444
+ drift : Date . now ( ) - message . now ,
445
+ } ) ;
446
+
447
+ if ( this . nextResumeAfter ) {
448
+ logger . error ( "Already waiting for resume, skipping wait for duration" , {
449
+ nextResumeAfter : this . nextResumeAfter ,
450
+ } ) ;
451
+
452
+ return ;
453
+ }
403
454
404
455
noResume: {
405
456
const { ms, waitThresholdInMs } = message ;
@@ -505,15 +556,27 @@ class ProdWorker {
505
556
this . #resumeAfterDuration( ) ;
506
557
} ) ;
507
558
508
- backgroundWorker . onWaitForTask . attach ( this . #waitForTaskHandler . bind ( this ) ) ;
509
- backgroundWorker . onWaitForBatch . attach ( this . #waitForBatchHandler . bind ( this ) ) ;
559
+ backgroundWorker . onWaitForTask . attach ( this . #waitForTaskHandlerFactory ( workerId ) . bind ( this ) ) ;
560
+ backgroundWorker . onWaitForBatch . attach ( this . #waitForBatchHandlerFactory ( workerId ) . bind ( this ) ) ;
510
561
511
562
return backgroundWorker ;
512
563
}
513
564
514
565
async #prepareForWait( reason : WaitReason , willCheckpointAndRestore : boolean ) {
515
566
logger . log ( `prepare for ${ reason } ` , { willCheckpointAndRestore } ) ;
516
567
568
+ if ( this . nextResumeAfter ) {
569
+ logger . error ( "Already waiting for resume, skipping prepare for wait" , {
570
+ nextResumeAfter : this . nextResumeAfter ,
571
+ params : {
572
+ reason,
573
+ willCheckpointAndRestore,
574
+ } ,
575
+ } ) ;
576
+
577
+ return ;
578
+ }
579
+
517
580
if ( ! willCheckpointAndRestore ) {
518
581
return ;
519
582
}
@@ -1169,7 +1232,7 @@ class ProdWorker {
1169
1232
try {
1170
1233
await backoff . wait ( attempt + 1 ) ;
1171
1234
1172
- await this . #waitForTaskHandler ( message ) ;
1235
+ await this . #waitForTaskHandlerFactory ( "replay" ) ( message ) ;
1173
1236
} catch ( error ) {
1174
1237
if ( error instanceof ExponentialBackoff . RetryLimitExceeded ) {
1175
1238
logger . error ( "wait for task replay retry limit exceeded" , { error } ) ;
@@ -1212,7 +1275,7 @@ class ProdWorker {
1212
1275
try {
1213
1276
await backoff . wait ( attempt + 1 ) ;
1214
1277
1215
- await this . #waitForBatchHandler ( message ) ;
1278
+ await this . #waitForBatchHandlerFactory ( "replay" ) ( message ) ;
1216
1279
} catch ( error ) {
1217
1280
if ( error instanceof ExponentialBackoff . RetryLimitExceeded ) {
1218
1281
logger . error ( "wait for batch replay retry limit exceeded" , { error } ) ;
0 commit comments