Skip to content

Add idle dequeue interval to supervisor #2007

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion apps/supervisor/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { AdditionalEnvVars, BoolEnv } from "./envUtil.js";
const Env = z.object({
// This will come from `spec.nodeName` in k8s
TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()),
TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30),

// Required settings
TRIGGER_API_URL: z.string().url(),
Expand All @@ -31,7 +32,8 @@ const Env = z.object({

// Dequeue settings (provider mode)
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"),
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000),
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(250),
TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: z.coerce.number().int().default(1000),
TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(10),
TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(1),

Expand Down
2 changes: 2 additions & 0 deletions apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,12 @@ class ManagedSupervisor {
instanceName: env.TRIGGER_WORKER_INSTANCE_NAME,
managedWorkerSecret: env.MANAGED_WORKER_SECRET,
dequeueIntervalMs: env.TRIGGER_DEQUEUE_INTERVAL_MS,
dequeueIdleIntervalMs: env.TRIGGER_DEQUEUE_IDLE_INTERVAL_MS,
queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED,
maxRunCount: env.TRIGGER_DEQUEUE_MAX_RUN_COUNT,
maxConsumerCount: env.TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT,
runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED,
heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS,
preDequeue: async () => {
if (this.isKubernetes) {
// Not used in k8s for now
Expand Down
21 changes: 15 additions & 6 deletions packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import { PreDequeueFn, PreSkipFn } from "./types.js";

type RunQueueConsumerOptions = {
client: SupervisorHttpClient;
intervalMs?: number;
intervalMs: number;
idleIntervalMs: number;
preDequeue?: PreDequeueFn;
preSkip?: PreSkipFn;
maxRunCount?: number;
Expand All @@ -19,11 +20,13 @@ export class RunQueueConsumer {
private readonly onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise<void>;

private intervalMs: number;
private idleIntervalMs: number;
private isEnabled: boolean;

constructor(opts: RunQueueConsumerOptions) {
this.isEnabled = false;
this.intervalMs = opts.intervalMs ?? 5_000;
this.intervalMs = opts.intervalMs;
this.idleIntervalMs = opts.idleIntervalMs;
this.preDequeue = opts.preDequeue;
this.preSkip = opts.preSkip;
this.maxRunCount = opts.maxRunCount;
Expand Down Expand Up @@ -84,9 +87,11 @@ export class RunQueueConsumer {
}
}

return this.scheduleNextDequeue();
return this.scheduleNextDequeue(this.idleIntervalMs);
}

let nextIntervalMs = this.idleIntervalMs;

try {
const response = await this.client.dequeue({
maxResources: preDequeueResult?.maxResources,
Expand All @@ -98,6 +103,10 @@ export class RunQueueConsumer {
} else {
try {
await this.onDequeue(response.data);

if (response.data.length > 0) {
nextIntervalMs = this.intervalMs;
}
} catch (handlerError) {
console.error("[RunQueueConsumer] onDequeue error", { error: handlerError });
}
Expand All @@ -106,10 +115,10 @@ export class RunQueueConsumer {
console.error("[RunQueueConsumer] client.dequeue error", { error: clientError });
}

this.scheduleNextDequeue();
this.scheduleNextDequeue(nextIntervalMs);
}

scheduleNextDequeue(delay: number = this.intervalMs) {
setTimeout(this.dequeue.bind(this), delay);
scheduleNextDequeue(delayMs: number) {
setTimeout(this.dequeue.bind(this), delayMs);
}
}
11 changes: 5 additions & 6 deletions packages/core/src/v3/runEngineWorker/supervisor/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import { IntervalService } from "../../utils/interval.js";
type SupervisorSessionOptions = SupervisorClientCommonOptions & {
queueConsumerEnabled?: boolean;
runNotificationsEnabled?: boolean;
heartbeatIntervalSeconds?: number;
dequeueIntervalMs?: number;
heartbeatIntervalSeconds: number;
dequeueIntervalMs: number;
dequeueIdleIntervalMs: number;
preDequeue?: PreDequeueFn;
preSkip?: PreSkipFn;
maxRunCount?: number;
Expand All @@ -31,7 +32,6 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
private readonly queueConsumers: RunQueueConsumer[];

private readonly heartbeat: IntervalService;
private readonly heartbeatIntervalSeconds: number;

constructor(private opts: SupervisorSessionOptions) {
super();
Expand All @@ -47,12 +47,11 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
preSkip: opts.preSkip,
onDequeue: this.onDequeue.bind(this),
intervalMs: opts.dequeueIntervalMs,
idleIntervalMs: opts.dequeueIdleIntervalMs,
maxRunCount: opts.maxRunCount,
});
});

// TODO: This should be dynamic and set by (or at least overridden by) the platform
this.heartbeatIntervalSeconds = opts.heartbeatIntervalSeconds || 30;
this.heartbeat = new IntervalService({
onInterval: async () => {
console.debug("[SupervisorSession] Sending heartbeat");
Expand All @@ -64,7 +63,7 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
console.error("[SupervisorSession] Heartbeat failed", { error: response.error });
}
},
intervalMs: this.heartbeatIntervalSeconds * 1000,
intervalMs: opts.heartbeatIntervalSeconds * 1000,
leadingEdge: false,
onError: async (error) => {
console.error("[SupervisorSession] Failed to send heartbeat", { error });
Expand Down