Skip to content

Commit a7e4ddd

Browse files
committed
prep for snapshots since
1 parent b4c61af commit a7e4ddd

File tree

4 files changed

+194
-144
lines changed

4 files changed

+194
-144
lines changed

packages/cli-v3/src/entryPoints/managed/controller.ts

Lines changed: 14 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ type ManagedRunControllerOptions = {
1818
env: EnvObject;
1919
};
2020

21-
type SupervisorSocket = Socket<WorkloadServerToClientEvents, WorkloadClientToServerEvents>;
21+
export type SupervisorSocket = Socket<WorkloadServerToClientEvents, WorkloadClientToServerEvents>;
2222

2323
export class ManagedRunController {
2424
private readonly env: RunnerEnv;
@@ -31,6 +31,9 @@ export class ManagedRunController {
3131
private warmStartCount = 0;
3232
private restoreCount = 0;
3333

34+
private notificationCount = 0;
35+
private lastNotificationAt: Date | null = null;
36+
3437
private currentExecution: RunExecution | null = null;
3538

3639
constructor(opts: ManagedRunControllerOptions) {
@@ -91,6 +94,8 @@ export class ManagedRunController {
9194
return {
9295
warmStartCount: this.warmStartCount,
9396
restoreCount: this.restoreCount,
97+
notificationCount: this.notificationCount,
98+
lastNotificationAt: this.lastNotificationAt,
9499
};
95100
}
96101

@@ -188,12 +193,16 @@ export class ManagedRunController {
188193
this.currentExecution = null;
189194
}
190195

196+
// Remove all run notification listeners just to be safe
197+
this.socket.removeAllListeners("run:notify");
198+
191199
if (!this.currentExecution || !this.currentExecution.canExecute) {
192200
this.currentExecution = new RunExecution({
193201
workerManifest: this.workerManifest,
194202
env: this.env,
195203
httpClient: this.httpClient,
196204
logger: this.logger,
205+
supervisorSocket: this.socket,
197206
});
198207
}
199208

@@ -224,8 +233,8 @@ export class ManagedRunController {
224233

225234
const metrics = this.currentExecution?.metrics;
226235

227-
if (metrics?.restoreCount) {
228-
this.restoreCount += metrics.restoreCount;
236+
if (metrics?.execution?.restoreCount) {
237+
this.restoreCount += metrics.execution.restoreCount;
229238
}
230239

231240
this.lockedRunExecution = null;
@@ -288,6 +297,7 @@ export class ManagedRunController {
288297
env: this.env,
289298
httpClient: this.httpClient,
290299
logger: this.logger,
300+
supervisorSocket: this.socket,
291301
}).prepareForExecution({
292302
taskRunEnv: previousTaskRunEnv,
293303
});
@@ -392,116 +402,12 @@ export class ManagedRunController {
392402
createSupervisorSocket(): SupervisorSocket {
393403
const wsUrl = new URL("/workload", this.workerApiUrl);
394404

395-
const socket = io(wsUrl.href, {
405+
const socket: SupervisorSocket = io(wsUrl.href, {
396406
transports: ["websocket"],
397407
extraHeaders: {
398408
[WORKLOAD_HEADERS.DEPLOYMENT_ID]: this.env.TRIGGER_DEPLOYMENT_ID,
399409
[WORKLOAD_HEADERS.RUNNER_ID]: this.env.TRIGGER_RUNNER_ID,
400410
},
401-
}) satisfies SupervisorSocket;
402-
403-
socket.on("run:notify", async ({ version, run }) => {
404-
// Generate a unique ID for the notification
405-
const notificationId = Math.random().toString(36).substring(2, 15);
406-
407-
// Use this to track the notification incl. any processing
408-
const notification = {
409-
id: notificationId,
410-
runId: run.friendlyId,
411-
version,
412-
};
413-
414-
// Lock this to the current run and snapshot IDs
415-
const controller = {
416-
runFriendlyId: this.runFriendlyId,
417-
snapshotFriendlyId: this.snapshotFriendlyId,
418-
};
419-
420-
this.sendDebugLog({
421-
runId: run.friendlyId,
422-
message: "run:notify received by runner",
423-
properties: {
424-
notification,
425-
controller,
426-
},
427-
});
428-
429-
if (!controller.runFriendlyId) {
430-
this.sendDebugLog({
431-
runId: run.friendlyId,
432-
message: "run:notify: ignoring notification, no run ID",
433-
properties: {
434-
notification,
435-
controller,
436-
},
437-
});
438-
return;
439-
}
440-
441-
if (!controller.snapshotFriendlyId) {
442-
this.sendDebugLog({
443-
runId: run.friendlyId,
444-
message: "run:notify: ignoring notification, no snapshot ID",
445-
properties: { notification, controller },
446-
});
447-
}
448-
449-
if (run.friendlyId !== controller.runFriendlyId) {
450-
this.sendDebugLog({
451-
runId: run.friendlyId,
452-
message: "run:notify: ignoring notification for different run",
453-
properties: {
454-
notification,
455-
controller,
456-
},
457-
});
458-
return;
459-
}
460-
461-
const latestSnapshot = await this.httpClient.getRunExecutionData(controller.runFriendlyId);
462-
463-
if (!latestSnapshot.success) {
464-
this.sendDebugLog({
465-
runId: run.friendlyId,
466-
message: "run:notify: failed to get latest snapshot data",
467-
properties: {
468-
notification,
469-
controller,
470-
error: latestSnapshot.error,
471-
},
472-
});
473-
return;
474-
}
475-
476-
const runExecutionData = latestSnapshot.data.execution;
477-
478-
if (!this.currentExecution) {
479-
this.sendDebugLog({
480-
runId: run.friendlyId,
481-
message: "run:notify: no current execution",
482-
properties: {
483-
notification,
484-
controller,
485-
},
486-
});
487-
return;
488-
}
489-
490-
const [error] = await tryCatch(
491-
this.currentExecution.enqueueSnapshotChangeAndWait(runExecutionData)
492-
);
493-
494-
if (error) {
495-
this.sendDebugLog({
496-
runId: run.friendlyId,
497-
message: "run:notify: unexpected error",
498-
properties: {
499-
notification,
500-
controller,
501-
error: error.message,
502-
},
503-
});
504-
}
505411
});
506412

507413
socket.on("connect", () => {

packages/cli-v3/src/entryPoints/managed/execution.ts

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import { assertExhaustive, tryCatch } from "@trigger.dev/core/utils";
2020
import { MetadataClient } from "./overrides.js";
2121
import { randomBytes } from "node:crypto";
2222
import { SnapshotManager, SnapshotState } from "./snapshot.js";
23+
import type { SupervisorSocket } from "./controller.js";
24+
import { RunNotifier } from "./notifier.js";
2325

2426
class ExecutionAbortError extends Error {
2527
constructor(message: string) {
@@ -33,6 +35,7 @@ type RunExecutionOptions = {
3335
env: RunnerEnv;
3436
httpClient: WorkloadHttpClient;
3537
logger: RunLogger;
38+
supervisorSocket: SupervisorSocket;
3639
};
3740

3841
type RunExecutionPrepareOptions = {
@@ -71,12 +74,16 @@ export class RunExecution {
7174
private isShuttingDown = false;
7275
private shutdownReason?: string;
7376

77+
private supervisorSocket: SupervisorSocket;
78+
private notifier?: RunNotifier;
79+
7480
constructor(opts: RunExecutionOptions) {
7581
this.id = randomBytes(4).toString("hex");
7682
this.workerManifest = opts.workerManifest;
7783
this.env = opts.env;
7884
this.httpClient = opts.httpClient;
7985
this.logger = opts.logger;
86+
this.supervisorSocket = opts.supervisorSocket;
8087

8188
this.restoreCount = 0;
8289
this.executionAbortController = new AbortController();
@@ -439,10 +446,16 @@ export class RunExecution {
439446
this.snapshotPoller = new RunExecutionSnapshotPoller({
440447
runFriendlyId: this.runFriendlyId,
441448
snapshotFriendlyId: this.snapshotManager.snapshotId,
442-
httpClient: this.httpClient,
443449
logger: this.logger,
444450
snapshotPollIntervalSeconds: this.env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS,
445-
handleSnapshotChange: this.enqueueSnapshotChangeAndWait.bind(this),
451+
onPoll: this.fetchAndEnqueueSnapshotChange.bind(this),
452+
}).start();
453+
454+
this.notifier = new RunNotifier({
455+
runFriendlyId: this.runFriendlyId,
456+
supervisorSocket: this.supervisorSocket,
457+
onNotify: this.fetchAndEnqueueSnapshotChange.bind(this),
458+
logger: this.logger,
446459
}).start();
447460

448461
const [startError, start] = await tryCatch(
@@ -947,7 +960,12 @@ export class RunExecution {
947960

948961
public get metrics() {
949962
return {
950-
restoreCount: this.restoreCount,
963+
execution: {
964+
restoreCount: this.restoreCount,
965+
lastHeartbeat: this.lastHeartbeat,
966+
},
967+
poller: this.snapshotPoller?.metrics,
968+
notifier: this.notifier?.metrics,
951969
};
952970
}
953971

@@ -981,6 +999,8 @@ export class RunExecution {
981999
this.snapshotPoller?.stop();
9821000
this.snapshotManager?.dispose();
9831001

1002+
this.notifier?.stop();
1003+
9841004
this.taskRunProcess?.unsafeDetachEvtHandlers();
9851005
}
9861006

@@ -1056,4 +1076,40 @@ export class RunExecution {
10561076

10571077
this.sendDebugLog("suspending, any day now 🚬", { suspendableSnapshot });
10581078
}
1079+
1080+
/**
1081+
* Fetches the latest execution data and enqueues snapshot changes. Used by both poller and notification handlers.
1082+
* @param source string - where this call originated (e.g. 'poller', 'notification')
1083+
*/
1084+
public async fetchAndEnqueueSnapshotChange(source: string): Promise<void> {
1085+
if (!this.runFriendlyId) {
1086+
this.sendDebugLog(`fetchAndEnqueueSnapshotChange: missing runFriendlyId`, { source });
1087+
return;
1088+
}
1089+
1090+
const latestSnapshot = await this.httpClient.getRunExecutionData(this.runFriendlyId);
1091+
1092+
if (!latestSnapshot.success) {
1093+
this.sendDebugLog(`fetchAndEnqueueSnapshotChange: failed to get latest snapshot data`, {
1094+
source,
1095+
error: latestSnapshot.error,
1096+
});
1097+
return;
1098+
}
1099+
1100+
const [error] = await tryCatch(
1101+
this.enqueueSnapshotChangeAndWait(latestSnapshot.data.execution)
1102+
);
1103+
1104+
if (error) {
1105+
this.sendDebugLog(
1106+
`fetchAndEnqueueSnapshotChange: failed to enqueue and process snapshot change`,
1107+
{
1108+
source,
1109+
error: error.message,
1110+
}
1111+
);
1112+
return;
1113+
}
1114+
}
10591115
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import type { SupervisorSocket } from "./controller.js";
2+
import type { RunLogger, SendDebugLogOptions } from "./logger.js";
3+
4+
type OnNotify = (source: string) => Promise<void>;
5+
6+
type RunNotifierOptions = {
7+
runFriendlyId: string;
8+
supervisorSocket: SupervisorSocket;
9+
onNotify: OnNotify;
10+
logger: RunLogger;
11+
};
12+
13+
export class RunNotifier {
14+
private runFriendlyId: string;
15+
private socket: SupervisorSocket;
16+
private onNotify: OnNotify;
17+
private logger: RunLogger;
18+
19+
private lastNotificationAt: Date | null = null;
20+
private notificationCount = 0;
21+
22+
private lastInvalidNotificationAt: Date | null = null;
23+
private invalidNotificationCount = 0;
24+
25+
constructor(opts: RunNotifierOptions) {
26+
this.runFriendlyId = opts.runFriendlyId;
27+
this.socket = opts.supervisorSocket;
28+
this.onNotify = opts.onNotify;
29+
this.logger = opts.logger;
30+
}
31+
32+
start(): RunNotifier {
33+
this.sendDebugLog("start");
34+
35+
this.socket.on("run:notify", async ({ version, run }) => {
36+
// Generate a unique ID for the notification
37+
const notificationId = Math.random().toString(36).substring(2, 15);
38+
39+
// Use this to track the notification incl. any processing
40+
const notification = {
41+
id: notificationId,
42+
runId: run.friendlyId,
43+
version,
44+
};
45+
46+
if (run.friendlyId !== this.runFriendlyId) {
47+
this.sendDebugLog("run:notify received invalid notification", { notification });
48+
49+
this.invalidNotificationCount++;
50+
this.lastInvalidNotificationAt = new Date();
51+
52+
return;
53+
}
54+
55+
this.sendDebugLog("run:notify received by runner", { notification });
56+
57+
this.notificationCount++;
58+
this.lastNotificationAt = new Date();
59+
60+
await this.onNotify(`notifier:${notificationId}`);
61+
});
62+
63+
return this;
64+
}
65+
66+
stop() {
67+
this.sendDebugLog("stop");
68+
this.socket.removeAllListeners("run:notify");
69+
}
70+
71+
get metrics() {
72+
return {
73+
lastNotificationAt: this.lastNotificationAt,
74+
notificationCount: this.notificationCount,
75+
lastInvalidNotificationAt: this.lastInvalidNotificationAt,
76+
invalidNotificationCount: this.invalidNotificationCount,
77+
};
78+
}
79+
80+
private sendDebugLog(message: string, properties?: SendDebugLogOptions["properties"]) {
81+
this.logger?.sendDebugLog({
82+
message: `[notifier] ${message}`,
83+
properties: {
84+
...properties,
85+
...this.metrics,
86+
},
87+
});
88+
}
89+
}

0 commit comments

Comments
 (0)