Skip to content

Commit eedc52b

Browse files
committed
Merge remote-tracking branch 'origin/main' into feat/supervisor
2 parents 696def2 + cf4c4d7 commit eedc52b

File tree

14 files changed

+264
-51
lines changed

14 files changed

+264
-51
lines changed

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,9 @@ const EnvironmentSchema = z.object({
571571
/** The max number of runs per API call that we'll dequeue in DEV */
572572
DEV_DEQUEUE_MAX_RUNS_PER_PULL: z.coerce.number().int().default(10),
573573

574+
/** The maximum number of local run processes that can execute concurrently in dev */
575+
DEV_MAX_CONCURRENT_RUNS: z.coerce.number().int().default(25),
576+
574577
LEGACY_RUN_ENGINE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
575578
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
576579
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1),

apps/webapp/app/routes/engine.v1.dev.config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export const loader = createLoaderApiRoute(
2020
environmentId: authentication.environment.id,
2121
dequeueIntervalWithRun: env.DEV_DEQUEUE_INTERVAL_WITH_RUN,
2222
dequeueIntervalWithoutRun: env.DEV_DEQUEUE_INTERVAL_WITHOUT_RUN,
23+
maxConcurrentRuns: env.DEV_MAX_CONCURRENT_RUNS,
2324
});
2425
} catch (error) {
2526
logger.error("Failed to get dev settings", {

internal-packages/run-engine/src/engine/index.ts

Lines changed: 15 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,7 @@ export class RunEngine {
925925

926926
return {
927927
version: "1" as const,
928+
dequeuedAt: new Date(),
928929
snapshot: {
929930
id: newSnapshot.id,
930931
friendlyId: newSnapshot.friendlyId,
@@ -2442,8 +2443,11 @@ export class RunEngine {
24422443
},
24432444
});
24442445

2445-
//extending is the same as creating a new heartbeat
2446-
await this.#setHeartbeatDeadline({ runId, snapshotId, status: latestSnapshot.executionStatus });
2446+
//extending the heartbeat
2447+
const intervalMs = this.#getHeartbeatIntervalMs(latestSnapshot.executionStatus);
2448+
if (intervalMs !== null) {
2449+
await this.worker.reschedule(`heartbeatSnapshot.${runId}`, new Date(Date.now() + intervalMs));
2450+
}
24472451

24482452
return executionResultFromSnapshot(latestSnapshot);
24492453
}
@@ -3647,11 +3651,15 @@ export class RunEngine {
36473651

36483652
if (!error) {
36493653
//set heartbeat (if relevant)
3650-
await this.#setHeartbeatDeadline({
3651-
status: newSnapshot.executionStatus,
3652-
runId: run.id,
3653-
snapshotId: newSnapshot.id,
3654-
});
3654+
const intervalMs = this.#getHeartbeatIntervalMs(newSnapshot.executionStatus);
3655+
if (intervalMs !== null) {
3656+
await this.worker.enqueue({
3657+
id: `heartbeatSnapshot.${run.id}`,
3658+
job: "heartbeatSnapshot",
3659+
payload: { snapshotId: newSnapshot.id, runId: run.id },
3660+
availableAt: new Date(Date.now() + intervalMs),
3661+
});
3662+
}
36553663
}
36563664

36573665
this.eventBus.emit("executionSnapshotCreated", {
@@ -3695,29 +3703,6 @@ export class RunEngine {
36953703
//#endregion
36963704

36973705
//#region Heartbeat
3698-
async #setHeartbeatDeadline({
3699-
runId,
3700-
snapshotId,
3701-
status,
3702-
}: {
3703-
runId: string;
3704-
snapshotId: string;
3705-
status: TaskRunExecutionStatus;
3706-
}) {
3707-
const intervalMs = this.#getHeartbeatIntervalMs(status);
3708-
3709-
if (intervalMs === null) {
3710-
return;
3711-
}
3712-
3713-
await this.worker.enqueue({
3714-
id: `heartbeatSnapshot.${runId}`,
3715-
job: "heartbeatSnapshot",
3716-
payload: { snapshotId, runId },
3717-
availableAt: new Date(Date.now() + intervalMs),
3718-
});
3719-
}
3720-
37213706
async #handleStalledSnapshot({
37223707
runId,
37233708
snapshotId,

internal-packages/run-engine/src/engine/tests/heartbeats.test.ts

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,4 +490,123 @@ describe("RunEngine heartbeats", () => {
490490
await engine.quit();
491491
}
492492
});
493+
494+
containerTest(
495+
"Heartbeat keeps run alive",
496+
{ timeout: 15_000 },
497+
async ({ prisma, redisOptions }) => {
498+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
499+
500+
const executingTimeout = 100;
501+
502+
const engine = new RunEngine({
503+
prisma,
504+
worker: {
505+
redis: redisOptions,
506+
workers: 1,
507+
tasksPerWorker: 10,
508+
pollIntervalMs: 100,
509+
},
510+
queue: {
511+
redis: redisOptions,
512+
},
513+
runLock: {
514+
redis: redisOptions,
515+
},
516+
machines: {
517+
defaultMachine: "small-1x",
518+
machines: {
519+
"small-1x": {
520+
name: "small-1x" as const,
521+
cpu: 0.5,
522+
memory: 0.5,
523+
centsPerMs: 0.0001,
524+
},
525+
},
526+
baseCostInCents: 0.0001,
527+
},
528+
heartbeatTimeoutsMs: {
529+
EXECUTING: executingTimeout,
530+
},
531+
tracer: trace.getTracer("test", "0.0.0"),
532+
});
533+
534+
try {
535+
const taskIdentifier = "test-task";
536+
537+
//create background worker
538+
const backgroundWorker = await setupBackgroundWorker(
539+
prisma,
540+
authenticatedEnvironment,
541+
taskIdentifier
542+
);
543+
544+
//trigger the run
545+
const run = await engine.trigger(
546+
{
547+
number: 1,
548+
friendlyId: "run_1234",
549+
environment: authenticatedEnvironment,
550+
taskIdentifier,
551+
payload: "{}",
552+
payloadType: "application/json",
553+
context: {},
554+
traceContext: {},
555+
traceId: "t12345",
556+
spanId: "s12345",
557+
masterQueue: "main",
558+
queueName: "task/test-task",
559+
isTest: false,
560+
tags: [],
561+
},
562+
prisma
563+
);
564+
565+
//dequeue the run
566+
const dequeued = await engine.dequeueFromMasterQueue({
567+
consumerId: "test_12345",
568+
masterQueue: run.masterQueue,
569+
maxRunCount: 10,
570+
});
571+
572+
//create an attempt
573+
const attempt = await engine.startRunAttempt({
574+
runId: dequeued[0].run.id,
575+
snapshotId: dequeued[0].snapshot.id,
576+
});
577+
578+
//should be executing
579+
const executionData = await engine.getRunExecutionData({ runId: run.id });
580+
assertNonNullable(executionData);
581+
expect(executionData.snapshot.executionStatus).toBe("EXECUTING");
582+
expect(executionData.run.status).toBe("EXECUTING");
583+
584+
// Send heartbeats every 50ms (half the timeout)
585+
for (let i = 0; i < 6; i++) {
586+
await setTimeout(50);
587+
await engine.heartbeatRun({
588+
runId: run.id,
589+
snapshotId: attempt.snapshot.id,
590+
});
591+
}
592+
593+
// After 300ms (3x the timeout) the run should still be executing
594+
// because we've been sending heartbeats
595+
const executionData2 = await engine.getRunExecutionData({ runId: run.id });
596+
assertNonNullable(executionData2);
597+
expect(executionData2.snapshot.executionStatus).toBe("EXECUTING");
598+
expect(executionData2.run.status).toBe("EXECUTING");
599+
600+
// Stop sending heartbeats and wait for timeout
601+
await setTimeout(executingTimeout * 2);
602+
603+
// Now it should have timed out and be queued
604+
const executionData3 = await engine.getRunExecutionData({ runId: run.id });
605+
assertNonNullable(executionData3);
606+
expect(executionData3.snapshot.executionStatus).toBe("QUEUED");
607+
} finally {
608+
await engine.quit();
609+
}
610+
}
611+
);
493612
});

packages/cli-v3/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
"nypm": "^0.3.9",
111111
"object-hash": "^3.0.0",
112112
"open": "^10.0.3",
113+
"p-limit": "^6.2.0",
113114
"p-retry": "^6.1.0",
114115
"partysocket": "^1.0.2",
115116
"pkg-types": "^1.1.3",

packages/cli-v3/src/commands/dev.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const DevCommandOptions = CommonCommandOptions.extend({
1919
skipUpdateCheck: z.boolean().default(false),
2020
envFile: z.string().optional(),
2121
keepTmpFiles: z.boolean().default(false),
22+
maxConcurrentRuns: z.coerce.number().int().positive().optional(), // later passed to pLimit(), which only accepts an integer >= 1
2223
});
2324

2425
export type DevCommandOptions = z.infer<typeof DevCommandOptions>;
@@ -37,6 +38,10 @@ export function configureDevCommand(program: Command) {
3738
"--env-file <env file>",
3839
"Path to the .env file to use for the dev session. Defaults to .env in the project directory."
3940
)
41+
.option(
42+
"--max-concurrent-runs <max concurrent runs>",
43+
"The maximum number of concurrent runs to allow in the dev session"
44+
)
4045
.option("--debug-otel", "Enable OpenTelemetry debugging")
4146
.option("--skip-update-check", "Skip checking for @trigger.dev package updates")
4247
.option(

packages/cli-v3/src/dev/devSupervisor.ts

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {
2424
WorkerClientToServerEvents,
2525
WorkerServerToClientEvents,
2626
} from "@trigger.dev/core/v3/workers";
27+
import pLimit from "p-limit";
2728

2829
export type WorkerRuntimeOptions = {
2930
name: string | undefined;
@@ -65,6 +66,8 @@ class DevSupervisor implements WorkerRuntime {
6566

6667
private socketConnections = new Set<string>();
6768

69+
private runLimiter?: ReturnType<typeof pLimit>;
70+
6871
constructor(public readonly options: WorkerRuntimeOptions) {}
6972

7073
async init(): Promise<void> {
@@ -81,6 +84,15 @@ class DevSupervisor implements WorkerRuntime {
8184
logger.debug("[DevSupervisor] Got dev settings", { settings: settings.data });
8285
this.config = settings.data;
8386

87+
const maxConcurrentRuns = Math.min(
88+
this.config.maxConcurrentRuns,
89+
this.options.args.maxConcurrentRuns ?? this.config.maxConcurrentRuns
90+
);
91+
92+
logger.debug("[DevSupervisor] Using maxConcurrentRuns", { maxConcurrentRuns });
93+
94+
this.runLimiter = pLimit(maxConcurrentRuns);
95+
8496
this.#createSocket();
8597

8698
//start an SSE connection for presence
@@ -178,6 +190,14 @@ class DevSupervisor implements WorkerRuntime {
178190
return;
179191
}
180192

193+
// Back-pressure: when active + pending runs exceed the limiter's concurrency, poll again later instead of dequeuing more work.
194+
const limiter = this.runLimiter;
195+
if (limiter && limiter.activeCount + limiter.pendingCount > limiter.concurrency) {
196+
logger.debug(`[DevSupervisor] dequeueRuns. Run limit reached, trying again later`);
197+
setTimeout(() => this.#dequeueRuns(), this.config.dequeueIntervalWithoutRun);
198+
return; // the retry is already scheduled above; without this the method falls through into the dequeue logic below
199+
}
200+
181201
//get relevant versions
182202
//ignore deprecated and the latest worker
183203
const oldWorkerIds = this.#getActiveOldWorkers();
@@ -287,10 +307,16 @@ class DevSupervisor implements WorkerRuntime {
287307

288308
this.runControllers.set(message.run.friendlyId, runController);
289309

290-
//don't await for run completion, we want to dequeue more runs
291-
runController.start(message).then(() => {
292-
logger.debug("[DevSupervisor] Run started", { runId: message.run.friendlyId });
293-
});
310+
if (this.runLimiter) {
311+
this.runLimiter(() => runController.start(message)).then(() => {
312+
logger.debug("[DevSupervisor] Run started", { runId: message.run.friendlyId });
313+
});
314+
} else {
315+
//don't await for run completion, we want to dequeue more runs
316+
runController.start(message).then(() => {
317+
logger.debug("[DevSupervisor] Run started", { runId: message.run.friendlyId });
318+
});
319+
}
294320
}
295321

296322
setTimeout(() => this.#dequeueRuns(), this.config.dequeueIntervalWithRun);

0 commit comments

Comments
 (0)