Skip to content

refactor(NODE-6056): implement MongoDBResponse class #4062

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/bson.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export function parseToElementsToArray(bytes: Uint8Array, offset?: number): BSON
const res = BSON.onDemand.parseToElements(bytes, offset);
return Array.isArray(res) ? res : [...res];
}

export const getInt32LE = BSON.onDemand.NumberUtils.getInt32LE;
export const getFloat64LE = BSON.onDemand.NumberUtils.getFloat64LE;
export const getBigInt64LE = BSON.onDemand.NumberUtils.getBigInt64LE;
Expand Down
8 changes: 3 additions & 5 deletions src/cmap/auth/gssapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@ type MechanismProperties = {
async function externalCommand(
connection: Connection,
command: ReturnType<typeof saslStart> | ReturnType<typeof saslContinue>
): Promise<{ payload: string; conversationId: any }> {
return await (connection.command(ns('$external.$cmd'), command, undefined) as Promise<{
payload: string;
conversationId: any;
}>);
): Promise<{ payload: string; conversationId: number }> {
const response = await connection.command(ns('$external.$cmd'), command);
return response as { payload: string; conversationId: number };
}

let krb: Kerberos;
Expand Down
160 changes: 28 additions & 132 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ export type WriteProtocolMessageType = OpQueryRequest | OpMsgRequest;
export interface OpQueryOptions extends CommandOptions {
socketTimeoutMS?: number;
session?: ClientSession;
documentsReturnedIn?: string;
numberToSkip?: number;
numberToReturn?: number;
returnFieldSelector?: Document;
Expand All @@ -53,9 +52,6 @@ export interface OpQueryOptions extends CommandOptions {
exhaustAllowed?: boolean;
}

/**************************************************************
* QUERY
**************************************************************/
/** @internal */
export class OpQueryRequest {
ns: string;
Expand Down Expand Up @@ -284,16 +280,11 @@ export interface MessageHeader {
}

/** @internal */
export interface OpResponseOptions extends BSONSerializeOptions {
documentsReturnedIn?: string | null;
}

/** @internal */
export class OpQueryResponse {
export class OpReply {
parsed: boolean;
raw: Buffer;
data: Buffer;
opts: OpResponseOptions;
opts: BSONSerializeOptions;
length: number;
requestId: number;
responseTo: number;
Expand All @@ -303,7 +294,6 @@ export class OpQueryResponse {
cursorId?: Long;
startingFrom?: number;
numberReturned?: number;
documents: (Document | Buffer)[] = new Array(0);
cursorNotFound?: boolean;
queryFailure?: boolean;
shardConfigStale?: boolean;
Expand All @@ -313,7 +303,8 @@ export class OpQueryResponse {
promoteValues: boolean;
promoteBuffers: boolean;
bsonRegExp?: boolean;
index?: number;
index = 0;
sections: Uint8Array[] = [];

/** moreToCome is an OP_MSG only concept */
moreToCome = false;
Expand All @@ -322,7 +313,7 @@ export class OpQueryResponse {
message: Buffer,
msgHeader: MessageHeader,
msgBody: Buffer,
opts?: OpResponseOptions
opts?: BSONSerializeOptions
) {
this.parsed = false;
this.raw = message;
Expand Down Expand Up @@ -356,29 +347,9 @@ export class OpQueryResponse {
return this.parsed;
}

parse(options: OpResponseOptions): void {
parse(): Uint8Array {
// Don't parse again if not needed
if (this.parsed) return;
options = options ?? {};

// Allow the return of raw documents instead of parsing
const raw = options.raw || false;
const documentsReturnedIn = options.documentsReturnedIn || null;
const useBigInt64 = options.useBigInt64 ?? this.opts.useBigInt64;
const promoteLongs = options.promoteLongs ?? this.opts.promoteLongs;
const promoteValues = options.promoteValues ?? this.opts.promoteValues;
const promoteBuffers = options.promoteBuffers ?? this.opts.promoteBuffers;
const bsonRegExp = options.bsonRegExp ?? this.opts.bsonRegExp;
let bsonSize;

// Set up the options
const _options: BSONSerializeOptions = {
useBigInt64,
promoteLongs,
promoteValues,
promoteBuffers,
bsonRegExp
};
if (this.parsed) return this.sections[0];

// Position within OP_REPLY at which documents start
// (See https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/#wire-op-reply)
Expand All @@ -390,8 +361,11 @@ export class OpQueryResponse {
this.startingFrom = this.data.readInt32LE(12);
this.numberReturned = this.data.readInt32LE(16);

// Preallocate document array
this.documents = new Array(this.numberReturned);
if (this.numberReturned < 0 || this.numberReturned > 2 ** 32 - 1) {
throw new RangeError(
`OP_REPLY numberReturned is an invalid array length ${this.numberReturned}`
);
}

this.cursorNotFound = (this.responseFlags & CURSOR_NOT_FOUND) !== 0;
this.queryFailure = (this.responseFlags & QUERY_FAILURE) !== 0;
Expand All @@ -400,67 +374,26 @@ export class OpQueryResponse {

// Parse Body
for (let i = 0; i < this.numberReturned; i++) {
bsonSize =
const bsonSize =
this.data[this.index] |
(this.data[this.index + 1] << 8) |
(this.data[this.index + 2] << 16) |
(this.data[this.index + 3] << 24);

// If we have raw results specified slice the return document
if (raw) {
this.documents[i] = this.data.slice(this.index, this.index + bsonSize);
} else {
this.documents[i] = BSON.deserialize(
this.data.slice(this.index, this.index + bsonSize),
_options
);
}
const section = this.data.subarray(this.index, this.index + bsonSize);
this.sections.push(section);

// Adjust the index
this.index = this.index + bsonSize;
}

if (this.documents.length === 1 && documentsReturnedIn != null && raw) {
const fieldsAsRaw: Document = {};
fieldsAsRaw[documentsReturnedIn] = true;
_options.fieldsAsRaw = fieldsAsRaw;

const doc = BSON.deserialize(this.documents[0] as Buffer, _options);
this.documents = [doc];
}

// Set parsed
this.parsed = true;

return this.sections[0];
}
}

// Implementation of OP_MSG spec:
// https://github.com/mongodb/specifications/blob/master/source/message/OP_MSG.rst
//
// struct Section {
// uint8 payloadType;
// union payload {
// document document; // payloadType == 0
// struct sequence { // payloadType == 1
// int32 size;
// cstring identifier;
// document* documents;
// };
// };
// };

// struct OP_MSG {
// struct MsgHeader {
// int32 messageLength;
// int32 requestID;
// int32 responseTo;
// int32 opCode = 2013;
// };
// uint32 flagBits;
// Section+ sections;
// [uint32 checksum;]
// };

// Msg Flags
const OPTS_CHECKSUM_PRESENT = 1;
const OPTS_MORE_TO_COME = 2;
Expand Down Expand Up @@ -587,7 +520,7 @@ export class OpMsgResponse {
parsed: boolean;
raw: Buffer;
data: Buffer;
opts: OpResponseOptions;
opts: BSONSerializeOptions;
length: number;
requestId: number;
responseTo: number;
Expand All @@ -603,14 +536,14 @@ export class OpMsgResponse {
promoteValues: boolean;
promoteBuffers: boolean;
bsonRegExp: boolean;
documents: (Document | Buffer)[];
index?: number;
index = 0;
sections: Uint8Array[] = [];

constructor(
message: Buffer,
msgHeader: MessageHeader,
msgBody: Buffer,
opts?: OpResponseOptions
opts?: BSONSerializeOptions
) {
this.parsed = false;
this.raw = message;
Expand Down Expand Up @@ -642,47 +575,26 @@ export class OpMsgResponse {
this.promoteBuffers =
typeof this.opts.promoteBuffers === 'boolean' ? this.opts.promoteBuffers : false;
this.bsonRegExp = typeof this.opts.bsonRegExp === 'boolean' ? this.opts.bsonRegExp : false;

this.documents = [];
}

isParsed(): boolean {
return this.parsed;
}

parse(options: OpResponseOptions): void {
parse(): Uint8Array {
// Don't parse again if not needed
if (this.parsed) return;
options = options ?? {};
if (this.parsed) return this.sections[0];

this.index = 4;
// Allow the return of raw documents instead of parsing
const raw = options.raw || false;
const documentsReturnedIn = options.documentsReturnedIn || null;
const useBigInt64 = options.useBigInt64 ?? this.opts.useBigInt64;
const promoteLongs = options.promoteLongs ?? this.opts.promoteLongs;
const promoteValues = options.promoteValues ?? this.opts.promoteValues;
const promoteBuffers = options.promoteBuffers ?? this.opts.promoteBuffers;
const bsonRegExp = options.bsonRegExp ?? this.opts.bsonRegExp;
const validation = this.parseBsonSerializationOptions(options);

// Set up the options
const bsonOptions: BSONSerializeOptions = {
useBigInt64,
promoteLongs,
promoteValues,
promoteBuffers,
bsonRegExp,
validation
// Due to the strictness of the BSON libraries validation option we need this cast
} as BSONSerializeOptions & { validation: { utf8: { writeErrors: boolean } } };

while (this.index < this.data.length) {
const payloadType = this.data.readUInt8(this.index++);
if (payloadType === 0) {
const bsonSize = this.data.readUInt32LE(this.index);
const bin = this.data.slice(this.index, this.index + bsonSize);
this.documents.push(raw ? bin : BSON.deserialize(bin, bsonOptions));
const bin = this.data.subarray(this.index, this.index + bsonSize);

this.sections.push(bin);

this.index += bsonSize;
} else if (payloadType === 1) {
// It was decided that no driver makes use of payload type 1
Expand All @@ -692,25 +604,9 @@ export class OpMsgResponse {
}
}

if (this.documents.length === 1 && documentsReturnedIn != null && raw) {
const fieldsAsRaw: Document = {};
fieldsAsRaw[documentsReturnedIn] = true;
bsonOptions.fieldsAsRaw = fieldsAsRaw;
const doc = BSON.deserialize(this.documents[0] as Buffer, bsonOptions);
this.documents = [doc];
}

this.parsed = true;
}

parseBsonSerializationOptions({ enableUtf8Validation }: BSONSerializeOptions): {
utf8: { writeErrors: false } | false;
} {
if (enableUtf8Validation === false) {
return { utf8: false };
}

return { utf8: { writeErrors: false } };
return this.sections[0];
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ export async function performInitialHandshake(
const handshakeDoc = await prepareHandshakeDocument(authContext);

// @ts-expect-error: TODO(NODE-5141): The options need to be filtered properly, Connection options differ from Command options
const handshakeOptions: CommandOptions = { ...options };
const handshakeOptions: CommandOptions = { ...options, raw: false };
if (typeof options.connectTimeoutMS === 'number') {
// The handshake technically is a monitoring check, so its socket timeout should be connectTimeoutMS
handshakeOptions.socketTimeoutMS = options.connectTimeoutMS;
Expand Down
Loading