Skip to content

Commit b602162

Browse files
refactor(NODE-5696): add async-iterator based socket helpers (#3896)
Co-authored-by: Neal Beeken <[email protected]>
1 parent 89cb092 commit b602162

13 files changed

+1292
-95
lines changed

src/cmap/command_monitoring_events.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
LEGACY_HELLO_COMMAND_CAMEL_CASE
88
} from '../constants';
99
import { calculateDurationInMs, deepCopy } from '../utils';
10-
import { Msg, type Query, type WriteProtocolMessageType } from './commands';
10+
import { OpMsgRequest, type OpQueryRequest, type WriteProtocolMessageType } from './commands';
1111
import type { Connection } from './connection';
1212

1313
/**
@@ -181,8 +181,8 @@ const HELLO_COMMANDS = new Set(['hello', LEGACY_HELLO_COMMAND, LEGACY_HELLO_COMM
181181

182182
// helper methods
183183
const extractCommandName = (commandDoc: Document) => Object.keys(commandDoc)[0];
184-
const namespace = (command: Query) => command.ns;
185-
const collectionName = (command: Query) => command.ns.split('.')[1];
184+
const namespace = (command: OpQueryRequest) => command.ns;
185+
const collectionName = (command: OpQueryRequest) => command.ns.split('.')[1];
186186
const maybeRedact = (commandName: string, commandDoc: Document, result: Error | Document) =>
187187
SENSITIVE_COMMANDS.has(commandName) ||
188188
(HELLO_COMMANDS.has(commandName) && commandDoc.speculativeAuthenticate)
@@ -220,7 +220,7 @@ const OP_QUERY_KEYS = [
220220

221221
/** Extract the actual command from the query, possibly up-converting if it's a legacy format */
222222
function extractCommand(command: WriteProtocolMessageType): Document {
223-
if (command instanceof Msg) {
223+
if (command instanceof OpMsgRequest) {
224224
return deepCopy(command.command);
225225
}
226226

@@ -283,7 +283,7 @@ function extractReply(command: WriteProtocolMessageType, reply?: Document) {
283283
return reply;
284284
}
285285

286-
if (command instanceof Msg) {
286+
if (command instanceof OpMsgRequest) {
287287
return deepCopy(reply.result ? reply.result : reply);
288288
}
289289

src/cmap/commands.ts

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@ import { MongoInvalidArgumentError, MongoRuntimeError } from '../error';
44
import { ReadPreference } from '../read_preference';
55
import type { ClientSession } from '../sessions';
66
import type { CommandOptions } from './connection';
7-
import { OP_MSG, OP_QUERY } from './wire_protocol/constants';
7+
import {
8+
compress,
9+
Compressor,
10+
type CompressorName,
11+
uncompressibleCommands
12+
} from './wire_protocol/compression';
13+
import { OP_COMPRESSED, OP_MSG, OP_QUERY } from './wire_protocol/constants';
814

915
// Incrementing request id
1016
let _requestId = 0;
@@ -25,7 +31,7 @@ const SHARD_CONFIG_STALE = 4;
2531
const AWAIT_CAPABLE = 8;
2632

2733
/** @internal */
28-
export type WriteProtocolMessageType = Query | Msg;
34+
export type WriteProtocolMessageType = OpQueryRequest | OpMsgRequest;
2935

3036
/** @internal */
3137
export interface OpQueryOptions extends CommandOptions {
@@ -52,7 +58,7 @@ export interface OpQueryOptions extends CommandOptions {
5258
* QUERY
5359
**************************************************************/
5460
/** @internal */
55-
export class Query {
61+
export class OpQueryRequest {
5662
ns: string;
5763
numberToSkip: number;
5864
numberToReturn: number;
@@ -96,7 +102,7 @@ export class Query {
96102
this.numberToSkip = options.numberToSkip || 0;
97103
this.numberToReturn = options.numberToReturn || 0;
98104
this.returnFieldSelector = options.returnFieldSelector || undefined;
99-
this.requestId = Query.getRequestId();
105+
this.requestId = options.requestId ?? OpQueryRequest.getRequestId();
100106

101107
// special case for pre-3.2 find commands, delete ASAP
102108
this.pre32Limit = options.pre32Limit;
@@ -285,7 +291,7 @@ export interface OpResponseOptions extends BSONSerializeOptions {
285291
}
286292

287293
/** @internal */
288-
export class Response {
294+
export class OpQueryResponse {
289295
parsed: boolean;
290296
raw: Buffer;
291297
data: Buffer;
@@ -472,7 +478,7 @@ export interface OpMsgOptions {
472478
}
473479

474480
/** @internal */
475-
export class Msg {
481+
export class OpMsgRequest {
476482
requestId: number;
477483
serializeFunctions: boolean;
478484
ignoreUndefined: boolean;
@@ -502,7 +508,7 @@ export class Msg {
502508
this.options = options ?? {};
503509

504510
// Additional options
505-
this.requestId = options.requestId ? options.requestId : Msg.getRequestId();
511+
this.requestId = options.requestId ? options.requestId : OpMsgRequest.getRequestId();
506512

507513
// Serialization option
508514
this.serializeFunctions =
@@ -580,7 +586,7 @@ export class Msg {
580586
}
581587

582588
/** @internal */
583-
export class BinMsg {
589+
export class OpMsgResponse {
584590
parsed: boolean;
585591
raw: Buffer;
586592
data: Buffer;
@@ -709,3 +715,54 @@ export class BinMsg {
709715
return { utf8: { writeErrors: false } };
710716
}
711717
}
718+
719+
const MESSAGE_HEADER_SIZE = 16;
720+
const COMPRESSION_DETAILS_SIZE = 9; // originalOpcode + uncompressedSize, compressorID
721+
722+
/**
723+
* @internal
724+
*
725+
* An OP_COMPRESSED request wraps either an OP_QUERY or OP_MSG message.
726+
*/
727+
export class OpCompressedRequest {
728+
constructor(
729+
private command: WriteProtocolMessageType,
730+
private options: { zlibCompressionLevel: number; agreedCompressor: CompressorName }
731+
) {}
732+
733+
// Return whether a command contains an uncompressible command term
734+
// Will return true if command contains no uncompressible command terms
735+
static canCompress(command: WriteProtocolMessageType) {
736+
const commandDoc = command instanceof OpMsgRequest ? command.command : command.query;
737+
const commandName = Object.keys(commandDoc)[0];
738+
return !uncompressibleCommands.has(commandName);
739+
}
740+
741+
async toBin(): Promise<Buffer[]> {
742+
const concatenatedOriginalCommandBuffer = Buffer.concat(this.command.toBin());
743+
// otherwise, compress the message
744+
const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);
745+
746+
// Extract information needed for OP_COMPRESSED from the uncompressed message
747+
const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);
748+
749+
// Compress the message body
750+
const compressedMessage = await compress(this.options, messageToBeCompressed);
751+
// Create the msgHeader of OP_COMPRESSED
752+
const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
753+
msgHeader.writeInt32LE(
754+
MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length,
755+
0
756+
); // messageLength
757+
msgHeader.writeInt32LE(this.command.requestId, 4); // requestID
758+
msgHeader.writeInt32LE(0, 8); // responseTo (zero)
759+
msgHeader.writeInt32LE(OP_COMPRESSED, 12); // opCode
760+
761+
// Create the compression details of OP_COMPRESSED
762+
const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
763+
compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
764+
compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
765+
compressionDetails.writeUInt8(Compressor[this.options.agreedCompressor], 8); // compressorID
766+
return [msgHeader, compressionDetails, compressedMessage];
767+
}
768+
}

0 commit comments

Comments
 (0)