Skip to content

Commit 9126b3e

Browse files
authored
refactor(NODE-5778): change kConnection to connection, split polling and streaming functions, define exhaustCommand (#3942)
1 parent 80999b5 commit 9126b3e

File tree

3 files changed

+79
-52
lines changed

3 files changed

+79
-52
lines changed

src/cmap/commands.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,9 @@ export class OpQueryResponse {
317317
bsonRegExp?: boolean;
318318
index?: number;
319319

320+
/** moreToCome is an OP_MSG only concept */
321+
moreToCome = false;
322+
320323
constructor(
321324
message: Buffer,
322325
msgHeader: MessageHeader,
@@ -598,6 +601,7 @@ export class OpMsgResponse {
598601
fromCompressed?: boolean;
599602
responseFlags: number;
600603
checksumPresent: boolean;
604+
/** Indicates the server will be sending more responses on this connection */
601605
moreToCome: boolean;
602606
exhaustAllowed: boolean;
603607
useBigInt64: boolean;

src/cmap/connection.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,15 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
562562
callback(err);
563563
}
564564
}
565+
566+
exhaustCommand(
567+
ns: MongoDBNamespace,
568+
command: Document,
569+
options: CommandOptions | undefined,
570+
replyListener: Callback
571+
) {
572+
return this.command(ns, command, options, replyListener);
573+
}
565574
}
566575

567576
/** @internal */
@@ -1156,6 +1165,15 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
11561165

11571166
return document;
11581167
}
1168+
1169+
exhaustCommand(
1170+
_ns: MongoDBNamespace,
1171+
_command: Document,
1172+
_options: CommandOptions,
1173+
_replyListener: Callback
1174+
) {
1175+
throw new Error('NODE-5742: not implemented.');
1176+
}
11591177
}
11601178

11611179
const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;
@@ -1253,7 +1271,7 @@ export async function* readMany(
12531271
const response = await decompressResponse(message);
12541272
yield response;
12551273

1256-
if (!('moreToCome' in response) || !response.moreToCome) {
1274+
if (!response.moreToCome) {
12571275
return;
12581276
}
12591277
}

src/sdam/monitor.ts

Lines changed: 56 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ const kServer = Symbol('server');
2323
/** @internal */
2424
const kMonitorId = Symbol('monitorId');
2525
/** @internal */
26-
const kConnection = Symbol('connection');
27-
/** @internal */
2826
const kCancellationToken = Symbol('cancellationToken');
2927
/** @internal */
3028
const kRoundTripTime = Symbol('roundTripTime');
@@ -94,21 +92,17 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
9492
connectOptions: ConnectionOptions;
9593
isRunningInFaasEnv: boolean;
9694
[kServer]: Server;
97-
[kConnection]?: Connection;
95+
connection: Connection | null;
9896
[kCancellationToken]: CancellationToken;
9997
/** @internal */
10098
[kMonitorId]?: MonitorInterval;
10199
rttPinger?: RTTPinger;
102100

103-
get connection(): Connection | undefined {
104-
return this[kConnection];
105-
}
106-
107101
constructor(server: Server, options: MonitorOptions) {
108102
super();
109103

110104
this[kServer] = server;
111-
this[kConnection] = undefined;
105+
this.connection = null;
112106
this[kCancellationToken] = new CancellationToken();
113107
this[kCancellationToken].setMaxListeners(Infinity);
114108
this[kMonitorId] = undefined;
@@ -219,8 +213,8 @@ function resetMonitorState(monitor: Monitor) {
219213

220214
monitor[kCancellationToken].emit('cancel');
221215

222-
monitor[kConnection]?.destroy({ force: true });
223-
monitor[kConnection] = undefined;
216+
monitor.connection?.destroy({ force: true });
217+
monitor.connection = null;
224218
}
225219

226220
function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion | null): boolean {
@@ -241,16 +235,17 @@ function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion
241235

242236
function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
243237
let start = now();
238+
let awaited: boolean;
244239
const topologyVersion = monitor[kServer].description.topologyVersion;
245240
const isAwaitable = useStreamingProtocol(monitor, topologyVersion);
246241
monitor.emit(
247242
Server.SERVER_HEARTBEAT_STARTED,
248243
new ServerHeartbeatStartedEvent(monitor.address, isAwaitable)
249244
);
250245

251-
function failureHandler(err: Error, awaited: boolean) {
252-
monitor[kConnection]?.destroy({ force: true });
253-
monitor[kConnection] = undefined;
246+
function onHeartbeatFailed(err: Error) {
247+
monitor.connection?.destroy({ force: true });
248+
monitor.connection = null;
254249

255250
monitor.emit(
256251
Server.SERVER_HEARTBEAT_FAILED,
@@ -269,7 +264,39 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
269264
callback(err);
270265
}
271266

272-
const connection = monitor[kConnection];
267+
function onHeartbeatSucceeded(hello: Document) {
268+
if (!('isWritablePrimary' in hello)) {
269+
// Provide hello-style response document.
270+
hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
271+
}
272+
273+
const duration =
274+
isAwaitable && monitor.rttPinger
275+
? monitor.rttPinger.roundTripTime
276+
: calculateDurationInMs(start);
277+
278+
monitor.emit(
279+
Server.SERVER_HEARTBEAT_SUCCEEDED,
280+
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable)
281+
);
282+
283+
// If we are using the streaming protocol then we immediately issue another 'started'
284+
// event, otherwise the "check" is complete and return to the main monitor loop.
285+
if (isAwaitable) {
286+
monitor.emit(
287+
Server.SERVER_HEARTBEAT_STARTED,
288+
new ServerHeartbeatStartedEvent(monitor.address, true)
289+
);
290+
start = now();
291+
} else {
292+
monitor.rttPinger?.close();
293+
monitor.rttPinger = undefined;
294+
295+
callback(undefined, hello);
296+
}
297+
}
298+
299+
const { connection } = monitor;
273300
if (connection && !connection.closed) {
274301
const { serverApi, helloOk } = connection;
275302
const connectTimeoutMS = monitor.options.connectTimeoutMS;
@@ -299,51 +326,29 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
299326
);
300327
}
301328

302-
connection.command(ns('admin.$cmd'), cmd, options, (err, hello) => {
303-
if (err) {
304-
return failureHandler(err, isAwaitable);
305-
}
306-
307-
if (!('isWritablePrimary' in hello)) {
308-
// Provide hello-style response document.
309-
hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
310-
}
311-
312-
const duration =
313-
isAwaitable && monitor.rttPinger
314-
? monitor.rttPinger.roundTripTime
315-
: calculateDurationInMs(start);
316-
317-
monitor.emit(
318-
Server.SERVER_HEARTBEAT_SUCCEEDED,
319-
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable)
320-
);
329+
if (isAwaitable) {
330+
awaited = true;
331+
return connection.exhaustCommand(ns('admin.$cmd'), cmd, options, (error, hello) => {
332+
if (error) return onHeartbeatFailed(error);
333+
return onHeartbeatSucceeded(hello);
334+
});
335+
}
321336

322-
// If we are using the streaming protocol then we immediately issue another 'started'
323-
// event, otherwise the "check" is complete and return to the main monitor loop.
324-
if (isAwaitable) {
325-
monitor.emit(
326-
Server.SERVER_HEARTBEAT_STARTED,
327-
new ServerHeartbeatStartedEvent(monitor.address, true)
328-
);
329-
start = now();
330-
} else {
331-
monitor.rttPinger?.close();
332-
monitor.rttPinger = undefined;
333-
334-
callback(undefined, hello);
335-
}
336-
});
337+
awaited = false;
338+
connection
339+
.commandAsync(ns('admin.$cmd'), cmd, options)
340+
.then(onHeartbeatSucceeded, onHeartbeatFailed);
337341

338342
return;
339343
}
340344

341345
// connecting does an implicit `hello`
342346
connect(monitor.connectOptions, (err, conn) => {
343347
if (err) {
344-
monitor[kConnection] = undefined;
348+
monitor.connection = null;
345349

346-
failureHandler(err, false);
350+
awaited = false;
351+
onHeartbeatFailed(err);
347352
return;
348353
}
349354

@@ -357,7 +362,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
357362
return;
358363
}
359364

360-
monitor[kConnection] = conn;
365+
monitor.connection = conn;
361366
monitor.emit(
362367
Server.SERVER_HEARTBEAT_SUCCEEDED,
363368
new ServerHeartbeatSucceededEvent(

0 commit comments

Comments
 (0)