Skip to content

Commit 9ca16ec

Browse files
committed
refactor(NODE-5914): refactor Topology.selectServer to async-await
1 parent 64da97f commit 9ca16ec

File tree

3 files changed

+31
-44
lines changed

3 files changed

+31
-44
lines changed

src/change_stream.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -934,14 +934,14 @@ export class ChangeStream<
934934
this.cursor.close().catch(() => null);
935935

936936
const topology = getTopology(this.parent);
937-
topology.selectServer(
938-
this.cursor.readPreference,
939-
{ operationName: 'reconnect topology in change stream' },
940-
serverSelectionError => {
937+
topology
938+
.selectServer(this.cursor.readPreference, {
939+
operationName: 'reconnect topology in change stream'
940+
})
941+
.catch(serverSelectionError => {
941942
if (serverSelectionError) return this._closeEmitterModeWithError(changeStreamError);
942943
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
943-
}
944-
);
944+
});
945945
} else {
946946
this._closeEmitterModeWithError(changeStreamError);
947947
}
@@ -966,7 +966,7 @@ export class ChangeStream<
966966
await this.cursor.close().catch(() => null);
967967
const topology = getTopology(this.parent);
968968
try {
969-
await topology.selectServerAsync(this.cursor.readPreference, {
969+
await topology.selectServer(this.cursor.readPreference, {
970970
operationName: 'reconnect topology in change stream'
971971
});
972972
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);

src/operations/execute_operation.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ export async function executeOperation<
139139
selector = readPreference;
140140
}
141141

142-
const server = await topology.selectServerAsync(selector, {
142+
const server = await topology.selectServer(selector, {
143143
session,
144144
operationName: operation.commandName
145145
});
@@ -244,7 +244,7 @@ async function retryOperation<
244244
}
245245

246246
// select a new server, and attempt to retry the operation
247-
const server = await topology.selectServerAsync(selector, {
247+
const server = await topology.selectServer(selector, {
248248
session,
249249
operationName: operation.commandName,
250250
previousServer

src/sdam/topology.ts

Lines changed: 22 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import {
4444
makeStateMachine,
4545
now,
4646
ns,
47+
promiseWithResolvers,
4748
shuffle,
4849
TimeoutController
4950
} from '../utils';
@@ -105,7 +106,8 @@ export interface ServerSelectionRequest {
105106
mongoLogger: MongoLogger | undefined;
106107
transaction?: Transaction;
107108
startTime: number;
108-
callback: ServerSelectionCallback;
109+
resolve: (server: Server) => void;
110+
reject: (reason?: any) => void;
109111
[kCancelled]?: boolean;
110112
timeoutController: TimeoutController;
111113
operationName: string;
@@ -238,11 +240,6 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
238240
/** @event */
239241
static readonly TIMEOUT = TIMEOUT;
240242

241-
selectServerAsync: (
242-
selector: string | ReadPreference | ServerSelector,
243-
options: SelectServerOptions
244-
) => Promise<Server>;
245-
246243
/**
247244
* @param seedlist - a list of HostAddress instances to connect to
248245
*/
@@ -254,14 +251,6 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
254251
super();
255252

256253
this.client = client;
257-
this.selectServerAsync = promisify(
258-
(
259-
selector: string | ReadPreference | ServerSelector,
260-
options: SelectServerOptions,
261-
callback: (e: Error, r: Server) => void
262-
) => this.selectServer(selector, options, callback as any)
263-
);
264-
265254
// Options should only be undefined in tests, MongoClient will always have defined options
266255
options = options ?? {
267256
hosts: [HostAddress.fromString('localhost:27017')],
@@ -464,15 +453,9 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
464453

465454
const readPreference = options.readPreference ?? ReadPreference.primary;
466455
const selectServerOptions = { operationName: 'ping', ...options };
467-
this.selectServer(
468-
readPreferenceServerSelector(readPreference),
469-
selectServerOptions,
470-
(err, server) => {
471-
if (err) {
472-
this.close();
473-
return exitWithError(err);
474-
}
475456

457+
this.selectServer(readPreferenceServerSelector(readPreference), selectServerOptions).then(
458+
server => {
476459
const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
477460
if (!skipPingOnConnect && server && this.s.credentials) {
478461
server.command(ns('admin.$cmd'), { ping: 1 }, {}).then(() => {
@@ -491,6 +474,9 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
491474
this.emit(Topology.CONNECT, this);
492475

493476
callback?.(undefined, this);
477+
},
478+
error => {
479+
return this.close({ force: false }, () => exitWithError(error));
494480
}
495481
);
496482
}
@@ -533,11 +519,10 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
533519
* @param callback - The callback used to indicate success or failure
534520
* @returns An instance of a `Server` meeting the criteria of the predicate provided
535521
*/
536-
selectServer(
522+
async selectServer(
537523
selector: string | ReadPreference | ServerSelector,
538-
options: SelectServerOptions,
539-
callback: Callback<Server>
540-
): void {
524+
options: SelectServerOptions
525+
): Promise<Server> {
541526
let serverSelector;
542527
if (typeof selector !== 'function') {
543528
if (typeof selector === 'string') {
@@ -588,16 +573,17 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
588573
)
589574
);
590575
}
591-
callback(undefined, transaction.server);
592-
return;
576+
return transaction.server;
593577
}
594578

579+
const { promise: serverPromise, resolve, reject } = promiseWithResolvers<Server>();
595580
const waitQueueMember: ServerSelectionRequest = {
596581
serverSelector,
597582
topologyDescription: this.description,
598583
mongoLogger: this.client.mongoLogger,
599584
transaction,
600-
callback,
585+
resolve,
586+
reject,
601587
timeoutController: new TimeoutController(options.serverSelectionTimeoutMS),
602588
startTime: now(),
603589
operationName: options.operationName,
@@ -628,13 +614,14 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
628614
)
629615
);
630616
}
631-
waitQueueMember.callback(timeoutError);
617+
waitQueueMember.reject(timeoutError);
632618
});
633619

634620
this[kWaitQueue].push(waitQueueMember);
635621
processWaitQueue(this);
636-
}
637622

623+
return serverPromise;
624+
}
638625
/**
639626
* Update the internal TopologyDescription with a ServerDescription
640627
*
@@ -911,7 +898,7 @@ function drainWaitQueue(queue: List<ServerSelectionRequest>, err?: MongoDriverEr
911898
);
912899
}
913900
}
914-
waitQueueMember.callback(err);
901+
waitQueueMember.reject(err);
915902
}
916903
}
917904
}
@@ -964,7 +951,7 @@ function processWaitQueue(topology: Topology) {
964951
)
965952
);
966953
}
967-
waitQueueMember.callback(e);
954+
waitQueueMember.reject(e);
968955
continue;
969956
}
970957

@@ -1027,7 +1014,7 @@ function processWaitQueue(topology: Topology) {
10271014
)
10281015
);
10291016
}
1030-
waitQueueMember.callback(error);
1017+
waitQueueMember.reject(error);
10311018
return;
10321019
}
10331020
const transaction = waitQueueMember.transaction;
@@ -1053,7 +1040,7 @@ function processWaitQueue(topology: Topology) {
10531040
)
10541041
);
10551042
}
1056-
waitQueueMember.callback(undefined, selectedServer);
1043+
waitQueueMember.resolve(selectedServer);
10571044
}
10581045

10591046
if (topology[kWaitQueue].length > 0) {

0 commit comments

Comments
 (0)