Skip to content

Commit 70a2ef9

Browse files
nbbeekendurran
andauthored
refactor(NODE-5742): add streaming responses support (#3944)
Co-authored-by: Durran Jordan <[email protected]>
1 parent b13144a commit 70a2ef9

File tree

6 files changed

+175
-165
lines changed

6 files changed

+175
-165
lines changed

src/cmap/connection.ts

Lines changed: 114 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -170,11 +170,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
170170
lastHelloMS?: number;
171171
serverApi?: ServerApi;
172172
helloOk?: boolean;
173-
commandAsync: (
174-
ns: MongoDBNamespace,
175-
cmd: Document,
176-
options: CommandOptions | undefined
177-
) => Promise<Document>;
178173
/** @internal */
179174
authContext?: AuthContext;
180175

@@ -217,15 +212,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
217212
constructor(stream: Stream, options: ConnectionOptions) {
218213
super();
219214

220-
this.commandAsync = promisify(
221-
(
222-
ns: MongoDBNamespace,
223-
cmd: Document,
224-
options: CommandOptions | undefined,
225-
callback: Callback
226-
) => this.command(ns, cmd, options, callback as any)
227-
);
228-
229215
this.id = options.id;
230216
this.address = streamIdentifier(stream, options);
231217
this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
@@ -262,6 +248,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
262248
this[kMessageStream].pipe(this[kStream]);
263249
}
264250

251+
// This whole class is temporary,
252+
// Need to move this to be defined on the prototype for spying.
253+
async commandAsync(ns: MongoDBNamespace, cmd: Document, opt?: CommandOptions) {
254+
return promisify(this.command.bind(this))(ns, cmd, opt);
255+
}
256+
265257
get description(): StreamDescription {
266258
return this[kDescription];
267259
}
@@ -791,7 +783,6 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
791783
lastHelloMS?: number;
792784
serverApi?: ServerApi;
793785
helloOk?: boolean;
794-
commandAsync: ModernConnection['command'];
795786
/** @internal */
796787
authContext?: AuthContext;
797788

@@ -831,8 +822,6 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
831822
constructor(stream: Stream, options: ConnectionOptions) {
832823
super();
833824

834-
this.commandAsync = this.command.bind(this);
835-
836825
this.id = options.id;
837826
this.address = streamIdentifier(stream, options);
838827
this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
@@ -852,6 +841,10 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
852841
this.socket.on('timeout', this.onTimeout.bind(this));
853842
}
854843

844+
async commandAsync(...args: Parameters<typeof this.command>) {
845+
return this.command(...args);
846+
}
847+
855848
/** Indicates that the connection (including underlying TCP socket) has been closed. */
856849
get closed(): boolean {
857850
return this.controller.signal.aborted;
@@ -1036,62 +1029,68 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
10361029
return message;
10371030
}
10381031

1039-
private async sendCommand(
1040-
message: WriteProtocolMessageType,
1041-
options: CommandOptions
1042-
): Promise<Document> {
1043-
const { signal } = this.controller;
1044-
1045-
signal.throwIfAborted();
1032+
private async *sendWire(message: WriteProtocolMessageType, options: CommandOptions) {
1033+
this.controller.signal.throwIfAborted();
10461034

10471035
if (typeof options.socketTimeoutMS === 'number') {
10481036
this.socket.setTimeout(options.socketTimeoutMS);
10491037
} else if (this.socketTimeoutMS !== 0) {
10501038
this.socket.setTimeout(this.socketTimeoutMS);
10511039
}
10521040

1053-
let response;
10541041
try {
10551042
await writeCommand(this, message, {
10561043
agreedCompressor: this.description.compressor ?? 'none',
10571044
zlibCompressionLevel: this.description.zlibCompressionLevel,
1058-
signal
1045+
signal: this.controller.signal
10591046
});
10601047

1061-
if (options.noResponse) return { ok: 1 };
1048+
// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
1049+
this.controller = new AbortController();
10621050

1063-
signal.throwIfAborted();
1051+
if (options.noResponse) {
1052+
yield { ok: 1 };
1053+
return;
1054+
}
10641055

1065-
response = await read(this, { signal });
1066-
} finally {
1067-
// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
1068-
if (!signal.aborted) this.controller = new AbortController();
1069-
}
1056+
this.controller.signal.throwIfAborted();
10701057

1071-
response.parse(options);
1058+
for await (const response of readMany(this, { signal: this.controller.signal })) {
1059+
this.socket.setTimeout(0);
1060+
response.parse(options);
10721061

1073-
const [document] = response.documents;
1062+
const [document] = response.documents;
10741063

1075-
if (!Buffer.isBuffer(document)) {
1076-
const { session } = options;
1077-
if (session) {
1078-
updateSessionFromResponse(session, document);
1079-
}
1064+
if (!Buffer.isBuffer(document)) {
1065+
const { session } = options;
1066+
if (session) {
1067+
updateSessionFromResponse(session, document);
1068+
}
10801069

1081-
if (document.$clusterTime) {
1082-
this[kClusterTime] = document.$clusterTime;
1083-
this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime);
1070+
if (document.$clusterTime) {
1071+
this[kClusterTime] = document.$clusterTime;
1072+
this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime);
1073+
}
1074+
}
1075+
1076+
// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
1077+
this.controller = new AbortController();
1078+
1079+
yield document;
1080+
this.controller.signal.throwIfAborted();
1081+
1082+
if (typeof options.socketTimeoutMS === 'number') {
1083+
this.socket.setTimeout(options.socketTimeoutMS);
1084+
} else if (this.socketTimeoutMS !== 0) {
1085+
this.socket.setTimeout(this.socketTimeoutMS);
1086+
}
10841087
}
1088+
} finally {
1089+
this.socket.setTimeout(0);
10851090
}
1086-
1087-
return document;
10881091
}
10891092

1090-
async command(
1091-
ns: MongoDBNamespace,
1092-
command: Document,
1093-
options: CommandOptions = {}
1094-
): Promise<Document> {
1093+
async *sendCommand(ns: MongoDBNamespace, command: Document, options: CommandOptions = {}) {
10951094
const message = this.prepareCommand(ns.db, command, options);
10961095

10971096
let started = 0;
@@ -1103,76 +1102,84 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
11031102
);
11041103
}
11051104

1106-
let document = null;
1105+
let document;
11071106
try {
1108-
document = await this.sendCommand(message, options);
1109-
} catch (ioError) {
1110-
if (this.monitorCommands) {
1111-
this.emit(
1112-
ModernConnection.COMMAND_FAILED,
1113-
new CommandFailedEvent(this as unknown as Connection, message, ioError, started)
1114-
);
1115-
}
1116-
throw ioError;
1117-
}
1107+
this.controller.signal.throwIfAborted();
1108+
for await (document of this.sendWire(message, options)) {
1109+
if (!Buffer.isBuffer(document) && document.writeConcernError) {
1110+
throw new MongoWriteConcernError(document.writeConcernError, document);
1111+
}
11181112

1119-
if (document == null) {
1120-
const unexpected = new MongoUnexpectedServerResponseError(
1121-
'sendCommand did not throw and did not return a document'
1122-
);
1123-
if (this.monitorCommands) {
1124-
this.emit(
1125-
ModernConnection.COMMAND_FAILED,
1126-
new CommandFailedEvent(this as unknown as Connection, message, unexpected, started)
1127-
);
1128-
}
1129-
throw unexpected;
1130-
}
1113+
if (
1114+
!Buffer.isBuffer(document) &&
1115+
(document.ok === 0 || document.$err || document.errmsg || document.code)
1116+
) {
1117+
throw new MongoServerError(document);
1118+
}
11311119

1132-
if (document.writeConcernError) {
1133-
const writeConcernError = new MongoWriteConcernError(document.writeConcernError, document);
1134-
if (this.monitorCommands) {
1135-
this.emit(
1136-
ModernConnection.COMMAND_SUCCEEDED,
1137-
new CommandSucceededEvent(this as unknown as Connection, message, document, started)
1138-
);
1139-
}
1140-
throw writeConcernError;
1141-
}
1120+
if (this.monitorCommands) {
1121+
this.emit(
1122+
ModernConnection.COMMAND_SUCCEEDED,
1123+
new CommandSucceededEvent(
1124+
this as unknown as Connection,
1125+
message,
1126+
options.noResponse ? undefined : document,
1127+
started
1128+
)
1129+
);
1130+
}
11421131

1143-
if (document.ok === 0 || document.$err || document.errmsg || document.code) {
1144-
const serverError = new MongoServerError(document);
1132+
yield document;
1133+
this.controller.signal.throwIfAborted();
1134+
}
1135+
} catch (error) {
11451136
if (this.monitorCommands) {
1146-
this.emit(
1147-
ModernConnection.COMMAND_FAILED,
1148-
new CommandFailedEvent(this as unknown as Connection, message, serverError, started)
1149-
);
1137+
error.name === 'MongoWriteConcernError'
1138+
? this.emit(
1139+
ModernConnection.COMMAND_SUCCEEDED,
1140+
new CommandSucceededEvent(
1141+
this as unknown as Connection,
1142+
message,
1143+
options.noResponse ? undefined : document,
1144+
started
1145+
)
1146+
)
1147+
: this.emit(
1148+
ModernConnection.COMMAND_FAILED,
1149+
new CommandFailedEvent(this as unknown as Connection, message, error, started)
1150+
);
11501151
}
1151-
throw serverError;
1152+
throw error;
11521153
}
1154+
}
11531155

1154-
if (this.monitorCommands) {
1155-
this.emit(
1156-
ModernConnection.COMMAND_SUCCEEDED,
1157-
new CommandSucceededEvent(
1158-
this as unknown as Connection,
1159-
message,
1160-
options.noResponse ? undefined : document,
1161-
started
1162-
)
1163-
);
1156+
async command(
1157+
ns: MongoDBNamespace,
1158+
command: Document,
1159+
options: CommandOptions = {}
1160+
): Promise<Document> {
1161+
this.controller.signal.throwIfAborted();
1162+
for await (const document of this.sendCommand(ns, command, options)) {
1163+
return document;
11641164
}
1165-
1166-
return document;
1165+
throw new MongoUnexpectedServerResponseError('Unable to get response from server');
11671166
}
11681167

11691168
exhaustCommand(
1170-
_ns: MongoDBNamespace,
1171-
_command: Document,
1172-
_options: CommandOptions,
1173-
_replyListener: Callback
1169+
ns: MongoDBNamespace,
1170+
command: Document,
1171+
options: CommandOptions,
1172+
replyListener: Callback
11741173
) {
1175-
throw new Error('NODE-5742: not implemented.');
1174+
const exhaustLoop = async () => {
1175+
this.controller.signal.throwIfAborted();
1176+
for await (const reply of this.sendCommand(ns, command, options)) {
1177+
replyListener(undefined, reply);
1178+
this.controller.signal.throwIfAborted();
1179+
}
1180+
throw new MongoUnexpectedServerResponseError('Server ended moreToCome unexpectedly');
1181+
};
1182+
exhaustLoop().catch(replyListener);
11761183
}
11771184
}
11781185

src/sdam/monitor.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { clearTimeout, setTimeout } from 'timers';
22

33
import { type Document, Long } from '../bson';
44
import { connect } from '../cmap/connect';
5-
import { Connection, type ConnectionOptions } from '../cmap/connection';
5+
import type { Connection, ConnectionOptions } from '../cmap/connection';
66
import { getFAASEnv } from '../cmap/handshake/client_metadata';
77
import { LEGACY_HELLO_COMMAND } from '../constants';
88
import { MongoError, MongoErrorLabel, MongoNetworkTimeoutError } from '../error';
@@ -132,9 +132,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
132132
useBigInt64: false,
133133
promoteLongs: true,
134134
promoteValues: true,
135-
promoteBuffers: true,
136-
// TODO(NODE-5741): override monitors to use old connection
137-
connectionType: Connection
135+
promoteBuffers: true
138136
};
139137

140138
// ensure no authentication is used for monitoring

test/integration/connection-monitoring-and-pooling/rtt_pinger.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ describe('class RTTPinger', () => {
164164
const rttPingers = await getRTTPingers(client);
165165

166166
for (const rtt of rttPingers) {
167-
sinon.stub(rtt.connection, 'command').yieldsRight(new Error('any error'));
167+
sinon.stub(rtt.connection, 'commandAsync').rejects(new Error('any error'));
168168
}
169169
const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'destroy'));
170170

test/integration/mongodb-handshake/mongodb-handshake.test.ts

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import * as sinon from 'sinon';
55
import {
66
Connection,
77
LEGACY_HELLO_COMMAND,
8+
ModernConnection,
89
MongoServerError,
910
MongoServerSelectionError,
1011
OpMsgRequest,
@@ -19,20 +20,24 @@ describe('MongoDB Handshake', () => {
1920

2021
context('when hello is too large', () => {
2122
before(() => {
22-
sinon.stub(Connection.prototype, 'command').callsFake(function (ns, cmd, options, callback) {
23-
// @ts-expect-error: sinon will place wrappedMethod there
24-
const command = Connection.prototype.command.wrappedMethod.bind(this);
25-
26-
if (cmd.hello || cmd[LEGACY_HELLO_COMMAND]) {
27-
return command(
28-
ns,
29-
{ ...cmd, client: { driver: { name: 'a'.repeat(1000) } } },
30-
options,
31-
callback
32-
);
33-
}
34-
return command(ns, cmd, options, callback);
35-
});
23+
const connectionType =
24+
process.env.MONGODB_NEW_CONNECTION === 'true' ? ModernConnection : Connection;
25+
26+
sinon
27+
.stub(connectionType.prototype, 'commandAsync')
28+
.callsFake(async function (ns, cmd, options) {
29+
// @ts-expect-error: sinon will place wrappedMethod there
30+
const commandAsync = connectionType.prototype.commandAsync.wrappedMethod.bind(this);
31+
32+
if (cmd.hello || cmd[LEGACY_HELLO_COMMAND]) {
33+
return commandAsync(
34+
ns,
35+
{ ...cmd, client: { driver: { name: 'a'.repeat(1000) } } },
36+
options
37+
);
38+
}
39+
return commandAsync(ns, cmd, options);
40+
});
3641
});
3742

3843
after(() => sinon.restore());
@@ -53,7 +58,9 @@ describe('MongoDB Handshake', () => {
5358
let spy: Sinon.SinonSpy;
5459

5560
before(() => {
56-
spy = sinon.spy(Connection.prototype, 'command');
61+
const connectionType =
62+
process.env.MONGODB_NEW_CONNECTION === 'true' ? ModernConnection : Connection;
63+
spy = sinon.spy(connectionType.prototype, 'commandAsync');
5764
});
5865

5966
after(() => sinon.restore());

0 commit comments

Comments
 (0)