Skip to content

fix(NODE-6370) response messages to large commands can be lost under load #4245

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -434,12 +434,14 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}

try {
await this.writeCommand(message, {
const drain = await this.writeCommand(message, {
agreedCompressor: this.description.compressor ?? 'none',
zlibCompressionLevel: this.description.zlibCompressionLevel
});

if (options.noResponse || message.moreToCome) {
// if writing multiple messages, make sure we drain
if (drain) await once(this.socket, 'drain');
yield MongoDBResponse.empty;
return;
}
Expand Down Expand Up @@ -618,13 +620,15 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
/**
* @internal
*
* Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method
* waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired).
* Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command.
* This method does not wait until the socket's buffer has emptied but returns true if the
* caller should. Awaiting the `drain` event can result in losts received messages, because
* the 'data' event is not yet handled.
*/
private async writeCommand(
command: WriteProtocolMessageType,
options: { agreedCompressor?: CompressorName; zlibCompressionLevel?: number }
): Promise<void> {
): Promise<boolean> {
const finalCommand =
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
? command
Expand All @@ -635,8 +639,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

const buffer = Buffer.concat(await finalCommand.toBin());

if (this.socket.write(buffer)) return;
return await once(this.socket, 'drain');
return !this.socket.write(buffer);
}

/**
Expand Down