-
Notifications
You must be signed in to change notification settings - Fork 1.8k
refactor(NODE-5914): topology.selectServer to async-await #4020
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
44 commits
Select commit
Hold shift + click to select a range
77d96e4
eslint and build fixes
W-A-James f5bbb43
remove force flag
W-A-James 6c2b3c8
remove import
W-A-James a0da9ca
fix????
W-A-James d0a1d3f
remove DestroyOptions
W-A-James ddd9f2e
refactor(NODE-5914): refactor Topology.selectServer to async-await
W-A-James 5883888
fix tests
W-A-James 1026ae9
fix types
W-A-James 0b44449
eslint
W-A-James 1b8a19b
fix return
W-A-James 8d87204
fix
W-A-James bfdaf52
fix test
W-A-James 91275ef
remove import
W-A-James af54e79
deprecate CloseOptions
W-A-James 3f6ab2f
add correct then handler
W-A-James e033bd5
fix unit test
W-A-James 6892362
remove unintended deprecation
W-A-James 45c8909
Merge branch 'main' into NODE-5914/refactor-selectServer
W-A-James cf07a5d
convert Topology.connect to async/await
W-A-James 94539c5
fix unit tests
W-A-James 4e447bf
Merge branch 'NODE-5914/refactor-selectServer' of github.com:mongodb/…
W-A-James 961cc0f
Merge branch 'main' into NODE-5914/refactor-selectServer
W-A-James 331d124
remove unneeded script
W-A-James 5e4182b
fix uri test
W-A-James 3d2e854
lint fix
W-A-James 988f549
lint fix
W-A-James df1d677
fix test
W-A-James e485ec5
add connectionLock
W-A-James 09814fb
fix find test
W-A-James 99ec556
Apply suggestions from code review
W-A-James 83deb48
test refactor fixes
W-A-James 04d7880
narrow types
W-A-James bc46586
remove unused branch
W-A-James c1d3265
renames
W-A-James a7202ec
review suggestions
W-A-James d2f480c
fix failing unit test
W-A-James 3fcfe31
fix test failure
W-A-James 2bf383f
fix async tests
W-A-James 228e622
test fixes
W-A-James f13977e
remove expect.fail
W-A-James d204529
Update src/sdam/topology.ts
W-A-James b2c9d37
fix comment
W-A-James 9d73840
lint
W-A-James df80941
Merge branch 'main' into NODE-5914/refactor-selectServer
baileympearson File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,3 @@ | ||
import { promisify } from 'util'; | ||
|
||
import type { BSONSerializeOptions, Document } from '../bson'; | ||
import type { MongoCredentials } from '../cmap/auth/mongo_credentials'; | ||
import type { ConnectionEvents } from '../cmap/connection'; | ||
|
@@ -44,6 +42,7 @@ import { | |
makeStateMachine, | ||
now, | ||
ns, | ||
promiseWithResolvers, | ||
shuffle, | ||
TimeoutController | ||
} from '../utils'; | ||
|
@@ -105,7 +104,8 @@ export interface ServerSelectionRequest { | |
mongoLogger: MongoLogger | undefined; | ||
transaction?: Transaction; | ||
startTime: number; | ||
callback: ServerSelectionCallback; | ||
resolve: (server: Server) => void; | ||
reject: (error: MongoError) => void; | ||
[kCancelled]?: boolean; | ||
timeoutController: TimeoutController; | ||
operationName: string; | ||
|
@@ -215,6 +215,9 @@ export class Topology extends TypedEventEmitter<TopologyEvents> { | |
|
||
client!: MongoClient; | ||
|
||
/** @internal */ | ||
private connectionLock?: Promise<Topology>; | ||
|
||
/** @event */ | ||
static readonly SERVER_OPENING = SERVER_OPENING; | ||
/** @event */ | ||
|
@@ -238,11 +241,6 @@ export class Topology extends TypedEventEmitter<TopologyEvents> { | |
/** @event */ | ||
static readonly TIMEOUT = TIMEOUT; | ||
|
||
selectServerAsync: ( | ||
selector: string | ReadPreference | ServerSelector, | ||
options: SelectServerOptions | ||
) => Promise<Server>; | ||
|
||
/** | ||
* @param seedlist - a list of HostAddress instances to connect to | ||
*/ | ||
|
@@ -254,14 +252,6 @@ export class Topology extends TypedEventEmitter<TopologyEvents> { | |
super(); | ||
|
||
this.client = client; | ||
this.selectServerAsync = promisify( | ||
( | ||
selector: string | ReadPreference | ServerSelector, | ||
options: SelectServerOptions, | ||
callback: (e: Error, r: Server) => void | ||
) => this.selectServer(selector, options, callback as any) | ||
); | ||
|
||
// Options should only be undefined in tests, MongoClient will always have defined options | ||
options = options ?? { | ||
hosts: [HostAddress.fromString('localhost:27017')], | ||
|
@@ -351,6 +341,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> { | |
|
||
this.on(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology); | ||
} | ||
this.connectionLock = undefined; | ||
} | ||
|
||
private detectShardedTopology(event: TopologyDescriptionChangedEvent) { | ||
|
@@ -411,17 +402,22 @@ export class Topology extends TypedEventEmitter<TopologyEvents> { | |
} | ||
|
||
/** Initiate server connect */ | ||
connect(callback: Callback): void; | ||
connect(options: ConnectOptions, callback: Callback): void; | ||
connect(options?: ConnectOptions | Callback, callback?: Callback): void { | ||
if (typeof options === 'function') (callback = options), (options = {}); | ||
async connect(options?: ConnectOptions): Promise<Topology> { | ||
this.connectionLock ??= this._connect(options); | ||
try { | ||
await this.connectionLock; | ||
return this; | ||
} finally { | ||
this.connectionLock = undefined; | ||
} | ||
|
||
return this; | ||
} | ||
|
||
private async _connect(options?: ConnectOptions): Promise<Topology> { | ||
options = options ?? {}; | ||
if (this.s.state === STATE_CONNECTED) { | ||
if (typeof callback === 'function') { | ||
callback(); | ||
} | ||
|
||
return; | ||
return this; | ||
} | ||
|
||
stateTransition(this, STATE_CONNECTING); | ||
|
@@ -459,40 +455,33 @@ export class Topology extends TypedEventEmitter<TopologyEvents> { | |
} | ||
} | ||
|
||
const exitWithError = (error: Error) => | ||
callback ? callback(error) : this.emit(Topology.ERROR, error); | ||
|
||
const readPreference = options.readPreference ?? ReadPreference.primary; | ||
const selectServerOptions = { operationName: 'ping', ...options }; | ||
this.selectServer( | ||
readPreferenceServerSelector(readPreference), | ||
selectServerOptions, | ||
(err, server) => { | ||
if (err) { | ||
this.close(); | ||
return exitWithError(err); | ||
} | ||
|
||
const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true; | ||
if (!skipPingOnConnect && server && this.s.credentials) { | ||
server.command(ns('admin.$cmd'), { ping: 1 }, {}).then(() => { | ||
stateTransition(this, STATE_CONNECTED); | ||
this.emit(Topology.OPEN, this); | ||
this.emit(Topology.CONNECT, this); | ||
|
||
callback?.(undefined, this); | ||
}, exitWithError); | ||
|
||
return; | ||
} | ||
try { | ||
const server = await this.selectServer( | ||
readPreferenceServerSelector(readPreference), | ||
selectServerOptions | ||
); | ||
|
||
const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true; | ||
if (!skipPingOnConnect && server && this.s.credentials) { | ||
await server.command(ns('admin.$cmd'), { ping: 1 }, {}); | ||
stateTransition(this, STATE_CONNECTED); | ||
this.emit(Topology.OPEN, this); | ||
this.emit(Topology.CONNECT, this); | ||
|
||
callback?.(undefined, this); | ||
return this; | ||
} | ||
); | ||
|
||
stateTransition(this, STATE_CONNECTED); | ||
this.emit(Topology.OPEN, this); | ||
this.emit(Topology.CONNECT, this); | ||
|
||
return this; | ||
} catch (error) { | ||
this.close(); | ||
throw error; | ||
} | ||
} | ||
|
||
/** Close this topology */ | ||
|
@@ -533,11 +522,10 @@ export class Topology extends TypedEventEmitter<TopologyEvents> { | |
* @param callback - The callback used to indicate success or failure | ||
* @returns An instance of a `Server` meeting the criteria of the predicate provided | ||
*/ | ||
selectServer( | ||
async selectServer( | ||
selector: string | ReadPreference | ServerSelector, | ||
options: SelectServerOptions, | ||
callback: Callback<Server> | ||
): void { | ||
options: SelectServerOptions | ||
): Promise<Server> { | ||
let serverSelector; | ||
if (typeof selector !== 'function') { | ||
if (typeof selector === 'string') { | ||
|
@@ -588,16 +576,17 @@ export class Topology extends TypedEventEmitter<TopologyEvents> { | |
) | ||
); | ||
} | ||
callback(undefined, transaction.server); | ||
return; | ||
return transaction.server; | ||
} | ||
|
||
const { promise: serverPromise, resolve, reject } = promiseWithResolvers<Server>(); | ||
const waitQueueMember: ServerSelectionRequest = { | ||
serverSelector, | ||
topologyDescription: this.description, | ||
mongoLogger: this.client.mongoLogger, | ||
transaction, | ||
callback, | ||
resolve, | ||
reject, | ||
timeoutController: new TimeoutController(options.serverSelectionTimeoutMS), | ||
startTime: now(), | ||
operationName: options.operationName, | ||
|
@@ -628,13 +617,14 @@ export class Topology extends TypedEventEmitter<TopologyEvents> { | |
) | ||
); | ||
} | ||
waitQueueMember.callback(timeoutError); | ||
waitQueueMember.reject(timeoutError); | ||
}); | ||
|
||
this[kWaitQueue].push(waitQueueMember); | ||
processWaitQueue(this); | ||
} | ||
|
||
return serverPromise; | ||
} | ||
/** | ||
* Update the internal TopologyDescription with a ServerDescription | ||
* | ||
|
@@ -883,7 +873,7 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes | |
} | ||
} | ||
|
||
function drainWaitQueue(queue: List<ServerSelectionRequest>, err?: MongoDriverError) { | ||
function drainWaitQueue(queue: List<ServerSelectionRequest>, drainError: MongoDriverError) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔥 I always appreciate drive-by improvement of variable names, thanks for improving this one! lack of attention to variable names leads to death of readability by a thousand cuts 🔪 |
||
while (queue.length) { | ||
const waitQueueMember = queue.shift(); | ||
if (!waitQueueMember) { | ||
|
@@ -893,25 +883,23 @@ function drainWaitQueue(queue: List<ServerSelectionRequest>, err?: MongoDriverEr | |
waitQueueMember.timeoutController.clear(); | ||
|
||
if (!waitQueueMember[kCancelled]) { | ||
if (err) { | ||
if ( | ||
waitQueueMember.mongoLogger?.willLog( | ||
MongoLoggableComponent.SERVER_SELECTION, | ||
SeverityLevel.DEBUG | ||
if ( | ||
waitQueueMember.mongoLogger?.willLog( | ||
MongoLoggableComponent.SERVER_SELECTION, | ||
SeverityLevel.DEBUG | ||
) | ||
) { | ||
waitQueueMember.mongoLogger?.debug( | ||
MongoLoggableComponent.SERVER_SELECTION, | ||
new ServerSelectionFailedEvent( | ||
waitQueueMember.serverSelector, | ||
waitQueueMember.topologyDescription, | ||
drainError, | ||
waitQueueMember.operationName | ||
) | ||
) { | ||
waitQueueMember.mongoLogger?.debug( | ||
MongoLoggableComponent.SERVER_SELECTION, | ||
new ServerSelectionFailedEvent( | ||
waitQueueMember.serverSelector, | ||
waitQueueMember.topologyDescription, | ||
err, | ||
waitQueueMember.operationName | ||
) | ||
); | ||
} | ||
); | ||
} | ||
waitQueueMember.callback(err); | ||
waitQueueMember.reject(drainError); | ||
} | ||
} | ||
} | ||
|
@@ -946,7 +934,7 @@ function processWaitQueue(topology: Topology) { | |
previousServer ? [previousServer] : [] | ||
) | ||
: serverDescriptions; | ||
} catch (e) { | ||
} catch (selectorError) { | ||
waitQueueMember.timeoutController.clear(); | ||
if ( | ||
topology.client.mongoLogger?.willLog( | ||
|
@@ -959,12 +947,12 @@ function processWaitQueue(topology: Topology) { | |
new ServerSelectionFailedEvent( | ||
waitQueueMember.serverSelector, | ||
topology.description, | ||
e, | ||
selectorError, | ||
waitQueueMember.operationName | ||
) | ||
); | ||
} | ||
waitQueueMember.callback(e); | ||
waitQueueMember.reject(selectorError); | ||
continue; | ||
} | ||
|
||
|
@@ -1007,7 +995,7 @@ function processWaitQueue(topology: Topology) { | |
} | ||
|
||
if (!selectedServer) { | ||
const error = new MongoServerSelectionError( | ||
const serverSelectionError = new MongoServerSelectionError( | ||
'server selection returned a server description but the server was not found in the topology', | ||
topology.description | ||
); | ||
|
@@ -1022,12 +1010,12 @@ function processWaitQueue(topology: Topology) { | |
new ServerSelectionFailedEvent( | ||
waitQueueMember.serverSelector, | ||
topology.description, | ||
error, | ||
serverSelectionError, | ||
waitQueueMember.operationName | ||
) | ||
); | ||
} | ||
waitQueueMember.callback(error); | ||
waitQueueMember.reject(serverSelectionError); | ||
return; | ||
} | ||
const transaction = waitQueueMember.transaction; | ||
|
@@ -1053,7 +1041,7 @@ function processWaitQueue(topology: Topology) { | |
) | ||
); | ||
} | ||
waitQueueMember.callback(undefined, selectedServer); | ||
waitQueueMember.resolve(selectedServer); | ||
} | ||
|
||
if (topology[kWaitQueue].length > 0) { | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.