Skip to content

Commit 1b2af7d

Browse files
committed
refactor(NODE-5914): refactor Topology.selectServer to async-await
1 parent 443835e commit 1b2af7d

File tree

3 files changed

+31
-43
lines changed

3 files changed

+31
-43
lines changed

src/change_stream.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -934,14 +934,14 @@ export class ChangeStream<
934934
this.cursor.close().catch(() => null);
935935

936936
const topology = getTopology(this.parent);
937-
topology.selectServer(
938-
this.cursor.readPreference,
939-
{ operationName: 'reconnect topology in change stream' },
940-
serverSelectionError => {
937+
topology
938+
.selectServer(this.cursor.readPreference, {
939+
operationName: 'reconnect topology in change stream'
940+
})
941+
.catch(serverSelectionError => {
941942
if (serverSelectionError) return this._closeEmitterModeWithError(changeStreamError);
942943
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
943-
}
944-
);
944+
});
945945
} else {
946946
this._closeEmitterModeWithError(changeStreamError);
947947
}
@@ -966,7 +966,7 @@ export class ChangeStream<
966966
await this.cursor.close().catch(() => null);
967967
const topology = getTopology(this.parent);
968968
try {
969-
await topology.selectServerAsync(this.cursor.readPreference, {
969+
await topology.selectServer(this.cursor.readPreference, {
970970
operationName: 'reconnect topology in change stream'
971971
});
972972
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);

src/operations/execute_operation.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ export async function executeOperation<
139139
selector = readPreference;
140140
}
141141

142-
const server = await topology.selectServerAsync(selector, {
142+
const server = await topology.selectServer(selector, {
143143
session,
144144
operationName: operation.commandName
145145
});
@@ -244,7 +244,7 @@ async function retryOperation<
244244
}
245245

246246
// select a new server, and attempt to retry the operation
247-
const server = await topology.selectServerAsync(selector, {
247+
const server = await topology.selectServer(selector, {
248248
session,
249249
operationName: operation.commandName,
250250
previousServer

src/sdam/topology.ts

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import {
4444
makeStateMachine,
4545
now,
4646
ns,
47+
promiseWithResolvers,
4748
shuffle,
4849
TimeoutController
4950
} from '../utils';
@@ -105,7 +106,8 @@ export interface ServerSelectionRequest {
105106
mongoLogger: MongoLogger | undefined;
106107
transaction?: Transaction;
107108
startTime: number;
108-
callback: ServerSelectionCallback;
109+
resolve: (server: Server) => void;
110+
reject: (reason?: any) => void;
109111
[kCancelled]?: boolean;
110112
timeoutController: TimeoutController;
111113
operationName: string;
@@ -237,11 +239,6 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
237239
/** @event */
238240
static readonly TIMEOUT = TIMEOUT;
239241

240-
selectServerAsync: (
241-
selector: string | ReadPreference | ServerSelector,
242-
options: SelectServerOptions
243-
) => Promise<Server>;
244-
245242
/**
246243
* @param seedlist - a list of HostAddress instances to connect to
247244
*/
@@ -253,14 +250,6 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
253250
super();
254251

255252
this.client = client;
256-
this.selectServerAsync = promisify(
257-
(
258-
selector: string | ReadPreference | ServerSelector,
259-
options: SelectServerOptions,
260-
callback: (e: Error, r: Server) => void
261-
) => this.selectServer(selector, options, callback as any)
262-
);
263-
264253
// Options should only be undefined in tests, MongoClient will always have defined options
265254
options = options ?? {
266255
hosts: [HostAddress.fromString('localhost:27017')],
@@ -463,14 +452,9 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
463452

464453
const readPreference = options.readPreference ?? ReadPreference.primary;
465454
const selectServerOptions = { operationName: 'ping', ...options };
466-
this.selectServer(
467-
readPreferenceServerSelector(readPreference),
468-
selectServerOptions,
469-
(err, server) => {
470-
if (err) {
471-
return this.close({ force: false }, () => exitWithError(err));
472-
}
473455

456+
this.selectServer(readPreferenceServerSelector(readPreference), selectServerOptions).then(
457+
server => {
474458
const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
475459
if (!skipPingOnConnect && server && this.s.credentials) {
476460
server.command(ns('admin.$cmd'), { ping: 1 }, {}).then(() => {
@@ -489,6 +473,9 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
489473
this.emit(Topology.CONNECT, this);
490474

491475
callback?.(undefined, this);
476+
},
477+
error => {
478+
return this.close({ force: false }, () => exitWithError(error));
492479
}
493480
);
494481
}
@@ -539,11 +526,10 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
539526
* @param callback - The callback used to indicate success or failure
540527
* @returns An instance of a `Server` meeting the criteria of the predicate provided
541528
*/
542-
selectServer(
529+
async selectServer(
543530
selector: string | ReadPreference | ServerSelector,
544-
options: SelectServerOptions,
545-
callback: Callback<Server>
546-
): void {
531+
options: SelectServerOptions
532+
): Promise<Server> {
547533
let serverSelector;
548534
if (typeof selector !== 'function') {
549535
if (typeof selector === 'string') {
@@ -594,16 +580,17 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
594580
)
595581
);
596582
}
597-
callback(undefined, transaction.server);
598-
return;
583+
return transaction.server;
599584
}
600585

586+
const { promise: serverPromise, resolve, reject } = promiseWithResolvers<Server>();
601587
const waitQueueMember: ServerSelectionRequest = {
602588
serverSelector,
603589
topologyDescription: this.description,
604590
mongoLogger: this.client.mongoLogger,
605591
transaction,
606-
callback,
592+
resolve,
593+
reject,
607594
timeoutController: new TimeoutController(options.serverSelectionTimeoutMS),
608595
startTime: now(),
609596
operationName: options.operationName,
@@ -634,13 +621,14 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
634621
)
635622
);
636623
}
637-
waitQueueMember.callback(timeoutError);
624+
waitQueueMember.reject(timeoutError);
638625
});
639626

640627
this[kWaitQueue].push(waitQueueMember);
641628
processWaitQueue(this);
642-
}
643629

630+
return serverPromise;
631+
}
644632
/**
645633
* Update the internal TopologyDescription with a ServerDescription
646634
*
@@ -927,7 +915,7 @@ function drainWaitQueue(queue: List<ServerSelectionRequest>, err?: MongoDriverEr
927915
);
928916
}
929917
}
930-
waitQueueMember.callback(err);
918+
waitQueueMember.reject(err);
931919
}
932920
}
933921
}
@@ -980,7 +968,7 @@ function processWaitQueue(topology: Topology) {
980968
)
981969
);
982970
}
983-
waitQueueMember.callback(e);
971+
waitQueueMember.reject(e);
984972
continue;
985973
}
986974

@@ -1043,7 +1031,7 @@ function processWaitQueue(topology: Topology) {
10431031
)
10441032
);
10451033
}
1046-
waitQueueMember.callback(error);
1034+
waitQueueMember.reject(error);
10471035
return;
10481036
}
10491037
const transaction = waitQueueMember.transaction;
@@ -1069,7 +1057,7 @@ function processWaitQueue(topology: Topology) {
10691057
)
10701058
);
10711059
}
1072-
waitQueueMember.callback(undefined, selectedServer);
1060+
waitQueueMember.resolve(selectedServer);
10731061
}
10741062

10751063
if (topology[kWaitQueue].length > 0) {

0 commit comments

Comments
 (0)