Skip to content

refactor: move find command building all into the find operation #2643

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@ import { StreamDescription, StreamDescriptionOptions } from './stream_descriptio
import * as wp from './wire_protocol';
import { CommandStartedEvent, CommandFailedEvent, CommandSucceededEvent } from './events';
import { updateSessionFromResponse } from '../sessions';
import { uuidV4, ClientMetadata, now, calculateDurationInMs, Callback } from '../utils';
import {
uuidV4,
ClientMetadata,
now,
calculateDurationInMs,
Callback,
MongoDBNamespace
} from '../utils';
import {
MongoError,
MongoNetworkError,
Expand All @@ -23,7 +30,7 @@ import type { GetMoreOptions } from './wire_protocol/get_more';
import type { InsertOptions, UpdateOptions, RemoveOptions } from './wire_protocol/index';
import type { Stream } from './connect';
import type { LoggerOptions } from '../logger';
import type { FindOptions } from '../operations/find';
import type { QueryOptions } from './wire_protocol/query';

const kStream = Symbol('stream');
const kQueue = Symbol('queue');
Expand Down Expand Up @@ -246,7 +253,7 @@ export class Connection extends EventEmitter {
wp.command(makeServerTrampoline(this), ns, cmd, options as CommandOptions, callback);
}

query(ns: string, cmd: Document, options: FindOptions, callback: Callback): void {
query(ns: MongoDBNamespace, cmd: Document, options: QueryOptions, callback: Callback): void {
wp.query(makeServerTrampoline(this), ns, cmd, options, callback);
}

Expand Down
276 changes: 66 additions & 210 deletions src/cmap/wire_protocol/query.ts
Original file line number Diff line number Diff line change
@@ -1,175 +1,40 @@
import { command, CommandOptions } from './command';
import { Query } from '../commands';
import { MongoError } from '../../error';
import { maxWireVersion, collectionNamespace, Callback, decorateWithExplain } from '../../utils';
import { getReadPreference, isSharded, applyCommonQueryOptions } from './shared';
import { Document, pluckBSONSerializeOptions } from '../../bson';
import { OpQueryOptions, Query } from '../commands';
import type { Callback, MongoDBNamespace } from '../../utils';
import { BSONSerializeOptions, Document, pluckBSONSerializeOptions } from '../../bson';
import type { Server } from '../../sdam/server';
import type { ReadPreferenceLike } from '../../read_preference';
import type { FindOptions } from '../../operations/find';
import { Explain } from '../../explain';
import { ReadPreference } from '../../read_preference';

/** @internal */
export interface QueryOptions extends CommandOptions {
readPreference?: ReadPreferenceLike;
export interface QueryOptions extends BSONSerializeOptions {
readPreference: ReadPreference;
documentsReturnedIn?: string;
batchSize?: number;
limit?: number;
skip?: number;
projection?: Document;
tailable?: boolean;
awaitData?: boolean;
noCursorTimeout?: boolean;
/** @deprecated use `noCursorTimeout` instead */
timeout?: boolean;
partial?: boolean;
oplogReplay?: boolean;
}

export function query(
server: Server,
ns: string,
cmd: Document,
options: FindOptions,
ns: MongoDBNamespace,
findCommand: Document,
options: QueryOptions,
callback: Callback
): void {
options = options || {};

if (cmd == null) {
return callback(new MongoError(`command ${JSON.stringify(cmd)} does not return a cursor`));
}

if (maxWireVersion(server) < 4) {
const query = prepareLegacyFindQuery(server, ns, cmd, options);
const queryOptions = applyCommonQueryOptions(
{},
Object.assign(options, { ...pluckBSONSerializeOptions(options) })
);

queryOptions.fullResult = true;
if (typeof query.documentsReturnedIn === 'string') {
queryOptions.documentsReturnedIn = query.documentsReturnedIn;
}

server.s.pool.write(query, queryOptions, callback);
return;
}

const readPreference = getReadPreference(cmd, options);
let findCmd = prepareFindCommand(server, ns, cmd);

// If we have explain, we need to rewrite the find command
// to wrap it in the explain command
const explain = Explain.fromOptions(options);
if (explain) {
findCmd = decorateWithExplain(findCmd, explain);
}

// NOTE: This actually modifies the passed in cmd, and our code _depends_ on this
// side-effect. Change this ASAP
cmd.virtual = false;

const commandOptions = Object.assign(
{
documentsReturnedIn: 'firstBatch',
numberToReturn: 1,
slaveOk: readPreference.slaveOk()
},
options
);

command(server, ns, findCmd, commandOptions, callback);
}

function prepareFindCommand(server: Server, ns: string, cmd: Document) {
const findCmd: Document = {
find: collectionNamespace(ns)
};

if (cmd.query) {
if (cmd.query['$query']) {
findCmd.filter = cmd.query['$query'];
} else {
findCmd.filter = cmd.query;
}
}

let sortValue = cmd.sort;
if (Array.isArray(sortValue)) {
const sortObject: Document = {};

if (sortValue.length > 0 && !Array.isArray(sortValue[0])) {
let sortDirection = sortValue[1];
if (sortDirection === 'asc') {
sortDirection = 1;
} else if (sortDirection === 'desc') {
sortDirection = -1;
}

sortObject[sortValue[0]] = sortDirection;
} else {
for (let i = 0; i < sortValue.length; i++) {
let sortDirection = sortValue[i][1];
if (sortDirection === 'asc') {
sortDirection = 1;
} else if (sortDirection === 'desc') {
sortDirection = -1;
}

sortObject[sortValue[i][0]] = sortDirection;
}
}

sortValue = sortObject;
}

if (typeof cmd.allowDiskUse === 'boolean') {
findCmd.allowDiskUse = cmd.allowDiskUse;
}

if (cmd.sort) findCmd.sort = sortValue;
if (cmd.fields) findCmd.projection = cmd.fields;
if (cmd.hint) findCmd.hint = cmd.hint;
if (cmd.skip) findCmd.skip = cmd.skip;
if (cmd.limit) findCmd.limit = cmd.limit;
if (cmd.limit < 0) {
findCmd.limit = Math.abs(cmd.limit);
findCmd.singleBatch = true;
}

if (typeof cmd.batchSize === 'number') {
if (cmd.batchSize < 0) {
if (cmd.limit !== 0 && Math.abs(cmd.batchSize) < Math.abs(cmd.limit)) {
findCmd.limit = Math.abs(cmd.batchSize);
}

findCmd.singleBatch = true;
}

findCmd.batchSize = Math.abs(cmd.batchSize);
}

if (cmd.comment) findCmd.comment = cmd.comment;
if (cmd.maxScan) findCmd.maxScan = cmd.maxScan;
if (cmd.maxTimeMS) findCmd.maxTimeMS = cmd.maxTimeMS;
if (cmd.min) findCmd.min = cmd.min;
if (cmd.max) findCmd.max = cmd.max;
findCmd.returnKey = cmd.returnKey ? cmd.returnKey : false;
findCmd.showRecordId = cmd.showDiskLoc ? cmd.showDiskLoc : false;
if (cmd.snapshot) findCmd.snapshot = cmd.snapshot;
if (cmd.tailable) findCmd.tailable = cmd.tailable;
if (cmd.oplogReplay) findCmd.oplogReplay = cmd.oplogReplay;
if (cmd.noCursorTimeout) findCmd.noCursorTimeout = cmd.noCursorTimeout;
if (cmd.awaitData) findCmd.awaitData = cmd.awaitData;
if (cmd.awaitdata) findCmd.awaitData = cmd.awaitdata;
if (cmd.partial) findCmd.partial = cmd.partial;
if (cmd.collation) findCmd.collation = cmd.collation;
if (cmd.readConcern) findCmd.readConcern = cmd.readConcern;

return findCmd;
}

function prepareLegacyFindQuery(
server: Server,
ns: string,
cmd: Document,
options: FindOptions
): Query {
options = options || {};

const readPreference = getReadPreference(cmd, options);
const batchSize = cmd.batchSize || options.batchSize || 0;
const limit = cmd.limit || options.limit;
const numberToSkip = cmd.skip || options.skip || 0;

const isExplain = typeof findCommand.$explain !== 'undefined';
const readPreference = options.readPreference ?? ReadPreference.primary;
const batchSize = options.batchSize || 0;
const limit = options.limit;
const numberToSkip = options.skip || 0;
let numberToReturn = 0;
if (
limit &&
Expand All @@ -180,66 +45,57 @@ function prepareLegacyFindQuery(
numberToReturn = batchSize;
}

const findCmd: Document = {};
if (isSharded(server) && readPreference) {
findCmd['$readPreference'] = readPreference.toJSON();
if (isExplain) {
// nToReturn must be 0 (match all) or negative (match N and close cursor)
// nToReturn > 0 will give explain results equivalent to limit(0)
numberToReturn = -Math.abs(limit || 0);
}

if (cmd.sort) findCmd['$orderby'] = cmd.sort;
if (cmd.hint) findCmd['$hint'] = cmd.hint;
if (cmd.snapshot) findCmd['$snapshot'] = cmd.snapshot;
if (typeof cmd.returnKey !== 'undefined') findCmd['$returnKey'] = cmd.returnKey;
if (cmd.maxScan) findCmd['$maxScan'] = cmd.maxScan;
if (cmd.min) findCmd['$min'] = cmd.min;
if (cmd.max) findCmd['$max'] = cmd.max;
if (typeof cmd.showDiskLoc !== 'undefined') {
findCmd['$showDiskLoc'] = cmd.showDiskLoc;
} else if (typeof cmd.showRecordId !== 'undefined') {
findCmd['$showDiskLoc'] = cmd.showRecordId;
const queryOptions: OpQueryOptions = {
numberToSkip,
numberToReturn,
pre32Limit: typeof limit === 'number' ? limit : undefined,
checkKeys: false,
slaveOk: readPreference.slaveOk()
};

if (options.projection) {
queryOptions.returnFieldSelector = options.projection;
}

if (cmd.comment) findCmd['$comment'] = cmd.comment;
if (cmd.maxTimeMS) findCmd['$maxTimeMS'] = cmd.maxTimeMS;
if (options.explain) {
// nToReturn must be 0 (match all) or negative (match N and close cursor)
// nToReturn > 0 will give explain results equivalent to limit(0)
numberToReturn = -Math.abs(cmd.limit || 0);
findCmd['$explain'] = true;
const query = new Query(ns.toString(), findCommand, queryOptions);
if (typeof options.tailable === 'boolean') {
query.tailable = options.tailable;
}

findCmd['$query'] = cmd.query;
if (cmd.readConcern && cmd.readConcern.level !== 'local') {
throw new MongoError(
`server find command does not support a readConcern level of ${cmd.readConcern.level}`
);
if (typeof options.oplogReplay === 'boolean') {
query.oplogReplay = options.oplogReplay;
}

if (cmd.readConcern) {
cmd = Object.assign({}, cmd);
delete cmd['readConcern'];
if (typeof options.timeout === 'boolean') {
query.noCursorTimeout = options.timeout;
} else if (typeof options.noCursorTimeout === 'boolean') {
query.noCursorTimeout = options.noCursorTimeout;
}

const serializeFunctions =
typeof options.serializeFunctions === 'boolean' ? options.serializeFunctions : false;
const ignoreUndefined =
typeof options.ignoreUndefined === 'boolean' ? options.ignoreUndefined : false;
if (typeof options.awaitData === 'boolean') {
query.awaitData = options.awaitData;
}

const query = new Query(ns, findCmd, {
numberToSkip,
numberToReturn,
pre32Limit: typeof limit === 'number' ? limit : undefined,
checkKeys: false,
returnFieldSelector: cmd.fields,
serializeFunctions,
ignoreUndefined
});
if (typeof options.partial === 'boolean') {
query.partial = options.partial;
}

if (typeof cmd.tailable === 'boolean') query.tailable = cmd.tailable;
if (typeof cmd.oplogReplay === 'boolean') query.oplogReplay = cmd.oplogReplay;
if (typeof cmd.noCursorTimeout === 'boolean') query.noCursorTimeout = cmd.noCursorTimeout;
if (typeof cmd.awaitData === 'boolean') query.awaitData = cmd.awaitData;
if (typeof cmd.partial === 'boolean') query.partial = cmd.partial;
server.s.pool.write(
query,
{ fullResult: true, ...pluckBSONSerializeOptions(options) },
(err, result) => {
if (err || !result) return callback(err, result);
if (isExplain && result.documents && result.documents[0]) {
return callback(undefined, result.documents[0]);
}

query.slaveOk = readPreference.slaveOk();
return query;
callback(undefined, result);
}
);
}
1 change: 0 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ export type {
UpdateOptions as WireUpdateOptions,
RemoveOptions as WireRemoveOptions
} from './cmap/wire_protocol/index';
export type { QueryOptions } from './cmap/wire_protocol/query';
export type { CollationOptions, WriteCommandOptions } from './cmap/wire_protocol/write_command';
export type { CollectionPrivate, CollectionOptions } from './collection';
export type { AggregationCursorOptions } from './cursor/aggregation_cursor';
Expand Down
Loading