Skip to content

Commit f683fe8

Browse files
authored
Multiple queue consumers per supervisor (#1947)
* multiple queue consumer in the same supervisor instance * disable pre * Revert "disable pre" This reverts commit 4b15439.
1 parent 863ecf4 commit f683fe8

File tree

3 files changed

+15
-9
lines changed

3 files changed

+15
-9
lines changed

apps/supervisor/src/env.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const Env = z.object({
3333
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"),
3434
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000),
3535
TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(10),
36+
TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(1),
3637

3738
// Optional services
3839
TRIGGER_WARM_START_URL: z.string().optional(),

apps/supervisor/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ class ManagedSupervisor {
118118
dequeueIntervalMs: env.TRIGGER_DEQUEUE_INTERVAL_MS,
119119
queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED,
120120
maxRunCount: env.TRIGGER_DEQUEUE_MAX_RUN_COUNT,
121+
maxConsumerCount: env.TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT,
121122
runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED,
122123
preDequeue: async () => {
123124
if (this.isKubernetes) {

packages/core/src/v3/runEngineWorker/supervisor/session.ts

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type SupervisorSessionOptions = SupervisorClientCommonOptions & {
1818
preDequeue?: PreDequeueFn;
1919
preSkip?: PreSkipFn;
2020
maxRunCount?: number;
21+
maxConsumerCount?: number;
2122
};
2223

2324
export class SupervisorSession extends EventEmitter<WorkerEvents> {
@@ -27,7 +28,7 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
2728
private runNotificationsSocket?: Socket<WorkerServerToClientEvents, WorkerClientToServerEvents>;
2829

2930
private readonly queueConsumerEnabled: boolean;
30-
private readonly queueConsumer: RunQueueConsumer;
31+
private readonly queueConsumers: RunQueueConsumer[];
3132

3233
private readonly heartbeat: IntervalService;
3334
private readonly heartbeatIntervalSeconds: number;
@@ -39,13 +40,15 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
3940
this.queueConsumerEnabled = opts.queueConsumerEnabled ?? true;
4041

4142
this.httpClient = new SupervisorHttpClient(opts);
42-
this.queueConsumer = new RunQueueConsumer({
43-
client: this.httpClient,
44-
preDequeue: opts.preDequeue,
45-
preSkip: opts.preSkip,
46-
onDequeue: this.onDequeue.bind(this),
47-
intervalMs: opts.dequeueIntervalMs,
48-
maxRunCount: opts.maxRunCount,
43+
this.queueConsumers = Array.from({ length: opts.maxConsumerCount ?? 1 }, () => {
44+
return new RunQueueConsumer({
45+
client: this.httpClient,
46+
preDequeue: opts.preDequeue,
47+
preSkip: opts.preSkip,
48+
onDequeue: this.onDequeue.bind(this),
49+
intervalMs: opts.dequeueIntervalMs,
50+
maxRunCount: opts.maxRunCount,
51+
});
4952
});
5053

5154
// TODO: This should be dynamic and set by (or at least overridden by) the platform
@@ -181,7 +184,7 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
181184

182185
if (this.queueConsumerEnabled) {
183186
console.log("[SupervisorSession] Queue consumer enabled");
184-
this.queueConsumer.start();
187+
await Promise.allSettled(this.queueConsumers.map(async (q) => q.start()));
185188
this.heartbeat.start();
186189
} else {
187190
console.warn("[SupervisorSession] Queue consumer disabled");
@@ -196,6 +199,7 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
196199
}
197200

198201
async stop() {
202+
await Promise.allSettled(this.queueConsumers.map(async (q) => q.stop()));
199203
this.heartbeat.stop();
200204
this.runNotificationsSocket?.disconnect();
201205
}

0 commit comments

Comments
 (0)