Skip to content

Commit 0d5bb97

Browse files
committed
drive run heartbeats from child process
1 parent 1f6a283 commit 0d5bb97

File tree

2 files changed

+45
-17
lines changed

2 files changed

+45
-17
lines changed

packages/cli-v3/src/entryPoints/managed-run-worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ const heartbeatInterval = parseInt(heartbeatIntervalMs ?? "30000", 10);
576576
for await (const _ of setInterval(heartbeatInterval)) {
577577
if (_isRunning && _execution) {
578578
try {
579-
await zodIpc.send("TASK_HEARTBEAT", { id: _execution.attempt.id });
579+
await zodIpc.send("TASK_HEARTBEAT", { id: _execution.run.id });
580580
} catch (err) {
581581
console.error("Failed to send HEARTBEAT message", err);
582582
}

packages/cli-v3/src/entryPoints/managed/execution.ts

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import { RunLogger, SendDebugLogOptions } from "./logger.js";
1414
import { RunnerEnv } from "./env.js";
1515
import { WorkloadHttpClient } from "@trigger.dev/core/v3/workers";
1616
import { setTimeout as sleep } from "timers/promises";
17-
import { RunExecutionHeartbeat } from "./heartbeat.js";
1817
import { RunExecutionSnapshotPoller } from "./poller.js";
1918
import { assertExhaustive, tryCatch } from "@trigger.dev/core/utils";
2019
import { MetadataClient } from "./overrides.js";
@@ -63,7 +62,6 @@ export class RunExecution {
6362
private restoreCount: number;
6463

6564
private taskRunProcess?: TaskRunProcess;
66-
private runHeartbeat?: RunExecutionHeartbeat;
6765
private snapshotPoller?: RunExecutionSnapshotPoller;
6866

6967
constructor(opts: RunExecutionOptions) {
@@ -105,7 +103,7 @@ export class RunExecution {
105103
envVars: Record<string, string>;
106104
isWarmStart?: boolean;
107105
}) {
108-
return new TaskRunProcess({
106+
const taskRunProcess = new TaskRunProcess({
109107
workerManifest: this.workerManifest,
110108
env: {
111109
...envVars,
@@ -123,6 +121,29 @@ export class RunExecution {
123121
},
124122
isWarmStart,
125123
}).initialize();
124+
125+
taskRunProcess.onTaskRunHeartbeat.attach(async (runId) => {
126+
if (!this.runFriendlyId) {
127+
this.sendDebugLog("onTaskRunHeartbeat: missing run ID", { heartbeatRunId: runId });
128+
return;
129+
}
130+
131+
if (runId !== this.runFriendlyId) {
132+
this.sendDebugLog("onTaskRunHeartbeat: mismatched run ID", {
133+
heartbeatRunId: runId,
134+
expectedRunId: this.runFriendlyId,
135+
});
136+
return;
137+
}
138+
139+
const [error] = await tryCatch(this.onHeartbeat());
140+
141+
if (error) {
142+
this.sendDebugLog("onTaskRunHeartbeat: failed", { error: error.message });
143+
}
144+
});
145+
146+
return taskRunProcess;
126147
}
127148

128149
/**
@@ -229,7 +250,6 @@ export class RunExecution {
229250
this.currentSnapshotId = snapshot.friendlyId;
230251

231252
// Update services
232-
this.runHeartbeat?.updateSnapshotId(snapshot.friendlyId);
233253
this.snapshotPoller?.updateSnapshotId(snapshot.friendlyId);
234254

235255
switch (snapshot.executionStatus) {
@@ -450,13 +470,6 @@ export class RunExecution {
450470
this.podScheduledAt = runOpts.podScheduledAt;
451471

452472
// Create and start services
453-
this.runHeartbeat = new RunExecutionHeartbeat({
454-
runFriendlyId: this.runFriendlyId,
455-
snapshotFriendlyId: this.currentSnapshotId,
456-
httpClient: this.httpClient,
457-
logger: this.logger,
458-
heartbeatIntervalSeconds: this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS,
459-
});
460473
this.snapshotPoller = new RunExecutionSnapshotPoller({
461474
runFriendlyId: this.runFriendlyId,
462475
snapshotFriendlyId: this.currentSnapshotId,
@@ -466,7 +479,6 @@ export class RunExecution {
466479
handleSnapshotChange: this.handleSnapshotChange.bind(this),
467480
});
468481

469-
this.runHeartbeat.start();
470482
this.snapshotPoller.start();
471483

472484
const [startError, start] = await tryCatch(
@@ -839,9 +851,6 @@ export class RunExecution {
839851
this.env.override(overrides);
840852

841853
// Update services with new values
842-
if (overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS) {
843-
this.runHeartbeat?.updateInterval(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000);
844-
}
845854
if (overrides.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS) {
846855
this.snapshotPoller?.updateInterval(this.env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS * 1000);
847856
}
@@ -857,6 +866,26 @@ export class RunExecution {
857866
}
858867
}
859868

869+
private async onHeartbeat() {
870+
if (!this.runFriendlyId) {
871+
this.sendDebugLog("Heartbeat: missing run ID");
872+
return;
873+
}
874+
875+
if (!this.currentSnapshotId) {
876+
this.sendDebugLog("Heartbeat: missing snapshot ID");
877+
return;
878+
}
879+
880+
this.sendDebugLog("Heartbeat: started");
881+
882+
const response = await this.httpClient.heartbeatRun(this.runFriendlyId, this.currentSnapshotId);
883+
884+
if (!response.success) {
885+
this.sendDebugLog("Heartbeat: failed", { error: response.error });
886+
}
887+
}
888+
860889
sendDebugLog(
861890
message: string,
862891
properties?: SendDebugLogOptions["properties"],
@@ -917,7 +946,6 @@ export class RunExecution {
917946
}
918947

919948
private stopServices() {
920-
this.runHeartbeat?.stop();
921949
this.snapshotPoller?.stop();
922950
}
923951
}

0 commit comments

Comments
 (0)