Skip to content

Commit 0df7af4

Browse files
authored
Add idle dequeue interval to supervisor (#2007)
* supervisor uses new idle interval after empty dequeues and errors * make supervisor heartbeat interval configurable
1 parent 553a035 commit 0df7af4

File tree

4 files changed

+25
-13
lines changed

4 files changed

+25
-13
lines changed

apps/supervisor/src/env.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { AdditionalEnvVars, BoolEnv } from "./envUtil.js";
66
const Env = z.object({
77
// This will come from `spec.nodeName` in k8s
88
TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()),
9+
TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30),
910

1011
// Required settings
1112
TRIGGER_API_URL: z.string().url(),
@@ -31,7 +32,8 @@ const Env = z.object({
3132

3233
// Dequeue settings (provider mode)
3334
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"),
34-
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000),
35+
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(250),
36+
TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: z.coerce.number().int().default(1000),
3537
TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(10),
3638
TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(1),
3739

apps/supervisor/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,12 @@ class ManagedSupervisor {
116116
instanceName: env.TRIGGER_WORKER_INSTANCE_NAME,
117117
managedWorkerSecret: env.MANAGED_WORKER_SECRET,
118118
dequeueIntervalMs: env.TRIGGER_DEQUEUE_INTERVAL_MS,
119+
dequeueIdleIntervalMs: env.TRIGGER_DEQUEUE_IDLE_INTERVAL_MS,
119120
queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED,
120121
maxRunCount: env.TRIGGER_DEQUEUE_MAX_RUN_COUNT,
121122
maxConsumerCount: env.TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT,
122123
runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED,
124+
heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS,
123125
preDequeue: async () => {
124126
if (this.isKubernetes) {
125127
// Not used in k8s for now

packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import { PreDequeueFn, PreSkipFn } from "./types.js";
44

55
type RunQueueConsumerOptions = {
66
client: SupervisorHttpClient;
7-
intervalMs?: number;
7+
intervalMs: number;
8+
idleIntervalMs: number;
89
preDequeue?: PreDequeueFn;
910
preSkip?: PreSkipFn;
1011
maxRunCount?: number;
@@ -19,11 +20,13 @@ export class RunQueueConsumer {
1920
private readonly onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise<void>;
2021

2122
private intervalMs: number;
23+
private idleIntervalMs: number;
2224
private isEnabled: boolean;
2325

2426
constructor(opts: RunQueueConsumerOptions) {
2527
this.isEnabled = false;
26-
this.intervalMs = opts.intervalMs ?? 5_000;
28+
this.intervalMs = opts.intervalMs;
29+
this.idleIntervalMs = opts.idleIntervalMs;
2730
this.preDequeue = opts.preDequeue;
2831
this.preSkip = opts.preSkip;
2932
this.maxRunCount = opts.maxRunCount;
@@ -84,9 +87,11 @@ export class RunQueueConsumer {
8487
}
8588
}
8689

87-
return this.scheduleNextDequeue();
90+
return this.scheduleNextDequeue(this.idleIntervalMs);
8891
}
8992

93+
let nextIntervalMs = this.idleIntervalMs;
94+
9095
try {
9196
const response = await this.client.dequeue({
9297
maxResources: preDequeueResult?.maxResources,
@@ -98,6 +103,10 @@ export class RunQueueConsumer {
98103
} else {
99104
try {
100105
await this.onDequeue(response.data);
106+
107+
if (response.data.length > 0) {
108+
nextIntervalMs = this.intervalMs;
109+
}
101110
} catch (handlerError) {
102111
console.error("[RunQueueConsumer] onDequeue error", { error: handlerError });
103112
}
@@ -106,10 +115,10 @@ export class RunQueueConsumer {
106115
console.error("[RunQueueConsumer] client.dequeue error", { error: clientError });
107116
}
108117

109-
this.scheduleNextDequeue();
118+
this.scheduleNextDequeue(nextIntervalMs);
110119
}
111120

112-
scheduleNextDequeue(delay: number = this.intervalMs) {
113-
setTimeout(this.dequeue.bind(this), delay);
121+
scheduleNextDequeue(delayMs: number) {
122+
setTimeout(this.dequeue.bind(this), delayMs);
114123
}
115124
}

packages/core/src/v3/runEngineWorker/supervisor/session.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ import { IntervalService } from "../../utils/interval.js";
1313
type SupervisorSessionOptions = SupervisorClientCommonOptions & {
1414
queueConsumerEnabled?: boolean;
1515
runNotificationsEnabled?: boolean;
16-
heartbeatIntervalSeconds?: number;
17-
dequeueIntervalMs?: number;
16+
heartbeatIntervalSeconds: number;
17+
dequeueIntervalMs: number;
18+
dequeueIdleIntervalMs: number;
1819
preDequeue?: PreDequeueFn;
1920
preSkip?: PreSkipFn;
2021
maxRunCount?: number;
@@ -31,7 +32,6 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
3132
private readonly queueConsumers: RunQueueConsumer[];
3233

3334
private readonly heartbeat: IntervalService;
34-
private readonly heartbeatIntervalSeconds: number;
3535

3636
constructor(private opts: SupervisorSessionOptions) {
3737
super();
@@ -47,12 +47,11 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
4747
preSkip: opts.preSkip,
4848
onDequeue: this.onDequeue.bind(this),
4949
intervalMs: opts.dequeueIntervalMs,
50+
idleIntervalMs: opts.dequeueIdleIntervalMs,
5051
maxRunCount: opts.maxRunCount,
5152
});
5253
});
5354

54-
// TODO: This should be dynamic and set by (or at least overridden by) the platform
55-
this.heartbeatIntervalSeconds = opts.heartbeatIntervalSeconds || 30;
5655
this.heartbeat = new IntervalService({
5756
onInterval: async () => {
5857
console.debug("[SupervisorSession] Sending heartbeat");
@@ -64,7 +63,7 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
6463
console.error("[SupervisorSession] Heartbeat failed", { error: response.error });
6564
}
6665
},
67-
intervalMs: this.heartbeatIntervalSeconds * 1000,
66+
intervalMs: opts.heartbeatIntervalSeconds * 1000,
6867
leadingEdge: false,
6968
onError: async (error) => {
7069
console.error("[SupervisorSession] Failed to send heartbeat", { error });

0 commit comments

Comments
 (0)