Skip to content

refactor(NODE-5742): add streaming responses support #3944

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Dec 13, 2023
221 changes: 114 additions & 107 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
lastHelloMS?: number;
serverApi?: ServerApi;
helloOk?: boolean;
commandAsync: (
ns: MongoDBNamespace,
cmd: Document,
options: CommandOptions | undefined
) => Promise<Document>;
/** @internal */
authContext?: AuthContext;

Expand Down Expand Up @@ -217,15 +212,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
constructor(stream: Stream, options: ConnectionOptions) {
super();

this.commandAsync = promisify(
(
ns: MongoDBNamespace,
cmd: Document,
options: CommandOptions | undefined,
callback: Callback
) => this.command(ns, cmd, options, callback as any)
);

this.id = options.id;
this.address = streamIdentifier(stream, options);
this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
Expand Down Expand Up @@ -262,6 +248,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this[kMessageStream].pipe(this[kStream]);
}

// This whole class is temporary,
// Need to move this to be defined on the prototype for spying.
async commandAsync(ns: MongoDBNamespace, cmd: Document, opt?: CommandOptions) {
return promisify(this.command.bind(this))(ns, cmd, opt);
}

get description(): StreamDescription {
return this[kDescription];
}
Expand Down Expand Up @@ -791,7 +783,6 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
lastHelloMS?: number;
serverApi?: ServerApi;
helloOk?: boolean;
commandAsync: ModernConnection['command'];
/** @internal */
authContext?: AuthContext;

Expand Down Expand Up @@ -831,8 +822,6 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
constructor(stream: Stream, options: ConnectionOptions) {
super();

this.commandAsync = this.command.bind(this);

this.id = options.id;
this.address = streamIdentifier(stream, options);
this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
Expand All @@ -852,6 +841,10 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
this.socket.on('timeout', this.onTimeout.bind(this));
}

async commandAsync(...args: Parameters<typeof this.command>) {
return this.command(...args);
}

/** Indicates that the connection (including underlying TCP socket) has been closed. */
get closed(): boolean {
return this.controller.signal.aborted;
Expand Down Expand Up @@ -1036,62 +1029,68 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
return message;
}

private async sendCommand(
message: WriteProtocolMessageType,
options: CommandOptions
): Promise<Document> {
const { signal } = this.controller;

signal.throwIfAborted();
private async *sendWire(message: WriteProtocolMessageType, options: CommandOptions) {
this.controller.signal.throwIfAborted();

if (typeof options.socketTimeoutMS === 'number') {
this.socket.setTimeout(options.socketTimeoutMS);
} else if (this.socketTimeoutMS !== 0) {
this.socket.setTimeout(this.socketTimeoutMS);
}

let response;
try {
await writeCommand(this, message, {
agreedCompressor: this.description.compressor ?? 'none',
zlibCompressionLevel: this.description.zlibCompressionLevel,
signal
signal: this.controller.signal
});

if (options.noResponse) return { ok: 1 };
// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
this.controller = new AbortController();

signal.throwIfAborted();
if (options.noResponse) {
yield { ok: 1 };
return;
}

response = await read(this, { signal });
} finally {
// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
if (!signal.aborted) this.controller = new AbortController();
}
this.controller.signal.throwIfAborted();

response.parse(options);
for await (const response of readMany(this, { signal: this.controller.signal })) {
this.socket.setTimeout(0);
response.parse(options);

const [document] = response.documents;
const [document] = response.documents;

if (!Buffer.isBuffer(document)) {
const { session } = options;
if (session) {
updateSessionFromResponse(session, document);
}
if (!Buffer.isBuffer(document)) {
const { session } = options;
if (session) {
updateSessionFromResponse(session, document);
}

if (document.$clusterTime) {
this[kClusterTime] = document.$clusterTime;
this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime);
if (document.$clusterTime) {
this[kClusterTime] = document.$clusterTime;
this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime);
}
}

// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
this.controller = new AbortController();

yield document;
this.controller.signal.throwIfAborted();

if (typeof options.socketTimeoutMS === 'number') {
this.socket.setTimeout(options.socketTimeoutMS);
} else if (this.socketTimeoutMS !== 0) {
this.socket.setTimeout(this.socketTimeoutMS);
}
}
} finally {
this.socket.setTimeout(0);
}

return document;
}

async command(
ns: MongoDBNamespace,
command: Document,
options: CommandOptions = {}
): Promise<Document> {
async *sendCommand(ns: MongoDBNamespace, command: Document, options: CommandOptions = {}) {
const message = this.prepareCommand(ns.db, command, options);

let started = 0;
Expand All @@ -1103,76 +1102,84 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
);
}

let document = null;
let document;
try {
document = await this.sendCommand(message, options);
} catch (ioError) {
if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_FAILED,
new CommandFailedEvent(this as unknown as Connection, message, ioError, started)
);
}
throw ioError;
}
this.controller.signal.throwIfAborted();
for await (document of this.sendWire(message, options)) {
if (!Buffer.isBuffer(document) && document.writeConcernError) {
throw new MongoWriteConcernError(document.writeConcernError, document);
}

if (document == null) {
const unexpected = new MongoUnexpectedServerResponseError(
'sendCommand did not throw and did not return a document'
);
if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_FAILED,
new CommandFailedEvent(this as unknown as Connection, message, unexpected, started)
);
}
throw unexpected;
}
if (
!Buffer.isBuffer(document) &&
(document.ok === 0 || document.$err || document.errmsg || document.code)
) {
throw new MongoServerError(document);
}

if (document.writeConcernError) {
const writeConcernError = new MongoWriteConcernError(document.writeConcernError, document);
if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_SUCCEEDED,
new CommandSucceededEvent(this as unknown as Connection, message, document, started)
);
}
throw writeConcernError;
}
if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_SUCCEEDED,
new CommandSucceededEvent(
this as unknown as Connection,
message,
options.noResponse ? undefined : document,
started
)
);
}

if (document.ok === 0 || document.$err || document.errmsg || document.code) {
const serverError = new MongoServerError(document);
yield document;
this.controller.signal.throwIfAborted();
}
} catch (error) {
if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_FAILED,
new CommandFailedEvent(this as unknown as Connection, message, serverError, started)
);
error.name === 'MongoWriteConcernError'
? this.emit(
ModernConnection.COMMAND_SUCCEEDED,
new CommandSucceededEvent(
this as unknown as Connection,
message,
options.noResponse ? undefined : document,
started
)
)
: this.emit(
ModernConnection.COMMAND_FAILED,
new CommandFailedEvent(this as unknown as Connection, message, error, started)
);
}
throw serverError;
throw error;
}
}

if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_SUCCEEDED,
new CommandSucceededEvent(
this as unknown as Connection,
message,
options.noResponse ? undefined : document,
started
)
);
async command(
ns: MongoDBNamespace,
command: Document,
options: CommandOptions = {}
): Promise<Document> {
this.controller.signal.throwIfAborted();
for await (const document of this.sendCommand(ns, command, options)) {
return document;
}

return document;
throw new MongoUnexpectedServerResponseError('Unable to get response from server');
}

exhaustCommand(
_ns: MongoDBNamespace,
_command: Document,
_options: CommandOptions,
_replyListener: Callback
ns: MongoDBNamespace,
command: Document,
options: CommandOptions,
replyListener: Callback
) {
throw new Error('NODE-5742: not implemented.');
const exhaustLoop = async () => {
this.controller.signal.throwIfAborted();
for await (const reply of this.sendCommand(ns, command, options)) {
replyListener(undefined, reply);
this.controller.signal.throwIfAborted();
}
throw new MongoUnexpectedServerResponseError('Server ended moreToCome unexpectedly');
};
exhaustLoop().catch(replyListener);
}
}

Expand Down
6 changes: 2 additions & 4 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { clearTimeout, setTimeout } from 'timers';

import { type Document, Long } from '../bson';
import { connect } from '../cmap/connect';
import { Connection, type ConnectionOptions } from '../cmap/connection';
import type { Connection, ConnectionOptions } from '../cmap/connection';
import { getFAASEnv } from '../cmap/handshake/client_metadata';
import { LEGACY_HELLO_COMMAND } from '../constants';
import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error';
Expand Down Expand Up @@ -132,9 +132,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
useBigInt64: false,
promoteLongs: true,
promoteValues: true,
promoteBuffers: true,
// TODO(NODE-5741): override monitors to use old connection
connectionType: Connection
promoteBuffers: true
};

// ensure no authentication is used for monitoring
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ describe('class RTTPinger', () => {
const rttPingers = await getRTTPingers(client);

for (const rtt of rttPingers) {
sinon.stub(rtt.connection, 'command').yieldsRight(new Error('any error'));
sinon.stub(rtt.connection, 'commandAsync').rejects(new Error('any error'));
}
const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'destroy'));

Expand Down
37 changes: 22 additions & 15 deletions test/integration/mongodb-handshake/mongodb-handshake.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import * as sinon from 'sinon';
import {
Connection,
LEGACY_HELLO_COMMAND,
ModernConnection,
MongoServerError,
MongoServerSelectionError,
OpMsgRequest,
Expand All @@ -19,20 +20,24 @@ describe('MongoDB Handshake', () => {

context('when hello is too large', () => {
before(() => {
sinon.stub(Connection.prototype, 'command').callsFake(function (ns, cmd, options, callback) {
// @ts-expect-error: sinon will place wrappedMethod there
const command = Connection.prototype.command.wrappedMethod.bind(this);

if (cmd.hello || cmd[LEGACY_HELLO_COMMAND]) {
return command(
ns,
{ ...cmd, client: { driver: { name: 'a'.repeat(1000) } } },
options,
callback
);
}
return command(ns, cmd, options, callback);
});
const connectionType =
process.env.MONGODB_NEW_CONNECTION === 'true' ? ModernConnection : Connection;

sinon
.stub(connectionType.prototype, 'commandAsync')
.callsFake(async function (ns, cmd, options) {
// @ts-expect-error: sinon will place wrappedMethod there
const commandAsync = connectionType.prototype.commandAsync.wrappedMethod.bind(this);

if (cmd.hello || cmd[LEGACY_HELLO_COMMAND]) {
return commandAsync(
ns,
{ ...cmd, client: { driver: { name: 'a'.repeat(1000) } } },
options
);
}
return commandAsync(ns, cmd, options);
});
});

after(() => sinon.restore());
Expand All @@ -53,7 +58,9 @@ describe('MongoDB Handshake', () => {
let spy: Sinon.SinonSpy;

before(() => {
spy = sinon.spy(Connection.prototype, 'command');
const connectionType =
process.env.MONGODB_NEW_CONNECTION === 'true' ? ModernConnection : Connection;
spy = sinon.spy(connectionType.prototype, 'commandAsync');
});

after(() => sinon.restore());
Expand Down
Loading