Skip to content

Commit 51de7d8

Browse files
committed
refactor(NODE-6057): implement CursorResponse for lazy document parsing
1 parent a6882ec commit 51de7d8

File tree

8 files changed

+170
-27
lines changed

8 files changed

+170
-27
lines changed

src/cmap/wire_protocol/on_demand/document.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,13 @@ export class OnDemandDocument {
197197
}
198198
}
199199

200+
/**
201+
* Returns the number of elements in this BSON document
202+
*/
203+
public size() {
204+
return this.elements.length;
205+
}
206+
200207
/**
201208
* Checks for the existence of an element by name.
202209
*
@@ -303,12 +310,18 @@ export class OnDemandDocument {
303310
});
304311
}
305312

313+
/** Returns this document's bytes only */
314+
toBytes() {
315+
const size = getInt32LE(this.bson, this.offset);
316+
return this.bson.subarray(this.offset, this.offset + size);
317+
}
318+
306319
/**
307320
* Iterates through the elements of a document reviving them using the `as` BSONType.
308321
*
309322
* @param as - The type to revive all elements as
310323
*/
311-
public *valuesAs<const T extends keyof JSTypeOf>(as: T): Generator<JSTypeOf[T]> {
324+
public *valuesAs<const T extends keyof JSTypeOf>(as: T): Generator<JSTypeOf[T], void, void> {
312325
if (!this.isArray) {
313326
throw new BSONError('Unexpected conversion of non-array value to array');
314327
}

src/cmap/wire_protocol/responses.ts

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
1-
import { type BSONSerializeOptions, BSONType, type Document, type Timestamp } from '../../bson';
1+
import {
2+
type BSONSerializeOptions,
3+
BSONType,
4+
type Document,
5+
Long,
6+
type Timestamp
7+
} from '../../bson';
8+
import { MongoUnexpectedServerResponseError } from '../../error';
29
import { type ClusterTime } from '../../sdam/common';
10+
import { type MongoDBNamespace, ns } from '../../utils';
311
import { OnDemandDocument } from './on_demand/document';
412

513
/** @internal */
@@ -107,3 +115,80 @@ export class MongoDBResponse extends OnDemandDocument {
107115
return { utf8: { writeErrors: false } };
108116
}
109117
}
118+
119+
function throwUnsupportedError() {
120+
throw new Error('Unsupported method');
121+
}
122+
123+
export class CursorResponse extends MongoDBResponse {
124+
id: Long | null = null;
125+
ns: MongoDBNamespace | null = null;
126+
127+
documents: any | null = null;
128+
bufferForUnshift: any[] = [];
129+
130+
private batch: OnDemandDocument | null = null;
131+
private values: Generator<OnDemandDocument, void, void> | null = null;
132+
private batchSize = 0;
133+
private iterated = 0;
134+
135+
constructor(b: Uint8Array, o?: number, a?: boolean) {
136+
super(b, o, a);
137+
138+
if (this.isError) return;
139+
140+
const cursor = this.get('cursor', BSONType.object, true);
141+
142+
const id = cursor.get('id', BSONType.long, true);
143+
this.id = new Long(Number(id & 0xffff_ffffn), Number((id >> 32n) & 0xffff_ffffn));
144+
145+
const namespace = cursor.get('ns', BSONType.string) ?? '';
146+
if (namespace) this.ns = ns(namespace);
147+
148+
if (cursor.has('firstBatch')) this.batch = cursor.get('firstBatch', BSONType.array, true);
149+
else if (cursor.has('nextBatch')) this.batch = cursor.get('nextBatch', BSONType.array, true);
150+
else throw new MongoUnexpectedServerResponseError('Cursor document did not contain a batch');
151+
152+
this.values = this.batch.valuesAs(BSONType.object);
153+
this.batchSize = this.batch.size();
154+
this.iterated = 0;
155+
this.documents = Object.defineProperties(Object.create(null), {
156+
length: {
157+
get: () => {
158+
return this.batchSize - this.iterated;
159+
}
160+
},
161+
shift: {
162+
value: (options?: BSONSerializeOptions) => {
163+
this.iterated += 1;
164+
if (this.bufferForUnshift.length) return this.bufferForUnshift.pop();
165+
const r = this.values?.next();
166+
if (!r || r.done) return null;
167+
if (options.raw) {
168+
return r.value.toBytes();
169+
} else {
170+
return r.value.toObject(options);
171+
}
172+
}
173+
},
174+
unshift: {
175+
value: (v: any) => {
176+
this.iterated -= 1;
177+
this.bufferForUnshift.push(v);
178+
}
179+
},
180+
clear: {
181+
value: () => {
182+
this.iterated = this.batchSize;
183+
this.values?.return();
184+
}
185+
},
186+
pushMany: { value: throwUnsupportedError },
187+
push: { value: throwUnsupportedError }
188+
});
189+
}
190+
191+
static isCursorResponse(value: unknown): value is CursorResponse {
192+
return value instanceof CursorResponse;
193+
}
194+
}

src/cursor/abstract_cursor.ts

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { Readable, Transform } from 'stream';
22

33
import { type BSONSerializeOptions, type Document, Long, pluckBSONSerializeOptions } from '../bson';
4+
import { CursorResponse } from '../cmap/wire_protocol/responses';
45
import {
56
type AnyError,
67
MongoAPIError,
@@ -144,7 +145,14 @@ export abstract class AbstractCursor<
144145
/** @internal */
145146
[kNamespace]: MongoDBNamespace;
146147
/** @internal */
147-
[kDocuments]: List<TSchema>;
148+
[kDocuments]: {
149+
length: number;
150+
shift(bsonOptions?: any): TSchema | null;
151+
unshift(doc: TSchema): void;
152+
clear(): void;
153+
pushMany(many: Iterable<TSchema>): void;
154+
push(item: TSchema): void;
155+
};
148156
/** @internal */
149157
[kClient]: MongoClient;
150158
/** @internal */
@@ -286,7 +294,7 @@ export abstract class AbstractCursor<
286294
const documentsToRead = Math.min(number ?? this[kDocuments].length, this[kDocuments].length);
287295

288296
for (let count = 0; count < documentsToRead; count++) {
289-
const document = this[kDocuments].shift();
297+
const document = this[kDocuments].shift(this[kOptions]);
290298
if (document != null) {
291299
bufferedDocs.push(document);
292300
}
@@ -633,12 +641,13 @@ export abstract class AbstractCursor<
633641
protected abstract _initialize(session: ClientSession | undefined): Promise<ExecutionResult>;
634642

635643
/** @internal */
636-
async getMore(batchSize: number): Promise<Document | null> {
644+
async getMore(batchSize: number, useCursorResponse = false): Promise<Document | null> {
637645
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
638646
const getMoreOperation = new GetMoreOperation(this[kNamespace], this[kId]!, this[kServer]!, {
639647
...this[kOptions],
640648
session: this[kSession],
641-
batchSize
649+
batchSize,
650+
useCursorResponse
642651
});
643652

644653
return await executeOperation(this[kClient], getMoreOperation);
@@ -656,7 +665,11 @@ export abstract class AbstractCursor<
656665
const state = await this._initialize(this[kSession]);
657666
const response = state.response;
658667
this[kServer] = state.server;
659-
if (response.cursor) {
668+
if (CursorResponse.isCursorResponse(response)) {
669+
this[kId] = response.id;
670+
if (response.ns) this[kNamespace] = response.ns;
671+
this[kDocuments] = response.documents;
672+
} else if (response.cursor) {
660673
// TODO(NODE-2674): Preserve int64 sent from MongoDB
661674
this[kId] =
662675
typeof response.cursor.id === 'number'
@@ -730,7 +743,7 @@ async function next<T>(
730743
}
731744

732745
if (cursor[kDocuments].length !== 0) {
733-
const doc = cursor[kDocuments].shift();
746+
const doc = cursor[kDocuments].shift(cursor[kOptions]);
734747

735748
if (doc != null && transform && cursor[kTransform]) {
736749
try {
@@ -762,8 +775,10 @@ async function next<T>(
762775

763776
try {
764777
const response = await cursor.getMore(batchSize);
765-
766-
if (response) {
778+
if (CursorResponse.isCursorResponse(response)) {
779+
cursor[kId] = response.id;
780+
cursor[kDocuments] = response.documents;
781+
} else if (response) {
767782
const cursorId =
768783
typeof response.cursor.id === 'number'
769784
? Long.fromNumber(response.cursor.id)

src/cursor/find_cursor.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,10 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
111111
}
112112
}
113113

114-
const response = await super.getMore(batchSize);
114+
const response = await super.getMore(batchSize, true);
115115
// TODO: wrap this in some logic to prevent it from happening if we don't need this support
116116
if (response) {
117-
this[kNumReturned] = this[kNumReturned] + response.cursor.nextBatch.length;
117+
this[kNumReturned] = this[kNumReturned] + response.batchLength;
118118
}
119119

120120
return response;

src/operations/execute_operation.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { Document } from '../bson';
2+
import { type CursorResponse } from '../cmap/wire_protocol/responses';
23
import {
34
isRetryableReadError,
45
isRetryableWriteError,
@@ -44,7 +45,7 @@ export interface ExecutionResult {
4445
/** The session used for this operation, may be implicitly created */
4546
session?: ClientSession;
4647
/** The raw server response for the operation */
47-
response: Document;
48+
response: Document | CursorResponse;
4849
}
4950

5051
/**

src/operations/find.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { Document } from '../bson';
2+
import { CursorResponse } from '../cmap/wire_protocol/responses';
23
import type { Collection } from '../collection';
34
import { MongoInvalidArgumentError } from '../error';
45
import { ReadConcern } from '../read_concern';
@@ -111,12 +112,17 @@ export class FindOperation extends CommandOperation<Document> {
111112
findCommand = decorateWithExplain(findCommand, this.explain);
112113
}
113114

114-
return await server.command(this.ns, findCommand, {
115-
...this.options,
116-
...this.bsonOptions,
117-
documentsReturnedIn: 'firstBatch',
118-
session
119-
});
115+
return await server.command(
116+
this.ns,
117+
findCommand,
118+
{
119+
...this.options,
120+
...this.bsonOptions,
121+
documentsReturnedIn: 'firstBatch',
122+
session
123+
},
124+
this.explain ? undefined : CursorResponse
125+
);
120126
}
121127
}
122128

src/operations/get_more.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { Document, Long } from '../bson';
2+
import { CursorResponse } from '../cmap/wire_protocol/responses';
23
import { MongoRuntimeError } from '../error';
34
import type { Server } from '../sdam/server';
45
import type { ClientSession } from '../sessions';
@@ -19,6 +20,8 @@ export interface GetMoreOptions extends OperationOptions {
1920
maxTimeMS?: number;
2021
/** TODO(NODE-4413): Address bug with maxAwaitTimeMS not being passed in from the cursor correctly */
2122
maxAwaitTimeMS?: number;
23+
24+
useCursorResponse: boolean;
2225
}
2326

2427
/**
@@ -96,7 +99,12 @@ export class GetMoreOperation extends AbstractOperation {
9699
...this.options
97100
};
98101

99-
return await server.command(this.ns, getMoreCmd, commandOptions);
102+
return await server.command(
103+
this.ns,
104+
getMoreCmd,
105+
commandOptions,
106+
this.options.useCursorResponse ? CursorResponse : undefined
107+
);
100108
}
101109
}
102110

src/sdam/server.ts

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
type ConnectionPoolOptions
88
} from '../cmap/connection_pool';
99
import { PoolClearedError } from '../cmap/errors';
10+
import { type MongoDBResponseConstructor } from '../cmap/wire_protocol/responses';
1011
import {
1112
APM_EVENTS,
1213
CLOSED,
@@ -262,11 +263,25 @@ export class Server extends TypedEventEmitter<ServerEvents> {
262263
}
263264
}
264265

265-
/**
266-
* Execute a command
267-
* @internal
268-
*/
269-
async command(ns: MongoDBNamespace, cmd: Document, options: CommandOptions): Promise<Document> {
266+
public async command<T extends MongoDBResponseConstructor>(
267+
ns: MongoDBNamespace,
268+
command: Document,
269+
options: CommandOptions | undefined,
270+
responseType: T | undefined
271+
): Promise<typeof responseType extends undefined ? Document : InstanceType<T>>;
272+
273+
public async command(
274+
ns: MongoDBNamespace,
275+
command: Document,
276+
options?: CommandOptions
277+
): Promise<Document>;
278+
279+
public async command(
280+
ns: MongoDBNamespace,
281+
cmd: Document,
282+
options: CommandOptions,
283+
responseType?: MongoDBResponseConstructor
284+
): Promise<Document> {
270285
if (ns.db == null || typeof ns === 'string') {
271286
throw new MongoInvalidArgumentError('Namespace must not be a string');
272287
}
@@ -308,7 +323,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
308323

309324
try {
310325
try {
311-
return await conn.command(ns, cmd, finalOptions);
326+
return await conn.command(ns, cmd, finalOptions, responseType);
312327
} catch (commandError) {
313328
throw this.decorateCommandError(conn, cmd, finalOptions, commandError);
314329
}
@@ -319,7 +334,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
319334
) {
320335
await this.pool.reauthenticate(conn);
321336
try {
322-
return await conn.command(ns, cmd, finalOptions);
337+
return await conn.command(ns, cmd, finalOptions, responseType);
323338
} catch (commandError) {
324339
throw this.decorateCommandError(conn, cmd, finalOptions, commandError);
325340
}

0 commit comments

Comments
 (0)