Skip to content

refactor(NODE-4125): misc change stream improvements #3284

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jun 10, 2022
Merged
14 changes: 14 additions & 0 deletions global.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ declare global {
(title: string, metadata: MongoDBMetadataUI, fn: (this: Suite) => void): Mocha.Suite;
}

interface ExclusiveSuiteFunction {
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.Func): Mocha.Test;
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.AsyncFunc): Mocha.Test;
(title: string, metadataAndTest: MetadataAndTest<Mocha.Func>): Mocha.Test;
(title: string, metadataAndTest: MetadataAndTest<Mocha.AsyncFunc>): Mocha.Test;
}

interface ExclusiveTestFunction {
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.Func): Mocha.Test;
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.AsyncFunc): Mocha.Test;
(title: string, metadataAndTest: MetadataAndTest<Mocha.Func>): Mocha.Test;
(title: string, metadataAndTest: MetadataAndTest<Mocha.AsyncFunc>): Mocha.Test;
}

interface TestFunction {
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.Func): Mocha.Test;
(title: string, metadata: MongoDBMetadataUI, fn: Mocha.AsyncFunc): Mocha.Test;
Expand Down
227 changes: 23 additions & 204 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@ import Denque = require('denque');
import type { Readable } from 'stream';
import { setTimeout } from 'timers';

import type { Binary, Document, Long, Timestamp } from './bson';
import type { Binary, Document, Timestamp } from './bson';
import { Collection } from './collection';
import { CHANGE, CLOSE, END, ERROR, INIT, MORE, RESPONSE, RESUME_TOKEN_CHANGED } from './constants';
import {
AbstractCursor,
AbstractCursorEvents,
AbstractCursorOptions,
CursorStreamOptions
} from './cursor/abstract_cursor';
import type { AbstractCursorEvents, CursorStreamOptions } from './cursor/abstract_cursor';
import { ChangeStreamCursor, ChangeStreamCursorOptions } from './cursor/change_stream_cursor';
import { Db } from './db';
import {
AnyError,
Expand All @@ -20,13 +16,12 @@ import {
MongoRuntimeError
} from './error';
import { MongoClient } from './mongo_client';
import { InferIdType, TODO_NODE_3286, TypedEventEmitter } from './mongo_types';
import { AggregateOperation, AggregateOptions } from './operations/aggregate';
import { InferIdType, TypedEventEmitter } from './mongo_types';
import type { AggregateOptions } from './operations/aggregate';
import type { CollationOptions, OperationParent } from './operations/command';
import { executeOperation, ExecutionResult } from './operations/execute_operation';
import type { ReadPreference } from './read_preference';
import type { Topology } from './sdam/topology';
import type { ClientSession, ServerSessionId } from './sessions';
import type { ServerSessionId } from './sessions';
import {
calculateDurationInMs,
Callback,
Expand Down Expand Up @@ -111,18 +106,6 @@ export interface PipeOptions {
end?: boolean;
}

/** @internal */
export type ChangeStreamAggregateRawResult<TChange> = {
$clusterTime: { clusterTime: Timestamp };
cursor: {
postBatchResumeToken: ResumeToken;
ns: string;
id: number | Long;
} & ({ firstBatch: TChange[] } | { nextBatch: TChange[] });
ok: 1;
operationTime: Timestamp;
};

/**
* Options that can be passed to a ChangeStream. Note that startAfter, resumeAfter, and startAtOperationTime are all mutually exclusive, and the server will error if more than one is specified.
* @public
Expand Down Expand Up @@ -700,6 +683,21 @@ export class ChangeStream<
});
}

/**
* Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned
*/
tryNext(): Promise<Document | null>;
tryNext(callback: Callback<Document | null>): void;
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
this._setIsIterator();
return maybePromise(callback, cb => {
this._getCursor((err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
return cursor.tryNext(cb);
});
});
}

/** Is the cursor closed */
get closed(): boolean {
return this[kClosed] || (this.cursor?.closed ?? false);
Expand Down Expand Up @@ -733,21 +731,6 @@ export class ChangeStream<
return this.cursor.stream(options);
}

/**
* Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned
*/
tryNext(): Promise<Document | null>;
tryNext(callback: Callback<Document | null>): void;
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
this._setIsIterator();
return maybePromise(callback, cb => {
this._getCursor((err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
return cursor.tryNext(cb);
});
});
}

/** @internal */
private _setIsEmitter(): void {
if (this[kMode] === 'iterator') {
Expand Down Expand Up @@ -923,15 +906,6 @@ export class ChangeStream<
this._processResumeQueue();
};

// otherwise, raise an error and close the change stream
const unresumableError = (err: AnyError) => {
if (!callback) {
this.emit(ChangeStream.ERROR, err);
}

this.close(() => this._processResumeQueue(err));
};

if (cursor && isResumableError(error, maxWireVersion(cursor.server))) {
this.cursor = undefined;

Expand All @@ -944,7 +918,7 @@ export class ChangeStream<
const topology = getTopology(this.parent);
this._waitForTopologyConnected(topology, { readPreference: cursor.readPreference }, err => {
// if the topology can't reconnect, close the stream
if (err) return unresumableError(err);
if (err) return this._closeWithError(err, callback);

// create a new cursor, preserving the old cursor's options
const newCursor = this._createChangeStreamCursor(cursor.resumeOptions);
Expand All @@ -955,7 +929,7 @@ export class ChangeStream<
// attempt to continue in iterator mode
newCursor.hasNext(err => {
// if there's an error immediately after resuming, close the stream
if (err) return unresumableError(err);
if (err) return this._closeWithError(err);
resumeWithCursor(newCursor);
});
});
Expand Down Expand Up @@ -1010,158 +984,3 @@ export class ChangeStream<
}
}
}

/** @internal */
export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
startAtOperationTime?: OperationTime;
resumeAfter?: ResumeToken;
startAfter?: ResumeToken;
maxAwaitTimeMS?: number;
collation?: CollationOptions;
fullDocument?: string;
}

/** @internal */
export class ChangeStreamCursor<
TSchema extends Document = Document,
TChange extends Document = ChangeStreamDocument<TSchema>
> extends AbstractCursor<TChange, ChangeStreamEvents> {
_resumeToken: ResumeToken;
startAtOperationTime?: OperationTime;
hasReceived?: boolean;
resumeAfter: ResumeToken;
startAfter: ResumeToken;
options: ChangeStreamCursorOptions;

postBatchResumeToken?: ResumeToken;
pipeline: Document[];

constructor(
client: MongoClient,
namespace: MongoDBNamespace,
pipeline: Document[] = [],
options: ChangeStreamCursorOptions = {}
) {
super(client, namespace, options);

this.pipeline = pipeline;
this.options = options;
this._resumeToken = null;
this.startAtOperationTime = options.startAtOperationTime;

if (options.startAfter) {
this.resumeToken = options.startAfter;
} else if (options.resumeAfter) {
this.resumeToken = options.resumeAfter;
}
}

set resumeToken(token: ResumeToken) {
this._resumeToken = token;
this.emit(ChangeStream.RESUME_TOKEN_CHANGED, token);
}

get resumeToken(): ResumeToken {
return this._resumeToken;
}

get resumeOptions(): ChangeStreamCursorOptions {
const options: ChangeStreamCursorOptions = {
...this.options
};

for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime'] as const) {
delete options[key];
}

if (this.resumeToken != null) {
if (this.options.startAfter && !this.hasReceived) {
options.startAfter = this.resumeToken;
} else {
options.resumeAfter = this.resumeToken;
}
} else if (this.startAtOperationTime != null && maxWireVersion(this.server) >= 7) {
options.startAtOperationTime = this.startAtOperationTime;
}

return options;
}

cacheResumeToken(resumeToken: ResumeToken): void {
if (this.bufferedCount() === 0 && this.postBatchResumeToken) {
this.resumeToken = this.postBatchResumeToken;
} else {
this.resumeToken = resumeToken;
}
this.hasReceived = true;
}

_processBatch(response: ChangeStreamAggregateRawResult<TChange>): void {
const cursor = response.cursor;
if (cursor.postBatchResumeToken) {
this.postBatchResumeToken = response.cursor.postBatchResumeToken;

const batch =
'firstBatch' in response.cursor ? response.cursor.firstBatch : response.cursor.nextBatch;
if (batch.length === 0) {
this.resumeToken = cursor.postBatchResumeToken;
}
}
}

clone(): AbstractCursor<TChange> {
return new ChangeStreamCursor(this.client, this.namespace, this.pipeline, {
...this.cursorOptions
});
}

_initialize(session: ClientSession, callback: Callback<ExecutionResult>): void {
const aggregateOperation = new AggregateOperation(this.namespace, this.pipeline, {
...this.cursorOptions,
...this.options,
session
});

executeOperation<TODO_NODE_3286, ChangeStreamAggregateRawResult<TChange>>(
session.client,
aggregateOperation,
(err, response) => {
if (err || response == null) {
return callback(err);
}

const server = aggregateOperation.server;
if (
this.startAtOperationTime == null &&
this.resumeAfter == null &&
this.startAfter == null &&
maxWireVersion(server) >= 7
) {
this.startAtOperationTime = response.operationTime;
}

this._processBatch(response);

this.emit(ChangeStream.INIT, response);
this.emit(ChangeStream.RESPONSE);

// TODO: NODE-2882
callback(undefined, { server, session, response });
}
);
}

override _getMore(batchSize: number, callback: Callback): void {
super._getMore(batchSize, (err, response) => {
if (err) {
return callback(err);
}

this._processBatch(response as TODO_NODE_3286 as ChangeStreamAggregateRawResult<TChange>);

this.emit(ChangeStream.MORE, response);
this.emit(ChangeStream.RESPONSE);
callback(err, response);
});
}
}
Loading