Skip to content

Commit aec8416

Browse files
authored
refactor(NODE-5914): topology.selectServer to async-await (#4020)
1 parent d86d2ae commit aec8416

15 files changed

+441
-541
lines changed

src/change_stream.ts

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -934,14 +934,16 @@ export class ChangeStream<
934934
this.cursor.close().catch(() => null);
935935

936936
const topology = getTopology(this.parent);
937-
topology.selectServer(
938-
this.cursor.readPreference,
939-
{ operationName: 'reconnect topology in change stream' },
940-
serverSelectionError => {
941-
if (serverSelectionError) return this._closeEmitterModeWithError(changeStreamError);
942-
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
943-
}
944-
);
937+
topology
938+
.selectServer(this.cursor.readPreference, {
939+
operationName: 'reconnect topology in change stream'
940+
})
941+
.then(
942+
() => {
943+
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
944+
},
945+
() => this._closeEmitterModeWithError(changeStreamError)
946+
);
945947
} else {
946948
this._closeEmitterModeWithError(changeStreamError);
947949
}
@@ -966,7 +968,7 @@ export class ChangeStream<
966968
await this.cursor.close().catch(() => null);
967969
const topology = getTopology(this.parent);
968970
try {
969-
await topology.selectServerAsync(this.cursor.readPreference, {
971+
await topology.selectServer(this.cursor.readPreference, {
970972
operationName: 'reconnect topology in change stream'
971973
});
972974
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);

src/mongo_client.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { promises as fs } from 'fs';
22
import type { TcpNetConnectOpts } from 'net';
33
import type { ConnectionOptions as TLSConnectionOptions, TLSSocketOptions } from 'tls';
4-
import { promisify } from 'util';
54

65
import { type BSONSerializeOptions, type Document, resolveBSONOptions } from './bson';
76
import { ChangeStream, type ChangeStreamDocument, type ChangeStreamOptions } from './change_stream';
@@ -550,7 +549,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
550549

551550
const topologyConnect = async () => {
552551
try {
553-
await promisify(callback => this.topology?.connect(options, callback))();
552+
await this.topology?.connect(options);
554553
} catch (error) {
555554
this.topology?.close();
556555
throw error;

src/operations/execute_operation.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ export async function executeOperation<
139139
selector = readPreference;
140140
}
141141

142-
const server = await topology.selectServerAsync(selector, {
142+
const server = await topology.selectServer(selector, {
143143
session,
144144
operationName: operation.commandName
145145
});
@@ -244,7 +244,7 @@ async function retryOperation<
244244
}
245245

246246
// select a new server, and attempt to retry the operation
247-
const server = await topology.selectServerAsync(selector, {
247+
const server = await topology.selectServer(selector, {
248248
session,
249249
operationName: operation.commandName,
250250
previousServer

src/sdam/topology.ts

Lines changed: 73 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import { promisify } from 'util';
2-
31
import type { BSONSerializeOptions, Document } from '../bson';
42
import type { MongoCredentials } from '../cmap/auth/mongo_credentials';
53
import type { ConnectionEvents } from '../cmap/connection';
@@ -44,6 +42,7 @@ import {
4442
makeStateMachine,
4543
now,
4644
ns,
45+
promiseWithResolvers,
4746
shuffle,
4847
TimeoutController
4948
} from '../utils';
@@ -105,7 +104,8 @@ export interface ServerSelectionRequest {
105104
mongoLogger: MongoLogger | undefined;
106105
transaction?: Transaction;
107106
startTime: number;
108-
callback: ServerSelectionCallback;
107+
resolve: (server: Server) => void;
108+
reject: (error: MongoError) => void;
109109
[kCancelled]?: boolean;
110110
timeoutController: TimeoutController;
111111
operationName: string;
@@ -215,6 +215,9 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
215215

216216
client!: MongoClient;
217217

218+
/** @internal */
219+
private connectionLock?: Promise<Topology>;
220+
218221
/** @event */
219222
static readonly SERVER_OPENING = SERVER_OPENING;
220223
/** @event */
@@ -238,11 +241,6 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
238241
/** @event */
239242
static readonly TIMEOUT = TIMEOUT;
240243

241-
selectServerAsync: (
242-
selector: string | ReadPreference | ServerSelector,
243-
options: SelectServerOptions
244-
) => Promise<Server>;
245-
246244
/**
247245
* @param seedlist - a list of HostAddress instances to connect to
248246
*/
@@ -254,14 +252,6 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
254252
super();
255253

256254
this.client = client;
257-
this.selectServerAsync = promisify(
258-
(
259-
selector: string | ReadPreference | ServerSelector,
260-
options: SelectServerOptions,
261-
callback: (e: Error, r: Server) => void
262-
) => this.selectServer(selector, options, callback as any)
263-
);
264-
265255
// Options should only be undefined in tests, MongoClient will always have defined options
266256
options = options ?? {
267257
hosts: [HostAddress.fromString('localhost:27017')],
@@ -351,6 +341,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
351341

352342
this.on(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);
353343
}
344+
this.connectionLock = undefined;
354345
}
355346

356347
private detectShardedTopology(event: TopologyDescriptionChangedEvent) {
@@ -411,17 +402,22 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
411402
}
412403

413404
/** Initiate server connect */
414-
connect(callback: Callback): void;
415-
connect(options: ConnectOptions, callback: Callback): void;
416-
connect(options?: ConnectOptions | Callback, callback?: Callback): void {
417-
if (typeof options === 'function') (callback = options), (options = {});
405+
async connect(options?: ConnectOptions): Promise<Topology> {
406+
this.connectionLock ??= this._connect(options);
407+
try {
408+
await this.connectionLock;
409+
return this;
410+
} finally {
411+
this.connectionLock = undefined;
412+
}
413+
414+
return this;
415+
}
416+
417+
private async _connect(options?: ConnectOptions): Promise<Topology> {
418418
options = options ?? {};
419419
if (this.s.state === STATE_CONNECTED) {
420-
if (typeof callback === 'function') {
421-
callback();
422-
}
423-
424-
return;
420+
return this;
425421
}
426422

427423
stateTransition(this, STATE_CONNECTING);
@@ -459,40 +455,33 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
459455
}
460456
}
461457

462-
const exitWithError = (error: Error) =>
463-
callback ? callback(error) : this.emit(Topology.ERROR, error);
464-
465458
const readPreference = options.readPreference ?? ReadPreference.primary;
466459
const selectServerOptions = { operationName: 'ping', ...options };
467-
this.selectServer(
468-
readPreferenceServerSelector(readPreference),
469-
selectServerOptions,
470-
(err, server) => {
471-
if (err) {
472-
this.close();
473-
return exitWithError(err);
474-
}
475-
476-
const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
477-
if (!skipPingOnConnect && server && this.s.credentials) {
478-
server.command(ns('admin.$cmd'), { ping: 1 }, {}).then(() => {
479-
stateTransition(this, STATE_CONNECTED);
480-
this.emit(Topology.OPEN, this);
481-
this.emit(Topology.CONNECT, this);
482-
483-
callback?.(undefined, this);
484-
}, exitWithError);
485-
486-
return;
487-
}
460+
try {
461+
const server = await this.selectServer(
462+
readPreferenceServerSelector(readPreference),
463+
selectServerOptions
464+
);
488465

466+
const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
467+
if (!skipPingOnConnect && server && this.s.credentials) {
468+
await server.command(ns('admin.$cmd'), { ping: 1 }, {});
489469
stateTransition(this, STATE_CONNECTED);
490470
this.emit(Topology.OPEN, this);
491471
this.emit(Topology.CONNECT, this);
492472

493-
callback?.(undefined, this);
473+
return this;
494474
}
495-
);
475+
476+
stateTransition(this, STATE_CONNECTED);
477+
this.emit(Topology.OPEN, this);
478+
this.emit(Topology.CONNECT, this);
479+
480+
return this;
481+
} catch (error) {
482+
this.close();
483+
throw error;
484+
}
496485
}
497486

498487
/** Close this topology */
@@ -533,11 +522,10 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
533522
* @param callback - The callback used to indicate success or failure
534523
* @returns An instance of a `Server` meeting the criteria of the predicate provided
535524
*/
536-
selectServer(
525+
async selectServer(
537526
selector: string | ReadPreference | ServerSelector,
538-
options: SelectServerOptions,
539-
callback: Callback<Server>
540-
): void {
527+
options: SelectServerOptions
528+
): Promise<Server> {
541529
let serverSelector;
542530
if (typeof selector !== 'function') {
543531
if (typeof selector === 'string') {
@@ -588,16 +576,17 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
588576
)
589577
);
590578
}
591-
callback(undefined, transaction.server);
592-
return;
579+
return transaction.server;
593580
}
594581

582+
const { promise: serverPromise, resolve, reject } = promiseWithResolvers<Server>();
595583
const waitQueueMember: ServerSelectionRequest = {
596584
serverSelector,
597585
topologyDescription: this.description,
598586
mongoLogger: this.client.mongoLogger,
599587
transaction,
600-
callback,
588+
resolve,
589+
reject,
601590
timeoutController: new TimeoutController(options.serverSelectionTimeoutMS),
602591
startTime: now(),
603592
operationName: options.operationName,
@@ -628,13 +617,14 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
628617
)
629618
);
630619
}
631-
waitQueueMember.callback(timeoutError);
620+
waitQueueMember.reject(timeoutError);
632621
});
633622

634623
this[kWaitQueue].push(waitQueueMember);
635624
processWaitQueue(this);
636-
}
637625

626+
return serverPromise;
627+
}
638628
/**
639629
* Update the internal TopologyDescription with a ServerDescription
640630
*
@@ -883,7 +873,7 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes
883873
}
884874
}
885875

886-
function drainWaitQueue(queue: List<ServerSelectionRequest>, err?: MongoDriverError) {
876+
function drainWaitQueue(queue: List<ServerSelectionRequest>, drainError: MongoDriverError) {
887877
while (queue.length) {
888878
const waitQueueMember = queue.shift();
889879
if (!waitQueueMember) {
@@ -893,25 +883,23 @@ function drainWaitQueue(queue: List<ServerSelectionRequest>, err?: MongoDriverEr
893883
waitQueueMember.timeoutController.clear();
894884

895885
if (!waitQueueMember[kCancelled]) {
896-
if (err) {
897-
if (
898-
waitQueueMember.mongoLogger?.willLog(
899-
MongoLoggableComponent.SERVER_SELECTION,
900-
SeverityLevel.DEBUG
886+
if (
887+
waitQueueMember.mongoLogger?.willLog(
888+
MongoLoggableComponent.SERVER_SELECTION,
889+
SeverityLevel.DEBUG
890+
)
891+
) {
892+
waitQueueMember.mongoLogger?.debug(
893+
MongoLoggableComponent.SERVER_SELECTION,
894+
new ServerSelectionFailedEvent(
895+
waitQueueMember.serverSelector,
896+
waitQueueMember.topologyDescription,
897+
drainError,
898+
waitQueueMember.operationName
901899
)
902-
) {
903-
waitQueueMember.mongoLogger?.debug(
904-
MongoLoggableComponent.SERVER_SELECTION,
905-
new ServerSelectionFailedEvent(
906-
waitQueueMember.serverSelector,
907-
waitQueueMember.topologyDescription,
908-
err,
909-
waitQueueMember.operationName
910-
)
911-
);
912-
}
900+
);
913901
}
914-
waitQueueMember.callback(err);
902+
waitQueueMember.reject(drainError);
915903
}
916904
}
917905
}
@@ -946,7 +934,7 @@ function processWaitQueue(topology: Topology) {
946934
previousServer ? [previousServer] : []
947935
)
948936
: serverDescriptions;
949-
} catch (e) {
937+
} catch (selectorError) {
950938
waitQueueMember.timeoutController.clear();
951939
if (
952940
topology.client.mongoLogger?.willLog(
@@ -959,12 +947,12 @@ function processWaitQueue(topology: Topology) {
959947
new ServerSelectionFailedEvent(
960948
waitQueueMember.serverSelector,
961949
topology.description,
962-
e,
950+
selectorError,
963951
waitQueueMember.operationName
964952
)
965953
);
966954
}
967-
waitQueueMember.callback(e);
955+
waitQueueMember.reject(selectorError);
968956
continue;
969957
}
970958

@@ -1007,7 +995,7 @@ function processWaitQueue(topology: Topology) {
1007995
}
1008996

1009997
if (!selectedServer) {
1010-
const error = new MongoServerSelectionError(
998+
const serverSelectionError = new MongoServerSelectionError(
1011999
'server selection returned a server description but the server was not found in the topology',
10121000
topology.description
10131001
);
@@ -1022,12 +1010,12 @@ function processWaitQueue(topology: Topology) {
10221010
new ServerSelectionFailedEvent(
10231011
waitQueueMember.serverSelector,
10241012
topology.description,
1025-
error,
1013+
serverSelectionError,
10261014
waitQueueMember.operationName
10271015
)
10281016
);
10291017
}
1030-
waitQueueMember.callback(error);
1018+
waitQueueMember.reject(serverSelectionError);
10311019
return;
10321020
}
10331021
const transaction = waitQueueMember.transaction;
@@ -1053,7 +1041,7 @@ function processWaitQueue(topology: Topology) {
10531041
)
10541042
);
10551043
}
1056-
waitQueueMember.callback(undefined, selectedServer);
1044+
waitQueueMember.resolve(selectedServer);
10571045
}
10581046

10591047
if (topology[kWaitQueue].length > 0) {

0 commit comments

Comments
 (0)