Skip to content

Commit dfca70d

Browse files
authored
refactor: remove Topology#command method (#2545)
NODE-2801
1 parent 10a0695 commit dfca70d

File tree

12 files changed

+151
-229
lines changed

12 files changed

+151
-229
lines changed

src/cmap/wire_protocol/command.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,8 @@ export interface CommandOptions extends BSONSerializeOptions {
2626
documentsReturnedIn?: string;
2727
noResponse?: boolean;
2828

29-
// NOTE: these are for retryable writes and will be removed soon
29+
// FIXME: NODE-2802
3030
willRetryWrite?: boolean;
31-
retryWrites?: boolean;
32-
retrying?: boolean;
3331

3432
// FIXME: NODE-2781
3533
writeConcern?: WriteConcernOptions | WriteConcern | W;
@@ -137,7 +135,6 @@ function _command(
137135

138136
// This value is not overridable
139137
commandOptions.slaveOk = readPreference.slaveOk();
140-
141138
const cmdNs = `${databaseNamespace(ns)}.$cmd`;
142139
const message = shouldUseOpMsg
143140
? new Msg(cmdNs, finalCmd, commandOptions)

src/operations/command.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { ReadConcern } from '../read_concern';
33
import { WriteConcern, WriteConcernOptions } from '../write_concern';
44
import { maxWireVersion, MongoDBNamespace, Callback } from '../utils';
55
import { ReadPreference, ReadPreferenceLike } from '../read_preference';
6-
import { commandSupportsReadConcern, ClientSession } from '../sessions';
6+
import { commandSupportsReadConcern } from '../sessions';
77
import { MongoError } from '../error';
88
import type { Logger } from '../logger';
99
import type { Server } from '../sdam/server';
@@ -14,13 +14,12 @@ const SUPPORTS_WRITE_CONCERN_AND_COLLATION = 5;
1414

1515
/** @public */
1616
export interface CommandOperationOptions extends OperationOptions, WriteConcernOptions {
17+
/** Return the full server response for the command */
1718
fullResponse?: boolean;
1819
/** Specify a read concern and level for the collection. (only MongoDB 3.2 or higher supported) */
1920
readConcern?: ReadConcern;
2021
/** The preferred read preference (ReadPreference.primary, ReadPreference.primary_preferred, ReadPreference.secondary, ReadPreference.secondary_preferred, ReadPreference.nearest). */
2122
readPreference?: ReadPreferenceLike;
22-
/** Specify ClientSession for this command */
23-
session?: ClientSession;
2423
/** Collation */
2524
collation?: CollationOptions;
2625
maxTimeMS?: number;
@@ -32,6 +31,7 @@ export interface CommandOperationOptions extends OperationOptions, WriteConcernO
3231
// Admin command overrides.
3332
dbName?: string;
3433
authdb?: string;
34+
noResponse?: boolean;
3535
}
3636

3737
/** @internal */
@@ -56,16 +56,20 @@ export abstract class CommandOperation<
5656
fullResponse?: boolean;
5757
logger?: Logger;
5858

59-
constructor(parent: OperationParent, options?: T) {
59+
constructor(parent?: OperationParent, options?: T) {
6060
super(options);
6161

6262
// NOTE: this was explicitly added for the add/remove user operations, it's likely
6363
// something we'd want to reconsider. Perhaps those commands can use `Admin`
6464
// as a parent?
6565
const dbNameOverride = options?.dbName || options?.authdb;
66-
this.ns = dbNameOverride
67-
? new MongoDBNamespace(dbNameOverride, '$cmd')
68-
: parent.s.namespace.withCollection('$cmd');
66+
if (dbNameOverride) {
67+
this.ns = new MongoDBNamespace(dbNameOverride, '$cmd');
68+
} else {
69+
this.ns = parent
70+
? parent.s.namespace.withCollection('$cmd')
71+
: new MongoDBNamespace('admin', '$cmd');
72+
}
6973

7074
const propertyProvider = this.hasAspect(Aspect.NO_INHERIT_OPTIONS) ? undefined : parent;
7175
this.readPreference = this.hasAspect(Aspect.WRITE_OPERATION)
@@ -82,7 +86,7 @@ export abstract class CommandOperation<
8286
this.options.readPreference = this.readPreference;
8387

8488
// TODO(NODE-2056): make logger another "inheritable" property
85-
if (parent.logger) {
89+
if (parent && parent.logger) {
8690
this.logger = parent.logger;
8791
}
8892
}

src/operations/execute_operation.ts

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,9 @@ function supportsRetryableReads(server: Server) {
123123
}
124124

125125
function executeWithServerSelection(topology: Topology, operation: any, callback: Callback) {
126+
const session = operation.session;
126127
const readPreference = operation.readPreference || ReadPreference.primary;
127-
const inTransaction = operation.session && operation.session.inTransaction();
128+
const inTransaction = session && session.inTransaction();
128129

129130
if (inTransaction && !readPreference.equals(ReadPreference.primary)) {
130131
callback(
@@ -136,10 +137,7 @@ function executeWithServerSelection(topology: Topology, operation: any, callback
136137
return;
137138
}
138139

139-
const serverSelectionOptions = {
140-
session: operation.session
141-
};
142-
140+
const serverSelectionOptions = { session };
143141
function callbackWithRetry(err?: any, result?: any) {
144142
if (err == null) {
145143
return callback(undefined, result);
@@ -176,7 +174,7 @@ function executeWithServerSelection(topology: Topology, operation: any, callback
176174
(operation.hasAspect(Aspect.READ_OPERATION) && !supportsRetryableReads(server)) ||
177175
(operation.hasAspect(Aspect.WRITE_OPERATION) && !supportsRetryableWrites(server))
178176
) {
179-
callback(err, null);
177+
callback(err);
180178
return;
181179
}
182180

@@ -187,20 +185,20 @@ function executeWithServerSelection(topology: Topology, operation: any, callback
187185
// select a server, and execute the operation against it
188186
topology.selectServer(readPreference, serverSelectionOptions, (err?: any, server?: any) => {
189187
if (err) {
190-
callback(err, null);
188+
callback(err);
191189
return;
192190
}
193191

194192
const willRetryRead =
195193
topology.s.options.retryReads !== false &&
196-
operation.session &&
194+
session &&
197195
!inTransaction &&
198196
supportsRetryableReads(server) &&
199197
operation.canRetryRead;
200198

201199
const willRetryWrite =
202200
topology.s.options.retryWrites === true &&
203-
operation.session &&
201+
session &&
204202
!inTransaction &&
205203
supportsRetryableWrites(server) &&
206204
operation.canRetryWrite;
@@ -212,7 +210,7 @@ function executeWithServerSelection(topology: Topology, operation: any, callback
212210
) {
213211
if (operation.hasAspect(Aspect.WRITE_OPERATION) && willRetryWrite) {
214212
operation.options.willRetryWrite = true;
215-
operation.session.incrementTransactionNumber();
213+
session.incrementTransactionNumber();
216214
}
217215

218216
operation.execute(server, callbackWithRetry);

src/operations/operation.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ export interface OperationConstructor extends Function {
2121

2222
/** @internal */
2323
export interface OperationOptions extends BSONSerializeOptions {
24-
explain?: boolean;
24+
/** Specify ClientSession for this command */
2525
session?: ClientSession;
26+
27+
explain?: boolean;
28+
willRetryWrites?: boolean;
2629
}
2730

2831
/**

src/operations/run_command.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ export class RunCommandOperation<
1414
> extends CommandOperation<T, TResult> {
1515
command: Document;
1616

17-
constructor(parent: OperationParent, command: Document, options?: T) {
17+
constructor(parent: OperationParent | undefined, command: Document, options?: T) {
1818
super(parent, options);
1919
this.command = command;
2020
}
@@ -29,7 +29,7 @@ export class RunAdminCommandOperation<
2929
T extends RunCommandOptions = RunCommandOptions,
3030
TResult = Document
3131
> extends RunCommandOperation<T, TResult> {
32-
constructor(parent: OperationParent, command: Document, options?: T) {
32+
constructor(parent: OperationParent | undefined, command: Document, options?: T) {
3333
super(parent, command, options);
3434
this.ns = new MongoDBNamespace('admin');
3535
}

src/sdam/topology.ts

Lines changed: 38 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ import type { Transaction } from '../transactions';
5454
import type { CloseOptions } from '../cmap/connection_pool';
5555
import type { LoggerOptions } from '../logger';
5656
import { DestroyOptions, Connection } from '../cmap/connection';
57-
import type { CommandOptions } from '../cmap/wire_protocol/command';
5857
import { RunCommandOperation } from '../operations/run_command';
5958
import type { CursorOptions } from '../cursor/cursor';
6059
import type { MongoClientOptions } from '../mongo_client';
@@ -356,15 +355,28 @@ export class Topology extends EventEmitter {
356355

357356
ReadPreference.translate(options);
358357
const readPreference = options.readPreference || ReadPreference.primary;
359-
const connectHandler = (err: Error | MongoError | undefined) => {
358+
this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => {
360359
if (err) {
361360
this.close();
362361

363-
if (typeof callback === 'function') {
364-
callback(err);
365-
} else {
366-
this.emit(Topology.ERROR, err);
367-
}
362+
typeof callback === 'function' ? callback(err) : this.emit(Topology.ERROR, err);
363+
return;
364+
}
365+
366+
// TODO: NODE-2471
367+
if (server && this.s.credentials) {
368+
server.command('admin.$cmd', { ping: 1 }, err => {
369+
if (err) {
370+
typeof callback === 'function' ? callback(err) : this.emit(Topology.ERROR, err);
371+
return;
372+
}
373+
374+
stateTransition(this, STATE_CONNECTED);
375+
this.emit(Topology.OPEN, err, this);
376+
this.emit(Topology.CONNECT, this);
377+
378+
if (typeof callback === 'function') callback(undefined, this);
379+
});
368380

369381
return;
370382
}
@@ -373,16 +385,8 @@ export class Topology extends EventEmitter {
373385
this.emit(Topology.OPEN, err, this);
374386
this.emit(Topology.CONNECT, this);
375387

376-
if (typeof callback === 'function') callback(err, this);
377-
};
378-
379-
// TODO: NODE-2471
380-
if (this.s.credentials) {
381-
this.command('admin.$cmd', { ping: 1 }, { readPreference }, connectHandler);
382-
return;
383-
}
384-
385-
this.selectServer(readPreferenceServerSelector(readPreference), options, connectHandler);
388+
if (typeof callback === 'function') callback(undefined, this);
389+
});
386390
}
387391

388392
/** Close this topology */
@@ -567,18 +571,27 @@ export class Topology extends EventEmitter {
567571
}
568572

569573
/** Send endSessions command(s) with the given session ids */
570-
endSessions(sessions: ServerSessionId[], callback?: Callback): void {
574+
endSessions(sessions: ServerSessionId[], callback?: Callback<Document>): void {
571575
if (!Array.isArray(sessions)) {
572576
sessions = [sessions];
573577
}
574578

575-
this.command(
576-
'admin.$cmd',
577-
{ endSessions: sessions },
578-
{ readPreference: ReadPreference.primaryPreferred, noResponse: true },
579-
() => {
580-
// intentionally ignored, per spec
581-
if (typeof callback === 'function') callback();
579+
this.selectServer(
580+
readPreferenceServerSelector(ReadPreference.primaryPreferred),
581+
(err, server) => {
582+
if (err || !server) {
583+
if (typeof callback === 'function') callback(err);
584+
return;
585+
}
586+
587+
server.command(
588+
'admin.$cmd',
589+
{ endSessions: sessions },
590+
{ noResponse: true },
591+
(err, result) => {
592+
if (typeof callback === 'function') callback(err, result);
593+
}
594+
);
582595
}
583596
);
584597
}
@@ -669,56 +682,6 @@ export class Topology extends EventEmitter {
669682
if (typeof callback === 'function') callback(undefined, true);
670683
}
671684

672-
/**
673-
* Execute a command
674-
*
675-
* @param ns - The MongoDB fully qualified namespace (ex: db1.collection1)
676-
* @param cmd - The command
677-
*/
678-
command(ns: string, cmd: Document, options: CommandOptions, callback: Callback): void {
679-
if (typeof options === 'function') {
680-
(callback = options), (options = {}), (options = options || {});
681-
}
682-
683-
ReadPreference.translate(options);
684-
const readPreference = (options.readPreference as ReadPreference) || ReadPreference.primary;
685-
686-
this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => {
687-
if (err || !server) {
688-
callback(err);
689-
return;
690-
}
691-
692-
const willRetryWrite =
693-
!options.retrying &&
694-
!!options.retryWrites &&
695-
options.session &&
696-
isRetryableWritesSupported(this) &&
697-
!options.session.inTransaction() &&
698-
isWriteCommand(cmd);
699-
700-
// increment and assign txnNumber
701-
if (willRetryWrite) {
702-
options.session?.incrementTransactionNumber();
703-
options.willRetryWrite = willRetryWrite;
704-
}
705-
706-
server.command(ns, cmd, options, (err, result) => {
707-
if (!err) return callback(undefined, result);
708-
if (!shouldRetryOperation(err)) {
709-
return callback(err);
710-
}
711-
712-
if (willRetryWrite) {
713-
const newOptions = Object.assign({}, options, { retrying: true });
714-
return this.command(ns, cmd, newOptions, callback);
715-
}
716-
717-
return callback(err);
718-
});
719-
});
720-
}
721-
722685
/**
723686
* Create a new cursor
724687
*
@@ -788,11 +751,6 @@ export class Topology extends EventEmitter {
788751
);
789752
}
790753

791-
const RETRYABLE_WRITE_OPERATIONS = ['findAndModify', 'insert', 'update', 'delete'];
792-
function isWriteCommand(command: Document) {
793-
return RETRYABLE_WRITE_OPERATIONS.some((op: string) => command[op]);
794-
}
795-
796754
/** Destroys a server, and removes all event listeners from the instance */
797755
function destroyServer(
798756
server: Server,
@@ -941,10 +899,6 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes
941899
}
942900
}
943901

944-
function shouldRetryOperation(err: AnyError) {
945-
return err instanceof MongoError && err.hasErrorLabel('RetryableWriteError');
946-
}
947-
948902
function srvPollingHandler(topology: Topology) {
949903
return function handleSrvPolling(ev: SrvPollingEvent) {
950904
const previousTopologyDescription = topology.s.description;
@@ -1058,26 +1012,6 @@ function makeCompressionInfo(options: TopologyOptions) {
10581012
return options.compression.compressors;
10591013
}
10601014

1061-
const RETRYABLE_WIRE_VERSION = 6;
1062-
1063-
/** Determines whether the provided topology supports retryable writes */
1064-
function isRetryableWritesSupported(topology: Topology) {
1065-
const maxWireVersion = topology.lastIsMaster().maxWireVersion;
1066-
if (maxWireVersion < RETRYABLE_WIRE_VERSION) {
1067-
return false;
1068-
}
1069-
1070-
if (!topology.logicalSessionTimeoutMinutes) {
1071-
return false;
1072-
}
1073-
1074-
if (topology.description.type === TopologyType.Single) {
1075-
return false;
1076-
}
1077-
1078-
return true;
1079-
}
1080-
10811015
/** @public */
10821016
export class ServerCapabilities {
10831017
maxWireVersion: number;

0 commit comments

Comments
 (0)