Skip to content

refactor: remove cursorState from Cursor and wire protcol methods #2549

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,8 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
}

cacheResumeToken(resumeToken: ResumeToken): void {
if (this.bufferedCount() === 0 && this.cursorState.postBatchResumeToken) {
this.resumeToken = this.cursorState.postBatchResumeToken;
if (this.bufferedCount() === 0 && this.postBatchResumeToken) {
this.resumeToken = this.postBatchResumeToken;
} else {
this.resumeToken = resumeToken;
}
Expand All @@ -462,7 +462,7 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
_processBatch(batchName: string, response?: Document): void {
const cursor = response?.cursor || {};
if (cursor.postBatchResumeToken) {
this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;
this.postBatchResumeToken = cursor.postBatchResumeToken;

if (cursor[batchName].length === 0) {
this.resumeToken = cursor.postBatchResumeToken;
Expand Down
29 changes: 8 additions & 21 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,18 @@ import {
MongoWriteConcernError
} from '../error';
import type { BinMsg, WriteProtocolMessageType, Response } from './commands';
import type { Document } from '../bson';
import type { Document, Long } from '../bson';
import type { AutoEncrypter } from '../deps';
import type { ConnectionOptions as TLSConnectionOptions } from 'tls';
import type { TcpNetConnectOpts, IpcNetConnectOpts } from 'net';
import type { Server } from '../sdam/server';
import type { MongoCredentials } from './auth/mongo_credentials';
import type { CommandOptions } from './wire_protocol/command';
import type { QueryOptions } from './wire_protocol/query';
import type { InternalCursorState } from '../cursor/core_cursor';
import type { GetMoreOptions } from './wire_protocol/get_more';
import type { InsertOptions, UpdateOptions, RemoveOptions } from './wire_protocol/index';
import type { Stream } from './connect';
import type { LoggerOptions } from '../logger';
import type { FindOptions } from '../operations/find';

const kStream = Symbol('stream');
const kQueue = Symbol('queue');
Expand Down Expand Up @@ -247,28 +246,16 @@ export class Connection extends EventEmitter {
wp.command(makeServerTrampoline(this), ns, cmd, options as CommandOptions, callback);
}

query(
ns: string,
cmd: Document,
cursorState: InternalCursorState,
options: QueryOptions,
callback: Callback
): void {
wp.query(makeServerTrampoline(this), ns, cmd, cursorState, options, callback);
query(ns: string, cmd: Document, options: FindOptions, callback: Callback): void {
wp.query(makeServerTrampoline(this), ns, cmd, options, callback);
}

getMore(
ns: string,
cursorState: InternalCursorState,
batchSize: number,
options: GetMoreOptions,
callback: Callback
): void {
wp.getMore(makeServerTrampoline(this), ns, cursorState, batchSize, options, callback);
getMore(ns: string, cursorId: Long, options: GetMoreOptions, callback: Callback<Document>): void {
wp.getMore(makeServerTrampoline(this), ns, cursorId, options, callback);
}

killCursors(ns: string, cursorState: InternalCursorState, callback: Callback): void {
wp.killCursors(makeServerTrampoline(this), ns, cursorState, callback);
killCursors(ns: string, cursorIds: Long[], options: CommandOptions, callback: Callback): void {
wp.killCursors(makeServerTrampoline(this), ns, cursorIds, options, callback);
}

insert(ns: string, ops: Document[], options: InsertOptions, callback: Callback): void {
Expand Down
87 changes: 26 additions & 61 deletions src/cmap/wire_protocol/get_more.ts
Original file line number Diff line number Diff line change
@@ -1,95 +1,64 @@
import { GetMore } from '../commands';
import { Long, Document, pluckBSONSerializeOptions } from '../../bson';
import { MongoError, MongoNetworkError } from '../../error';
import { MongoError } from '../../error';
import { applyCommonQueryOptions } from './shared';
import { maxWireVersion, collectionNamespace, Callback } from '../../utils';
import { command, CommandOptions } from './command';
import type { Server } from '../../sdam/server';
import type { InternalCursorState } from '../../cursor/core_cursor';

/** @internal */
export type GetMoreOptions = CommandOptions;
export interface GetMoreOptions extends CommandOptions {
batchSize?: number;
maxTimeMS?: number;
maxAwaitTimeMS?: number;
comment?: Document;
}

export function getMore(
server: Server,
ns: string,
cursorState: InternalCursorState,
batchSize: number,
cursorId: Long,
options: GetMoreOptions,
callback: Callback<Document>
): void {
options = options || {};

const fullResult = typeof options.fullResult === 'boolean' ? options.fullResult : false;
const wireVersion = maxWireVersion(server);
const queryCallback: Callback<Document> = function (err, response) {
if (err || !response) return callback(err);

// If we have a timed out query or a cursor that was killed
if (response.cursorNotFound) {
return callback(new MongoNetworkError('cursor killed or timed out'));
}

if (wireVersion < 4) {
const cursorId =
typeof response.cursorId === 'number'
? Long.fromNumber(response.cursorId)
: response.cursorId;

cursorState.documents = response.documents;
cursorState.cursorId = cursorId;

callback();
return;
}

// We have an error detected
if (response.ok === 0) {
return callback(new MongoError(response));
}

// Ensure we have a Long valid cursor id
const cursorId =
typeof response.cursor.id === 'number'
? Long.fromNumber(response.cursor.id)
: response.cursor.id;

cursorState.documents = response.cursor.nextBatch;
cursorState.cursorId = cursorId;

callback(undefined, response);
};

if (!cursorState.cursorId) {
if (!cursorId) {
callback(new MongoError('Invalid internal cursor state, no known cursor id'));
return;
}

const cursorId =
cursorState.cursorId instanceof Long
? cursorState.cursorId
: Long.fromNumber((cursorState.cursorId as unknown) as number);

if (wireVersion < 4) {
const getMoreOp = new GetMore(ns, cursorId, { numberToReturn: batchSize });
const getMoreOp = new GetMore(ns, cursorId, { numberToReturn: options.batchSize });
const queryOptions = applyCommonQueryOptions(
{},
Object.assign({ bsonOptions: pluckBSONSerializeOptions(options) }, cursorState)
Object.assign(options, { ...pluckBSONSerializeOptions(options) })
);

queryOptions.fullResult = true;
queryOptions.command = true;
server.s.pool.write(getMoreOp, queryOptions, queryCallback);
server.s.pool.write(getMoreOp, queryOptions, (err, response) => {
if (fullResult) return callback(err, response);
if (err) return callback(err);
callback(undefined, { cursor: { id: response.cursorId, nextBatch: response.documents } });
});

return;
}

const getMoreCmd: Document = {
getMore: cursorId,
collection: collectionNamespace(ns),
batchSize: Math.abs(batchSize)
collection: collectionNamespace(ns)
};

if (cursorState.cmd.tailable && typeof cursorState.cmd.maxAwaitTimeMS === 'number') {
getMoreCmd.maxTimeMS = cursorState.cmd.maxAwaitTimeMS;
if (typeof options.batchSize === 'number') {
getMoreCmd.batchSize = Math.abs(options.batchSize);
}

if (typeof options.maxAwaitTimeMS === 'number') {
getMoreCmd.maxTimeMS = options.maxAwaitTimeMS;
}

const commandOptions = Object.assign(
Expand All @@ -100,9 +69,5 @@ export function getMore(
options
);

if (cursorState.session) {
commandOptions.session = cursorState.session;
}

command(server, ns, getMoreCmd, commandOptions, queryCallback);
command(server, ns, getMoreCmd, commandOptions, callback);
}
49 changes: 10 additions & 39 deletions src/cmap/wire_protocol/kill_cursors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,52 +3,28 @@ import { maxWireVersion, collectionNamespace, Callback } from '../../utils';
import { command, CommandOptions } from './command';
import { MongoError, MongoNetworkError } from '../../error';
import type { Server } from '../../sdam/server';
import type { InternalCursorState } from '../../cursor/core_cursor';
import type { ClientSession } from '../../sessions';

interface KillCursorOptions {
session?: ClientSession;
immediateRelease: boolean;
noResponse: boolean;
}
import type { Long } from '../../bson';

export function killCursors(
server: Server,
ns: string,
cursorState: InternalCursorState,
cursorIds: Long[],
options: CommandOptions,
callback: Callback
): void {
callback = typeof callback === 'function' ? callback : () => undefined;

if (!cursorState.cursorId) {
callback(new MongoError('Invalid internal cursor state, no known cursor id'));
return;
if (!cursorIds || !Array.isArray(cursorIds)) {
throw new TypeError('Invalid list of cursor ids provided: ' + cursorIds);
}

const cursorIds = [cursorState.cursorId];

if (maxWireVersion(server) < 4) {
const pool = server.s.pool;
const killCursor = new KillCursor(ns, cursorIds);
const options: KillCursorOptions = {
immediateRelease: true,
noResponse: true
};

if (typeof cursorState.session === 'object') {
options.session = cursorState.session;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure we want to stop passing the session here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, sorry it looks like we lost your comments somewhere (maybe the force push). So what's going on here is that the wire protocol killCursors command used to take in a cursorState which might have had a session, and now it takes an options which may have a session. The options are passed all the way down to the command method which will use the session there, so there's no need to copy it here any longer

}

if (pool && pool.isConnected()) {
try {
pool.write(killCursor, options, callback);
} catch (err) {
if (typeof callback === 'function') {
callback(err, null);
} else {
console.warn(err);
}
}
try {
pool.write(killCursor, { noResponse: true, ...options }, callback);
} catch (err) {
callback(err);
}

return;
Expand All @@ -59,12 +35,7 @@ export function killCursors(
cursors: cursorIds
};

const options: CommandOptions = { fullResult: true };
if (typeof cursorState.session === 'object') {
options.session = cursorState.session;
}

command(server, ns, killCursorCmd, options, (err, response) => {
command(server, ns, killCursorCmd, { fullResult: true, ...options }, (err, response) => {
if (err || !response) {
return callback(err);
}
Expand Down
Loading