Skip to content

Commit 9754bed

Browse files
committed
Add a heartbeat for SUSPENDED snapshots, where when stalled will attempt to continue the run if unblocked
1 parent 7602efa commit 9754bed

File tree

7 files changed

+218
-7
lines changed

7 files changed

+218
-7
lines changed

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,10 @@ const EnvironmentSchema = z.object({
452452
RUN_ENGINE_TIMEOUT_PENDING_CANCEL: z.coerce.number().int().default(60_000),
453453
RUN_ENGINE_TIMEOUT_EXECUTING: z.coerce.number().int().default(60_000),
454454
RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS: z.coerce.number().int().default(60_000),
455+
RUN_ENGINE_TIMEOUT_SUSPENDED: z.coerce
456+
.number()
457+
.int()
458+
.default(60_000 * 10),
455459
RUN_ENGINE_DEBUG_WORKER_NOTIFICATIONS: z.coerce.boolean().default(false),
456460
RUN_ENGINE_PARENT_QUEUE_LIMIT: z.coerce.number().int().default(1000),
457461
RUN_ENGINE_CONCURRENCY_LIMIT_BIAS: z.coerce.number().default(0.75),

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ function createRunEngine() {
7676
PENDING_CANCEL: env.RUN_ENGINE_TIMEOUT_PENDING_CANCEL,
7777
EXECUTING: env.RUN_ENGINE_TIMEOUT_EXECUTING,
7878
EXECUTING_WITH_WAITPOINTS: env.RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS,
79+
SUSPENDED: env.RUN_ENGINE_TIMEOUT_SUSPENDED,
7980
},
8081
releaseConcurrency: {
8182
disabled: env.RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED === "0",

internal-packages/run-engine/src/engine/index.ts

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ export class RunEngine {
182182
PENDING_CANCEL: 60_000,
183183
EXECUTING: 60_000,
184184
EXECUTING_WITH_WAITPOINTS: 60_000,
185+
SUSPENDED: 60_000 * 10,
185186
};
186187
this.heartbeatTimeouts = {
187188
...defaultHeartbeatTimeouts,
@@ -1274,9 +1275,29 @@ export class RunEngine {
12741275
break;
12751276
}
12761277
case "SUSPENDED": {
1277-
//todo should we do a periodic check here for whether waitpoints are actually still blocking?
1278-
//we could at least log some things out if a run has been in this state for a long time
1279-
throw new NotImplementedError("Not implemented SUSPENDED");
1278+
const result = await this.waitpointSystem.continueRunIfUnblocked({ runId });
1279+
1280+
this.logger.info("handleStalledSnapshot SUSPENDED continueRunIfUnblocked", {
1281+
runId,
1282+
result,
1283+
snapshotId: latestSnapshot.id,
1284+
});
1285+
1286+
switch (result) {
1287+
case "blocked": {
1288+
// Reschedule the heartbeat
1289+
await this.executionSnapshotSystem.restartHeartbeatForRun({
1290+
runId,
1291+
});
1292+
break;
1293+
}
1294+
case "unblocked":
1295+
case "skipped": {
1296+
break;
1297+
}
1298+
}
1299+
1300+
break;
12801301
}
12811302
case "PENDING_CANCEL": {
12821303
//if the run is waiting to cancel but the worker hasn't confirmed that,

internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ export class ExecutionSnapshotSystem {
364364
return executionResultFromSnapshot(latestSnapshot);
365365
}
366366

367-
if (latestSnapshot.workerId !== workerId) {
367+
if (latestSnapshot.workerId && latestSnapshot.workerId !== workerId) {
368368
this.$.logger.debug("heartbeatRun: worker ID does not match the latest snapshot", {
369369
runId,
370370
snapshotId,
@@ -394,6 +394,38 @@ export class ExecutionSnapshotSystem {
394394
return executionResultFromSnapshot(latestSnapshot);
395395
}
396396

397+
public async restartHeartbeatForRun({
398+
runId,
399+
tx,
400+
}: {
401+
runId: string;
402+
tx?: PrismaClientOrTransaction;
403+
}): Promise<ExecutionResult> {
404+
const prisma = tx ?? this.$.prisma;
405+
406+
const latestSnapshot = await getLatestExecutionSnapshot(prisma, runId);
407+
408+
//extending the heartbeat
409+
const intervalMs = this.#getHeartbeatIntervalMs(latestSnapshot.executionStatus);
410+
411+
if (intervalMs !== null) {
412+
this.$.logger.debug("restartHeartbeatForRun: enqueuing heartbeat", {
413+
runId,
414+
snapshotId: latestSnapshot.id,
415+
intervalMs,
416+
});
417+
418+
await this.$.worker.enqueue({
419+
id: `heartbeatSnapshot.${runId}`,
420+
job: "heartbeatSnapshot",
421+
payload: { snapshotId: latestSnapshot.id, runId },
422+
availableAt: new Date(Date.now() + intervalMs),
423+
});
424+
}
425+
426+
return executionResultFromSnapshot(latestSnapshot);
427+
}
428+
397429
#getHeartbeatIntervalMs(status: TaskRunExecutionStatus): number | null {
398430
switch (status) {
399431
case "PENDING_EXECUTING": {
@@ -408,6 +440,9 @@ export class ExecutionSnapshotSystem {
408440
case "EXECUTING_WITH_WAITPOINTS": {
409441
return this.heartbeatTimeouts.EXECUTING_WITH_WAITPOINTS;
410442
}
443+
case "SUSPENDED": {
444+
return this.heartbeatTimeouts.SUSPENDED;
445+
}
411446
default: {
412447
return null;
413448
}

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,11 @@ export class WaitpointSystem {
490490
});
491491
}
492492

493-
public async continueRunIfUnblocked({ runId }: { runId: string }) {
493+
public async continueRunIfUnblocked({
494+
runId,
495+
}: {
496+
runId: string;
497+
}): Promise<"blocked" | "unblocked" | "skipped"> {
494498
this.$.logger.debug(`continueRunIfUnblocked: start`, {
495499
runId,
496500
});
@@ -516,7 +520,7 @@ export class WaitpointSystem {
516520
runId,
517521
blockingWaitpoints,
518522
});
519-
return;
523+
return "blocked";
520524
}
521525

522526
// 3. Get the run with environment
@@ -553,7 +557,7 @@ export class WaitpointSystem {
553557
runId,
554558
snapshot,
555559
});
556-
return;
560+
return "skipped";
557561
}
558562

559563
//run is still executing, send a message to the worker
@@ -677,6 +681,8 @@ export class WaitpointSystem {
677681
runId,
678682
});
679683
}
684+
685+
return "unblocked";
680686
}
681687

682688
public async createRunAssociatedWaitpoint(

internal-packages/run-engine/src/engine/tests/heartbeats.test.ts

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,149 @@ describe("RunEngine heartbeats", () => {
479479
}
480480
});
481481

482+
containerTest("Suspended", async ({ prisma, redisOptions }) => {
483+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
484+
485+
const heartbeatTimeout = 1000;
486+
487+
const engine = new RunEngine({
488+
prisma,
489+
worker: {
490+
redis: redisOptions,
491+
workers: 1,
492+
tasksPerWorker: 10,
493+
pollIntervalMs: 100,
494+
},
495+
queue: {
496+
redis: redisOptions,
497+
},
498+
runLock: {
499+
redis: redisOptions,
500+
},
501+
machines: {
502+
defaultMachine: "small-1x",
503+
machines: {
504+
"small-1x": {
505+
name: "small-1x" as const,
506+
cpu: 0.5,
507+
memory: 0.5,
508+
centsPerMs: 0.0001,
509+
},
510+
},
511+
baseCostInCents: 0.0001,
512+
},
513+
heartbeatTimeoutsMs: {
514+
SUSPENDED: heartbeatTimeout,
515+
},
516+
tracer: trace.getTracer("test", "0.0.0"),
517+
});
518+
519+
try {
520+
const taskIdentifier = "test-task";
521+
522+
//create background worker
523+
const backgroundWorker = await setupBackgroundWorker(
524+
engine,
525+
authenticatedEnvironment,
526+
taskIdentifier
527+
);
528+
529+
//trigger the run
530+
const run = await engine.trigger(
531+
{
532+
number: 1,
533+
friendlyId: "run_1234",
534+
environment: authenticatedEnvironment,
535+
taskIdentifier,
536+
payload: "{}",
537+
payloadType: "application/json",
538+
context: {},
539+
traceContext: {},
540+
traceId: "t12345",
541+
spanId: "s12345",
542+
masterQueue: "main",
543+
queue: "task/test-task",
544+
isTest: false,
545+
tags: [],
546+
},
547+
prisma
548+
);
549+
550+
//dequeue the run
551+
const dequeued = await engine.dequeueFromMasterQueue({
552+
consumerId: "test_12345",
553+
masterQueue: run.masterQueue,
554+
maxRunCount: 10,
555+
});
556+
557+
//create an attempt
558+
await engine.startRunAttempt({
559+
runId: dequeued[0].run.id,
560+
snapshotId: dequeued[0].snapshot.id,
561+
});
562+
563+
//cancel run
564+
//create a manual waitpoint
565+
const waitpointResult = await engine.createManualWaitpoint({
566+
environmentId: authenticatedEnvironment.id,
567+
projectId: authenticatedEnvironment.projectId,
568+
});
569+
expect(waitpointResult.waitpoint.status).toBe("PENDING");
570+
571+
//block the run
572+
const blockedResult = await engine.blockRunWithWaitpoint({
573+
runId: run.id,
574+
waitpoints: waitpointResult.waitpoint.id,
575+
projectId: authenticatedEnvironment.projectId,
576+
organizationId: authenticatedEnvironment.organizationId,
577+
});
578+
579+
const blockedExecutionData = await engine.getRunExecutionData({ runId: run.id });
580+
expect(blockedExecutionData?.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS");
581+
582+
// Create a checkpoint
583+
const checkpointResult = await engine.createCheckpoint({
584+
runId: run.id,
585+
snapshotId: blockedResult.id,
586+
checkpoint: {
587+
type: "DOCKER",
588+
reason: "TEST_CHECKPOINT",
589+
location: "test-location",
590+
imageRef: "test-image-ref",
591+
},
592+
});
593+
594+
expect(checkpointResult.ok).toBe(true);
595+
596+
const snapshot = checkpointResult.ok ? checkpointResult.snapshot : null;
597+
598+
assertNonNullable(snapshot);
599+
600+
// Verify checkpoint creation
601+
expect(snapshot.executionStatus).toBe("SUSPENDED");
602+
603+
// Now wait for the heartbeat to timeout, but it should retry later
604+
await setTimeout(heartbeatTimeout * 1.5);
605+
606+
// Simulate a suspended run without any blocking waitpoints by deleting any blocking task run waitpoints
607+
await prisma.taskRunWaitpoint.deleteMany({
608+
where: {
609+
taskRunId: run.id,
610+
},
611+
});
612+
613+
// Now wait for the heartbeat to timeout again
614+
await setTimeout(heartbeatTimeout * 2);
615+
616+
// Expect the run to be queued
617+
const executionData2 = await engine.getRunExecutionData({ runId: run.id });
618+
assertNonNullable(executionData2);
619+
expect(executionData2.snapshot.executionStatus).toBe("QUEUED");
620+
} finally {
621+
await engine.quit();
622+
}
623+
});
624+
482625
containerTest("Heartbeat keeps run alive", async ({ prisma, redisOptions }) => {
483626
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
484627

internal-packages/run-engine/src/engine/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ export type HeartbeatTimeouts = {
6868
PENDING_CANCEL: number;
6969
EXECUTING: number;
7070
EXECUTING_WITH_WAITPOINTS: number;
71+
SUSPENDED: number;
7172
};
7273

7374
export type TriggerParams = {

0 commit comments

Comments
 (0)