Skip to content

Commit 4d30352

Browse files
committed
feat: introduce AbstractCursor and its concrete subclasses
This change introduces a fundamental redesign of the cursor types in the driver. The first change is to add a new `AbstractCursor` type, which is only concerned with iterating a cursor (using `getMore`) once it has been initialized. The `_initialize` method must be implemented by subclasses. The concrete subclasses are generally builders for `find` and `aggregate` commands, each providing their own custom initialization method. NODE-2809
1 parent cf5c865 commit 4d30352

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+2004
-2894
lines changed

package-lock.json

Lines changed: 12 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@
2626
"bson-ext": "^2.0.0"
2727
},
2828
"dependencies": {
29+
"@types/lodash": "^4.14.164",
2930
"bl": "^2.2.1",
3031
"bson": "^4.0.4",
31-
"denque": "^1.4.1"
32+
"denque": "^1.4.1",
33+
"lodash": "^4.17.20"
3234
},
3335
"devDependencies": {
3436
"@istanbuljs/nyc-config-typescript": "^1.0.1",

src/change_stream.ts

Lines changed: 70 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import Denque = require('denque');
22
import { EventEmitter } from 'events';
33
import { MongoError, AnyError, isResumableError } from './error';
4-
import { Cursor, CursorOptions, CursorStream, CursorStreamOptions } from './cursor/cursor';
54
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
65
import {
76
relayEvents,
@@ -21,6 +20,14 @@ import type { CollationOptions } from './cmap/wire_protocol/write_command';
2120
import { MongoClient } from './mongo_client';
2221
import { Db } from './db';
2322
import { Collection } from './collection';
23+
import type { Readable } from 'stream';
24+
import {
25+
AbstractCursor,
26+
AbstractCursorOptions,
27+
CursorStreamOptions
28+
} from './cursor/abstract_cursor';
29+
import type { ClientSession } from './sessions';
30+
import { executeOperation, ExecutionResult } from './operations/execute_operation';
2431

2532
const kResumeQueue = Symbol('resumeQueue');
2633
const kCursorStream = Symbol('cursorStream');
@@ -162,13 +169,6 @@ interface UpdateDescription {
162169
removedFields: string[];
163170
}
164171

165-
/** @internal */
166-
export class ChangeStreamStream extends CursorStream {
167-
constructor(cursor: ChangeStreamCursor) {
168-
super(cursor);
169-
}
170-
}
171-
172172
/**
173173
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
174174
* @public
@@ -183,7 +183,7 @@ export class ChangeStream extends EventEmitter {
183183
closed: boolean;
184184
streamOptions?: CursorStreamOptions;
185185
[kResumeQueue]: Denque;
186-
[kCursorStream]?: CursorStream;
186+
[kCursorStream]?: Readable;
187187

188188
/** @event */
189189
static readonly CLOSE = 'close' as const;
@@ -252,13 +252,13 @@ export class ChangeStream extends EventEmitter {
252252

253253
this.on('removeListener', eventName => {
254254
if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
255-
this[kCursorStream]?.removeAllListeners(CursorStream.DATA);
255+
this[kCursorStream]?.removeAllListeners('data');
256256
}
257257
});
258258
}
259259

260260
/** @internal */
261-
get cursorStream(): CursorStream | undefined {
261+
get cursorStream(): Readable | undefined {
262262
return this[kCursorStream];
263263
}
264264

@@ -325,7 +325,7 @@ export class ChangeStream extends EventEmitter {
325325
* Return a modified Readable stream including a possible transform method.
326326
* @throws MongoError if this.cursor is undefined
327327
*/
328-
stream(options?: CursorStreamOptions): ChangeStreamStream {
328+
stream(options?: CursorStreamOptions): Readable {
329329
this.streamOptions = options;
330330
if (!this.cursor) {
331331
throw new MongoError('ChangeStream has no cursor, unable to stream');
@@ -335,28 +335,34 @@ export class ChangeStream extends EventEmitter {
335335
}
336336

337337
/** @public */
338-
export interface ChangeStreamCursorOptions extends CursorOptions {
338+
export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
339339
startAtOperationTime?: OperationTime;
340340
resumeAfter?: ResumeToken;
341341
startAfter?: boolean;
342342
}
343343

344344
/** @internal */
345-
export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamCursorOptions> {
345+
export class ChangeStreamCursor extends AbstractCursor {
346346
_resumeToken: ResumeToken;
347347
startAtOperationTime?: OperationTime;
348348
hasReceived?: boolean;
349349
resumeAfter: ResumeToken;
350350
startAfter: ResumeToken;
351+
options: ChangeStreamCursorOptions;
352+
353+
postBatchResumeToken?: Document;
354+
pipeline: Document[];
351355

352356
constructor(
353357
topology: Topology,
354-
operation: AggregateOperation,
355-
options: ChangeStreamCursorOptions
358+
namespace: MongoDBNamespace,
359+
pipeline: Document[] = [],
360+
options: ChangeStreamCursorOptions = {}
356361
) {
357-
super(topology, operation, options);
362+
super(topology, namespace, options);
358363

359-
options = options || {};
364+
this.pipeline = pipeline;
365+
this.options = options;
360366
this._resumeToken = null;
361367
this.startAtOperationTime = options.startAtOperationTime;
362368

@@ -421,18 +427,28 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
421427
}
422428
}
423429

424-
_initializeCursor(callback: Callback): void {
425-
super._initializeCursor((err, response) => {
430+
_initialize(session: ClientSession, callback: Callback<ExecutionResult>): void {
431+
const aggregateOperation = new AggregateOperation(
432+
{ s: { namespace: this.namespace } },
433+
this.pipeline,
434+
{
435+
...this.cursorOptions,
436+
...this.options,
437+
session
438+
}
439+
);
440+
441+
executeOperation(this.topology, aggregateOperation, (err, response) => {
426442
if (err || response == null) {
427-
callback(err, response);
428-
return;
443+
return callback(err);
429444
}
430445

446+
const server = aggregateOperation.server;
431447
if (
432448
this.startAtOperationTime == null &&
433449
this.resumeAfter == null &&
434450
this.startAfter == null &&
435-
maxWireVersion(this.server) >= 7
451+
maxWireVersion(server) >= 7
436452
) {
437453
this.startAtOperationTime = response.operationTime;
438454
}
@@ -441,15 +457,16 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
441457

442458
this.emit('init', response);
443459
this.emit('response');
444-
callback(err, response);
460+
461+
// TODO: NODE-2882
462+
callback(undefined, { server, session, response });
445463
});
446464
}
447465

448-
_getMore(callback: Callback): void {
449-
super._getMore((err, response) => {
466+
_getMore(batchSize: number, callback: Callback): void {
467+
super._getMore(batchSize, (err, response) => {
450468
if (err) {
451-
callback(err);
452-
return;
469+
return callback(err);
453470
}
454471

455472
this._processBatch('nextBatch', response);
@@ -466,26 +483,32 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
466483
* @internal
467484
*/
468485
function createChangeStreamCursor(
469-
self: ChangeStream,
486+
changeStream: ChangeStream,
470487
options: ChangeStreamOptions
471488
): ChangeStreamCursor {
472489
const changeStreamStageOptions: Document = { fullDocument: options.fullDocument || 'default' };
473490
applyKnownOptions(changeStreamStageOptions, options, CHANGE_STREAM_OPTIONS);
474-
if (self.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
491+
if (changeStream.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
475492
changeStreamStageOptions.allChangesForCluster = true;
476493
}
477494

478-
const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(self.pipeline);
495+
const pipeline = [{ $changeStream: changeStreamStageOptions } as Document].concat(
496+
changeStream.pipeline
497+
);
498+
479499
const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);
480500
const changeStreamCursor = new ChangeStreamCursor(
481-
getTopology(self.parent),
482-
new AggregateOperation(self.parent, pipeline, options),
501+
getTopology(changeStream.parent),
502+
changeStream.namespace,
503+
pipeline,
483504
cursorOptions
484505
);
485506

486-
relayEvents(changeStreamCursor, self, ['resumeTokenChanged', 'end', 'close']);
507+
relayEvents(changeStreamCursor, changeStream, ['resumeTokenChanged', 'end', 'close']);
508+
if (changeStream.listenerCount(ChangeStream.CHANGE) > 0) {
509+
streamEvents(changeStream, changeStreamCursor);
510+
}
487511

488-
if (self.listenerCount(ChangeStream.CHANGE) > 0) streamEvents(self, changeStreamCursor);
489512
return changeStreamCursor;
490513
}
491514

@@ -532,24 +555,24 @@ function waitForTopologyConnected(
532555
}
533556

534557
function closeWithError(changeStream: ChangeStream, error: AnyError, callback?: Callback): void {
535-
if (!callback) changeStream.emit(ChangeStream.ERROR, error);
558+
if (!callback) {
559+
changeStream.emit(ChangeStream.ERROR, error);
560+
}
561+
536562
changeStream.close(() => callback && callback(error));
537563
}
538564

539565
function streamEvents(changeStream: ChangeStream, cursor: ChangeStreamCursor): void {
540566
const stream = changeStream[kCursorStream] || cursor.stream();
541567
changeStream[kCursorStream] = stream;
542-
stream.on(CursorStream.DATA, change => processNewChange(changeStream, change));
543-
stream.on(CursorStream.ERROR, error => processError(changeStream, error));
568+
stream.on('data', change => processNewChange(changeStream, change));
569+
stream.on('error', error => processError(changeStream, error));
544570
}
545571

546572
function endStream(changeStream: ChangeStream): void {
547573
const cursorStream = changeStream[kCursorStream];
548574
if (cursorStream) {
549-
[CursorStream.DATA, CursorStream.CLOSE, CursorStream.END, CursorStream.ERROR].forEach(event =>
550-
cursorStream.removeAllListeners(event)
551-
);
552-
575+
['data', 'close', 'end', 'error'].forEach(event => cursorStream.removeAllListeners(event));
553576
cursorStream.destroy();
554577
}
555578

@@ -605,7 +628,10 @@ function processError(changeStream: ChangeStream, error: AnyError, callback?: Ca
605628

606629
// otherwise, raise an error and close the change stream
607630
function unresumableError(err: AnyError) {
608-
if (!callback) changeStream.emit(ChangeStream.ERROR, err);
631+
if (!callback) {
632+
changeStream.emit(ChangeStream.ERROR, err);
633+
}
634+
609635
changeStream.close(() => processResumeQueue(changeStream, err));
610636
}
611637

@@ -676,6 +702,7 @@ function processResumeQueue(changeStream: ChangeStream, err?: Error) {
676702
request(new MongoError('Change Stream is not open.'));
677703
return;
678704
}
705+
679706
request(err, changeStream.cursor);
680707
}
681708
}

src/cmap/events.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import type { ConnectionPool, ConnectionPoolOptions } from './connection_pool';
44
import type { Connection } from './connection';
55
import type { Document } from '../bson';
66
import type { AnyError } from '../error';
7+
import { cloneDeep } from 'lodash';
78

89
/**
910
* The base export class for all monitoring events published from the connection pool
@@ -394,6 +395,10 @@ function extractCommand(command: WriteProtocolMessageType): Document {
394395
}
395396

396397
function extractReply(command: WriteProtocolMessageType, reply?: Document) {
398+
if (reply) {
399+
reply = cloneDeep(reply);
400+
}
401+
397402
if (command instanceof KillCursor) {
398403
return {
399404
ok: 1,

src/cmap/wire_protocol/get_more.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ export interface GetMoreOptions extends CommandOptions {
1111
batchSize?: number;
1212
maxTimeMS?: number;
1313
maxAwaitTimeMS?: number;
14-
comment?: Document;
14+
comment?: Document | string;
1515
}
1616

1717
export function getMore(

src/cmap/wire_protocol/query.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,15 @@ export function query(
4343
}
4444

4545
const readPreference = getReadPreference(cmd, options);
46-
const findCmd = prepareFindCommand(server, ns, cmd);
46+
let findCmd = prepareFindCommand(server, ns, cmd);
47+
48+
// If we have explain, we need to rewrite the find command
49+
// to wrap it in the explain command
50+
if (typeof options.explain === 'boolean' && options.explain === true) {
51+
findCmd = {
52+
explain: findCmd
53+
};
54+
}
4755

4856
// NOTE: This actually modifies the passed in cmd, and our code _depends_ on this
4957
// side-effect. Change this ASAP
@@ -192,10 +200,15 @@ function prepareLegacyFindQuery(
192200
if (cmd.maxScan) findCmd['$maxScan'] = cmd.maxScan;
193201
if (cmd.min) findCmd['$min'] = cmd.min;
194202
if (cmd.max) findCmd['$max'] = cmd.max;
195-
if (typeof cmd.showDiskLoc !== 'undefined') findCmd['$showDiskLoc'] = cmd.showDiskLoc;
203+
if (typeof cmd.showDiskLoc !== 'undefined') {
204+
findCmd['$showDiskLoc'] = cmd.showDiskLoc;
205+
} else if (typeof cmd.showRecordId !== 'undefined') {
206+
findCmd['$showDiskLoc'] = cmd.showRecordId;
207+
}
208+
196209
if (cmd.comment) findCmd['$comment'] = cmd.comment;
197210
if (cmd.maxTimeMS) findCmd['$maxTimeMS'] = cmd.maxTimeMS;
198-
if (cmd.explain) {
211+
if (options.explain) {
199212
// nToReturn must be 0 (match all) or negative (match N and close cursor)
200213
// nToReturn > 0 will give explain results equivalent to limit(0)
201214
numberToReturn = -Math.abs(cmd.limit || 0);

0 commit comments

Comments
 (0)