Skip to content

refactor(NODE-5778): change kConnection to connection, split polling and streaming functions, define exhaustCommand #3942

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ export class OpQueryResponse {
bsonRegExp?: boolean;
index?: number;

/** moreToCome is an OP_MSG only concept */
moreToCome = false;

constructor(
message: Buffer,
msgHeader: MessageHeader,
Expand Down Expand Up @@ -598,6 +601,7 @@ export class OpMsgResponse {
fromCompressed?: boolean;
responseFlags: number;
checksumPresent: boolean;
/** Indicates the server will be sending more responses on this connection */
moreToCome: boolean;
exhaustAllowed: boolean;
useBigInt64: boolean;
Expand Down
20 changes: 19 additions & 1 deletion src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,15 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
callback(err);
}
}

exhaustCommand(
ns: MongoDBNamespace,
command: Document,
options: CommandOptions | undefined,
replyListener: Callback
) {
return this.command(ns, command, options, replyListener);
}
}

/** @internal */
Expand Down Expand Up @@ -1156,6 +1165,15 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {

return document;
}

exhaustCommand(
_ns: MongoDBNamespace,
_command: Document,
_options: CommandOptions,
_replyListener: Callback
) {
throw new Error('NODE-5742: not implemented.');
}
}

const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;
Expand Down Expand Up @@ -1253,7 +1271,7 @@ export async function* readMany(
const response = await decompressResponse(message);
yield response;

if (!('moreToCome' in response) || !response.moreToCome) {
if (!response.moreToCome) {
return;
}
}
Expand Down
107 changes: 56 additions & 51 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ const kServer = Symbol('server');
/** @internal */
const kMonitorId = Symbol('monitorId');
/** @internal */
const kConnection = Symbol('connection');
/** @internal */
const kCancellationToken = Symbol('cancellationToken');
/** @internal */
const kRoundTripTime = Symbol('roundTripTime');
Expand Down Expand Up @@ -94,21 +92,17 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
connectOptions: ConnectionOptions;
isRunningInFaasEnv: boolean;
[kServer]: Server;
[kConnection]?: Connection;
connection: Connection | null;
[kCancellationToken]: CancellationToken;
/** @internal */
[kMonitorId]?: MonitorInterval;
rttPinger?: RTTPinger;

get connection(): Connection | undefined {
return this[kConnection];
}

constructor(server: Server, options: MonitorOptions) {
super();

this[kServer] = server;
this[kConnection] = undefined;
this.connection = null;
this[kCancellationToken] = new CancellationToken();
this[kCancellationToken].setMaxListeners(Infinity);
this[kMonitorId] = undefined;
Expand Down Expand Up @@ -219,8 +213,8 @@ function resetMonitorState(monitor: Monitor) {

monitor[kCancellationToken].emit('cancel');

monitor[kConnection]?.destroy({ force: true });
monitor[kConnection] = undefined;
monitor.connection?.destroy({ force: true });
monitor.connection = null;
}

function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion | null): boolean {
Expand All @@ -241,16 +235,17 @@ function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion

function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
let start = now();
let awaited: boolean;
const topologyVersion = monitor[kServer].description.topologyVersion;
const isAwaitable = useStreamingProtocol(monitor, topologyVersion);
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address, isAwaitable)
);

function failureHandler(err: Error, awaited: boolean) {
monitor[kConnection]?.destroy({ force: true });
monitor[kConnection] = undefined;
function onHeartbeatFailed(err: Error) {
monitor.connection?.destroy({ force: true });
monitor.connection = null;

monitor.emit(
Server.SERVER_HEARTBEAT_FAILED,
Expand All @@ -269,7 +264,39 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
callback(err);
}

const connection = monitor[kConnection];
function onHeartbeatSucceeded(hello: Document) {
if (!('isWritablePrimary' in hello)) {
// Provide hello-style response document.
hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
}

const duration =
isAwaitable && monitor.rttPinger
? monitor.rttPinger.roundTripTime
: calculateDurationInMs(start);

monitor.emit(
Server.SERVER_HEARTBEAT_SUCCEEDED,
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable)
);

// If we are using the streaming protocol then we immediately issue another 'started'
// event, otherwise the "check" is complete and return to the main monitor loop.
if (isAwaitable) {
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address, true)
);
start = now();
} else {
monitor.rttPinger?.close();
monitor.rttPinger = undefined;

callback(undefined, hello);
}
}

const { connection } = monitor;
if (connection && !connection.closed) {
const { serverApi, helloOk } = connection;
const connectTimeoutMS = monitor.options.connectTimeoutMS;
Expand Down Expand Up @@ -299,51 +326,29 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
);
}

connection.command(ns('admin.$cmd'), cmd, options, (err, hello) => {
if (err) {
return failureHandler(err, isAwaitable);
}

if (!('isWritablePrimary' in hello)) {
// Provide hello-style response document.
hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
}

const duration =
isAwaitable && monitor.rttPinger
? monitor.rttPinger.roundTripTime
: calculateDurationInMs(start);

monitor.emit(
Server.SERVER_HEARTBEAT_SUCCEEDED,
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable)
);
if (isAwaitable) {
awaited = true;
return connection.exhaustCommand(ns('admin.$cmd'), cmd, options, (error, hello) => {
if (error) return onHeartbeatFailed(error);
return onHeartbeatSucceeded(hello);
});
}

// If we are using the streaming protocol then we immediately issue another 'started'
// event, otherwise the "check" is complete and return to the main monitor loop.
if (isAwaitable) {
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address, true)
);
start = now();
} else {
monitor.rttPinger?.close();
monitor.rttPinger = undefined;

callback(undefined, hello);
}
});
awaited = false;
connection
.commandAsync(ns('admin.$cmd'), cmd, options)
.then(onHeartbeatSucceeded, onHeartbeatFailed);

return;
}

// connecting does an implicit `hello`
connect(monitor.connectOptions, (err, conn) => {
if (err) {
monitor[kConnection] = undefined;
monitor.connection = null;

failureHandler(err, false);
awaited = false;
onHeartbeatFailed(err);
return;
}

Expand All @@ -357,7 +362,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
return;
}

monitor[kConnection] = conn;
monitor.connection = conn;
monitor.emit(
Server.SERVER_HEARTBEAT_SUCCEEDED,
new ServerHeartbeatSucceededEvent(
Expand Down