Skip to content

Commit 8ee6b49

Browse files
committed
add more granular service control to supervisor session
1 parent 4e27ced commit 8ee6b49

File tree

1 file changed

+36
-15
lines changed
  • packages/core/src/v3/runEngineWorker/supervisor

1 file changed

+36
-15
lines changed

packages/core/src/v3/runEngineWorker/supervisor/session.ts

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import { getDefaultWorkerHeaders } from "./util.js";
1111
import { HeartbeatService } from "../../utils/heartbeat.js";
1212

1313
type SupervisorSessionOptions = SupervisorClientCommonOptions & {
14+
queueConsumerEnabled?: boolean;
15+
runNotificationsEnabled?: boolean;
1416
heartbeatIntervalSeconds?: number;
1517
dequeueIntervalMs?: number;
1618
preDequeue?: PreDequeueFn;
@@ -20,15 +22,21 @@ type SupervisorSessionOptions = SupervisorClientCommonOptions & {
2022
export class SupervisorSession extends EventEmitter<WorkerEvents> {
2123
public readonly httpClient: SupervisorHttpClient;
2224

23-
private socket?: Socket<WorkerServerToClientEvents, WorkerClientToServerEvents>;
25+
private readonly runNotificationsEnabled: boolean;
26+
private runNotificationsSocket?: Socket<WorkerServerToClientEvents, WorkerClientToServerEvents>;
2427

28+
private readonly queueConsumerEnabled: boolean;
2529
private readonly queueConsumer: RunQueueConsumer;
30+
2631
private readonly heartbeatService: HeartbeatService;
2732
private readonly heartbeatIntervalSeconds: number;
2833

2934
constructor(private opts: SupervisorSessionOptions) {
3035
super();
3136

37+
this.runNotificationsEnabled = opts.runNotificationsEnabled ?? true;
38+
this.queueConsumerEnabled = opts.queueConsumerEnabled ?? true;
39+
3240
this.httpClient = new SupervisorHttpClient(opts);
3341
this.queueConsumer = new RunQueueConsumer({
3442
client: this.httpClient,
@@ -76,12 +84,12 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
7684
subscribeToRunNotifications(runFriendlyIds: string[]) {
7785
console.log("[SupervisorSession] Subscribing to run notifications", { runFriendlyIds });
7886

79-
if (!this.socket) {
87+
if (!this.runNotificationsSocket) {
8088
console.error("[SupervisorSession] Socket not connected");
8189
return;
8290
}
8391

84-
this.socket.emit("run:subscribe", { version: "1", runFriendlyIds });
92+
this.runNotificationsSocket.emit("run:subscribe", { version: "1", runFriendlyIds });
8593

8694
Promise.allSettled(
8795
runFriendlyIds.map((runFriendlyId) =>
@@ -96,12 +104,12 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
96104
unsubscribeFromRunNotifications(runFriendlyIds: string[]) {
97105
console.log("[SupervisorSession] Unsubscribing from run notifications", { runFriendlyIds });
98106

99-
if (!this.socket) {
107+
if (!this.runNotificationsSocket) {
100108
console.error("[SupervisorSession] Socket not connected");
101109
return;
102110
}
103111

104-
this.socket.emit("run:unsubscribe", { version: "1", runFriendlyIds });
112+
this.runNotificationsSocket.emit("run:unsubscribe", { version: "1", runFriendlyIds });
105113

106114
Promise.allSettled(
107115
runFriendlyIds.map((runFriendlyId) =>
@@ -116,15 +124,15 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
116124
);
117125
}
118126

119-
private createSocket() {
127+
private createRunNotificationsSocket() {
120128
const wsUrl = new URL(this.opts.apiUrl);
121129
wsUrl.pathname = "/worker";
122130

123-
this.socket = io(wsUrl.href, {
131+
const socket = io(wsUrl.href, {
124132
transports: ["websocket"],
125133
extraHeaders: getDefaultWorkerHeaders(this.opts),
126134
});
127-
this.socket.on("run:notify", ({ version, run }) => {
135+
socket.on("run:notify", ({ version, run }) => {
128136
console.log("[SupervisorSession][WS] Received run notification", { version, run });
129137
this.emit("runNotification", { time: new Date(), run });
130138

@@ -137,15 +145,17 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
137145
console.error("[SupervisorSession] Failed to send debug log", { error });
138146
});
139147
});
140-
this.socket.on("connect", () => {
148+
socket.on("connect", () => {
141149
console.log("[SupervisorSession][WS] Connected to platform");
142150
});
143-
this.socket.on("connect_error", (error) => {
151+
socket.on("connect_error", (error) => {
144152
console.error("[SupervisorSession][WS] Connection error", { error });
145153
});
146-
this.socket.on("disconnect", (reason, description) => {
154+
socket.on("disconnect", (reason, description) => {
147155
console.log("[SupervisorSession][WS] Disconnected from platform", { reason, description });
148156
});
157+
158+
return socket;
149159
}
150160

151161
async start() {
@@ -167,14 +177,25 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
167177
name: workerGroup.name,
168178
});
169179

170-
this.queueConsumer.start();
171-
this.heartbeatService.start();
172-
this.createSocket();
180+
if (this.queueConsumerEnabled) {
181+
console.log("[SupervisorSession] Queue consumer enabled");
182+
this.queueConsumer.start();
183+
this.heartbeatService.start();
184+
} else {
185+
console.warn("[SupervisorSession] Queue consumer disabled");
186+
}
187+
188+
if (this.runNotificationsEnabled) {
189+
console.log("[SupervisorSession] Run notifications enabled");
190+
this.runNotificationsSocket = this.createRunNotificationsSocket();
191+
} else {
192+
console.warn("[SupervisorSession] Run notifications disabled");
193+
}
173194
}
174195

175196
async stop() {
176197
this.heartbeatService.stop();
177-
this.socket?.disconnect();
198+
this.runNotificationsSocket?.disconnect();
178199
}
179200

180201
private getHeartbeatBody(): WorkerApiHeartbeatRequestBody {

0 commit comments

Comments
 (0)