Skip to content

Commit 722fae5

Browse files
authored
Reschedule heartbeats and added a test to check it works (#1765)
1 parent 983bb41 commit 722fae5

File tree

2 files changed

+133
-30
lines changed

2 files changed

+133
-30
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2442,8 +2442,11 @@ export class RunEngine {
24422442
},
24432443
});
24442444

2445-
//extending is the same as creating a new heartbeat
2446-
await this.#setHeartbeatDeadline({ runId, snapshotId, status: latestSnapshot.executionStatus });
2445+
//extending the heartbeat
2446+
const intervalMs = this.#getHeartbeatIntervalMs(latestSnapshot.executionStatus);
2447+
if (intervalMs !== null) {
2448+
await this.worker.reschedule(`heartbeatSnapshot.${runId}`, new Date(Date.now() + intervalMs));
2449+
}
24472450

24482451
return executionResultFromSnapshot(latestSnapshot);
24492452
}
@@ -3647,11 +3650,15 @@ export class RunEngine {
36473650

36483651
if (!error) {
36493652
//set heartbeat (if relevant)
3650-
await this.#setHeartbeatDeadline({
3651-
status: newSnapshot.executionStatus,
3652-
runId: run.id,
3653-
snapshotId: newSnapshot.id,
3654-
});
3653+
const intervalMs = this.#getHeartbeatIntervalMs(newSnapshot.executionStatus);
3654+
if (intervalMs !== null) {
3655+
await this.worker.enqueue({
3656+
id: `heartbeatSnapshot.${run.id}`,
3657+
job: "heartbeatSnapshot",
3658+
payload: { snapshotId: newSnapshot.id, runId: run.id },
3659+
availableAt: new Date(Date.now() + intervalMs),
3660+
});
3661+
}
36553662
}
36563663

36573664
this.eventBus.emit("executionSnapshotCreated", {
@@ -3695,29 +3702,6 @@ export class RunEngine {
36953702
//#endregion
36963703

36973704
//#region Heartbeat
3698-
async #setHeartbeatDeadline({
3699-
runId,
3700-
snapshotId,
3701-
status,
3702-
}: {
3703-
runId: string;
3704-
snapshotId: string;
3705-
status: TaskRunExecutionStatus;
3706-
}) {
3707-
const intervalMs = this.#getHeartbeatIntervalMs(status);
3708-
3709-
if (intervalMs === null) {
3710-
return;
3711-
}
3712-
3713-
await this.worker.enqueue({
3714-
id: `heartbeatSnapshot.${runId}`,
3715-
job: "heartbeatSnapshot",
3716-
payload: { snapshotId, runId },
3717-
availableAt: new Date(Date.now() + intervalMs),
3718-
});
3719-
}
3720-
37213705
async #handleStalledSnapshot({
37223706
runId,
37233707
snapshotId,

internal-packages/run-engine/src/engine/tests/heartbeats.test.ts

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,4 +490,123 @@ describe("RunEngine heartbeats", () => {
490490
await engine.quit();
491491
}
492492
});
493+
494+
containerTest(
495+
"Heartbeat keeps run alive",
496+
{ timeout: 15_000 },
497+
async ({ prisma, redisOptions }) => {
498+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
499+
500+
const executingTimeout = 100;
501+
502+
const engine = new RunEngine({
503+
prisma,
504+
worker: {
505+
redis: redisOptions,
506+
workers: 1,
507+
tasksPerWorker: 10,
508+
pollIntervalMs: 100,
509+
},
510+
queue: {
511+
redis: redisOptions,
512+
},
513+
runLock: {
514+
redis: redisOptions,
515+
},
516+
machines: {
517+
defaultMachine: "small-1x",
518+
machines: {
519+
"small-1x": {
520+
name: "small-1x" as const,
521+
cpu: 0.5,
522+
memory: 0.5,
523+
centsPerMs: 0.0001,
524+
},
525+
},
526+
baseCostInCents: 0.0001,
527+
},
528+
heartbeatTimeoutsMs: {
529+
EXECUTING: executingTimeout,
530+
},
531+
tracer: trace.getTracer("test", "0.0.0"),
532+
});
533+
534+
try {
535+
const taskIdentifier = "test-task";
536+
537+
//create background worker
538+
const backgroundWorker = await setupBackgroundWorker(
539+
prisma,
540+
authenticatedEnvironment,
541+
taskIdentifier
542+
);
543+
544+
//trigger the run
545+
const run = await engine.trigger(
546+
{
547+
number: 1,
548+
friendlyId: "run_1234",
549+
environment: authenticatedEnvironment,
550+
taskIdentifier,
551+
payload: "{}",
552+
payloadType: "application/json",
553+
context: {},
554+
traceContext: {},
555+
traceId: "t12345",
556+
spanId: "s12345",
557+
masterQueue: "main",
558+
queueName: "task/test-task",
559+
isTest: false,
560+
tags: [],
561+
},
562+
prisma
563+
);
564+
565+
//dequeue the run
566+
const dequeued = await engine.dequeueFromMasterQueue({
567+
consumerId: "test_12345",
568+
masterQueue: run.masterQueue,
569+
maxRunCount: 10,
570+
});
571+
572+
//create an attempt
573+
const attempt = await engine.startRunAttempt({
574+
runId: dequeued[0].run.id,
575+
snapshotId: dequeued[0].snapshot.id,
576+
});
577+
578+
//should be executing
579+
const executionData = await engine.getRunExecutionData({ runId: run.id });
580+
assertNonNullable(executionData);
581+
expect(executionData.snapshot.executionStatus).toBe("EXECUTING");
582+
expect(executionData.run.status).toBe("EXECUTING");
583+
584+
// Send heartbeats every 50ms (half the timeout)
585+
for (let i = 0; i < 6; i++) {
586+
await setTimeout(50);
587+
await engine.heartbeatRun({
588+
runId: run.id,
589+
snapshotId: attempt.snapshot.id,
590+
});
591+
}
592+
593+
// After 300ms (3x the timeout) the run should still be executing
594+
// because we've been sending heartbeats
595+
const executionData2 = await engine.getRunExecutionData({ runId: run.id });
596+
assertNonNullable(executionData2);
597+
expect(executionData2.snapshot.executionStatus).toBe("EXECUTING");
598+
expect(executionData2.run.status).toBe("EXECUTING");
599+
600+
// Stop sending heartbeats and wait for timeout
601+
await setTimeout(executingTimeout * 2);
602+
603+
// Now it should have timed out and be queued
604+
const executionData3 = await engine.getRunExecutionData({ runId: run.id });
605+
assertNonNullable(executionData3);
606+
expect(executionData3.snapshot.executionStatus).toBe("QUEUED");
607+
} finally {
608+
await engine.quit();
609+
}
610+
}
611+
);
493612
});

0 commit comments

Comments
 (0)