Skip to content

Commit edcae0e

Browse files
committed
bring back internal duration timers
1 parent e2c5d11 commit edcae0e

File tree

2 files changed

+17
-6
lines changed

2 files changed

+17
-6
lines changed

.changeset/hungry-sloths-promise.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,5 @@ Fix issues that could result in unfreezable state run crashes. Details:
88
- Never checkpoint between attempts
99
- Some messages and socket data now include attempt numbers
1010
- Remove attempt completion replays
11-
- Require external resume message after duration wait
1211
- Additional prod entry point logging
1312
- Fail runs that receive deprecated (pre-lazy attempt) execute messages

packages/cli-v3/src/workers/prod/entry-point.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import { randomUUID } from "node:crypto";
2727
import { readFile } from "node:fs/promises";
2828
import { createServer } from "node:http";
2929
import { setTimeout as timeout } from "node:timers/promises";
30+
import { Evt } from "evt";
3031

3132
declare const __PROJECT_CONFIG__: Config;
3233

@@ -64,6 +65,8 @@ class ProdWorker {
6465
private waitForPostStart = false;
6566
private connectionCount = 0;
6667

68+
private restoreNotification = Evt.create();
69+
6770
private waitForTaskReplay:
6871
| {
6972
idempotencyKey: string;
@@ -500,6 +503,17 @@ class ProdWorker {
500503
// checkpointSafeInternalTimeout is accurate even after non-simulated restores
501504
await Promise.race([internalTimeout, checkpointSafeInternalTimeout]);
502505

506+
const idempotencyKey = randomUUID();
507+
this.durationResumeFallback = { idempotencyKey };
508+
509+
try {
510+
await this.restoreNotification.waitFor(5_000);
511+
} catch (error) {
512+
logger.error("Did not receive restore notification in time", {
513+
error,
514+
});
515+
}
516+
503517
try {
504518
// The coordinator should cancel any in-progress checkpoints so we don't end up with race conditions
505519
const { checkpointCanceled } = await this.#coordinatorSocket.socket
@@ -518,9 +532,6 @@ class ProdWorker {
518532

519533
logger.log("Waiting for external duration resume as we may have been restored");
520534

521-
const idempotencyKey = randomUUID();
522-
this.durationResumeFallback = { idempotencyKey };
523-
524535
setTimeout(() => {
525536
if (!this.durationResumeFallback) {
526537
logger.error("Already resumed after duration, skipping fallback");
@@ -633,6 +644,8 @@ class ProdWorker {
633644
this.nextResumeAfter = undefined;
634645
this.waitForPostStart = false;
635646

647+
this.durationResumeFallback = undefined;
648+
636649
this.#backgroundWorker.waitCompletedNotification();
637650
}
638651

@@ -875,8 +888,6 @@ class ProdWorker {
875888
return;
876889
}
877890

878-
this.durationResumeFallback = undefined;
879-
880891
this.#resumeAfterDuration();
881892
},
882893
EXECUTE_TASK_RUN: async () => {
@@ -1316,6 +1327,7 @@ class ProdWorker {
13161327
}
13171328
case "restore": {
13181329
await this.#reconnectAfterPostStart();
1330+
this.restoreNotification.post();
13191331
break;
13201332
}
13211333
default: {

0 commit comments

Comments
 (0)