Skip to content

Commit c50024f

Browse files
committed
chore: move topology init into function, always construct topology
1 parent 6b152ad commit c50024f

File tree

3 files changed

+132
-181
lines changed

3 files changed

+132
-181
lines changed

src/mongo_client.ts

Lines changed: 16 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,21 @@
1-
import { once } from 'events';
2-
import { promises as fs } from 'fs';
31
import type { TcpNetConnectOpts } from 'net';
42
import type { ConnectionOptions as TLSConnectionOptions, TLSSocketOptions } from 'tls';
53

64
import { type BSONSerializeOptions, type Document, resolveBSONOptions } from './bson';
75
import { ChangeStream, type ChangeStreamDocument, type ChangeStreamOptions } from './change_stream';
86
import type { AutoEncrypter, AutoEncryptionOptions } from './client-side-encryption/auto_encrypter';
9-
import {
10-
type AuthMechanismProperties,
11-
DEFAULT_ALLOWED_HOSTS,
12-
type MongoCredentials
13-
} from './cmap/auth/mongo_credentials';
7+
import { type AuthMechanismProperties, type MongoCredentials } from './cmap/auth/mongo_credentials';
148
import { type TokenCache } from './cmap/auth/mongodb_oidc/token_cache';
15-
import { AuthMechanism } from './cmap/auth/providers';
9+
import { type AuthMechanism } from './cmap/auth/providers';
1610
import type { LEGAL_TCP_SOCKET_OPTIONS, LEGAL_TLS_SOCKET_OPTIONS } from './cmap/connect';
1711
import type { Connection } from './cmap/connection';
1812
import type { ClientMetadata } from './cmap/handshake/client_metadata';
1913
import type { CompressorName } from './cmap/wire_protocol/compression';
20-
import { parseOptions, resolveSRVRecord } from './connection_string';
21-
import { MONGO_CLIENT_EVENTS } from './constants';
14+
import { parseOptions } from './connection_string';
15+
import { type MONGO_CLIENT_EVENTS } from './constants';
2216
import { Db, type DbOptions } from './db';
2317
import type { Encrypter } from './encrypter';
24-
import { MongoInvalidArgumentError, MongoNetworkTimeoutError } from './error';
18+
import { MongoInvalidArgumentError } from './error';
2519
import { MongoClientAuthProviders } from './mongo_client_auth_providers';
2620
import {
2721
type LogComponentSeveritiesClientOptions,
@@ -35,22 +29,18 @@ import { executeOperation } from './operations/execute_operation';
3529
import { RunAdminCommandOperation } from './operations/run_command';
3630
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
3731
import { ReadPreference, type ReadPreferenceMode } from './read_preference';
38-
import { STATE_CONNECTED, STATE_CONNECTING } from './sdam/common';
3932
import type { ServerMonitoringMode } from './sdam/monitor';
40-
import { Server } from './sdam/server';
4133
import type { TagSet } from './sdam/server_description';
4234
import { readPreferenceServerSelector } from './sdam/server_selection';
4335
import type { SrvPoller } from './sdam/srv_polling';
4436
import { Topology, type TopologyEvents } from './sdam/topology';
4537
import { ClientSession, type ClientSessionOptions, ServerSessionPool } from './sessions';
46-
import { Timeout } from './timeout';
4738
import {
4839
COSMOS_DB_CHECK,
4940
COSMOS_DB_MSG,
5041
DOCUMENT_DB_CHECK,
5142
DOCUMENT_DB_MSG,
5243
type HostAddress,
53-
hostMatchesWildcards,
5444
isHostMatch,
5545
type MongoDBNamespace,
5646
ns,
@@ -352,7 +342,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
352342
/** @internal */
353343
s: MongoClientPrivate;
354344
/** @internal */
355-
topology?: Topology;
345+
topology: Topology;
356346
/** @internal */
357347
override readonly mongoLogger: MongoLogger | undefined;
358348
/** @internal */
@@ -406,6 +396,8 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
406396
}
407397
};
408398
this.checkForNonGenuineHosts();
399+
400+
this.topology = new Topology(this, this[kOptions]);
409401
}
410402

411403
/** @internal */
@@ -477,16 +469,13 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
477469
}
478470

479471
/** @internal */
480-
async _connectWithLock(options?: {
481-
skipPing: boolean;
482-
readPreference?: ReadPreference;
483-
}): Promise<this> {
472+
async _connectWithLock(): Promise<this> {
484473
if (this.connectionLock) {
485474
return await this.connectionLock;
486475
}
487476

488477
try {
489-
this.connectionLock = this._connect(options);
478+
this.connectionLock = this._connect();
490479
await this.connectionLock;
491480
} finally {
492481
// release
@@ -502,21 +491,17 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
502491
*
503492
* @internal
504493
*/
505-
private async _connect(options?: {
506-
skipPing: boolean;
507-
readPreference?: ReadPreference;
508-
}): Promise<this> {
509-
const topology = await this.initTopology();
510-
const readPreference = options?.readPreference ?? this.readPreference;
494+
private async _connect(): Promise<this> {
511495
const { encrypter } = this[kOptions];
512496
const topologyConnect = async () => {
513497
try {
514498
const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
515-
if (!skipPingOnConnect && !options?.skipPing && topology.s.credentials != null) {
516-
await this.db().admin().ping({ readPreference }); // goes through `executeOperation` so performs server selection
499+
const hasCredentials = this.s.options.credentials != null;
500+
if (!skipPingOnConnect && hasCredentials) {
501+
await this.db().admin().ping({ readPreference: this[kOptions].readPreference });
517502
}
518503
} catch (error) {
519-
topology.close();
504+
this.topology.close();
520505
throw error;
521506
}
522507
};
@@ -532,69 +517,6 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
532517
return this;
533518
}
534519

535-
/** @internal */
536-
async initTopology() {
537-
if (this.topology && this.topology.isConnected()) {
538-
return this.topology;
539-
}
540-
541-
const topologyOptions = this[kOptions];
542-
543-
if (topologyOptions.tls) {
544-
if (typeof topologyOptions.tlsCAFile === 'string') {
545-
topologyOptions.ca ??= await fs.readFile(topologyOptions.tlsCAFile);
546-
}
547-
if (typeof topologyOptions.tlsCRLFile === 'string') {
548-
topologyOptions.crl ??= await fs.readFile(topologyOptions.tlsCRLFile);
549-
}
550-
if (typeof topologyOptions.tlsCertificateKeyFile === 'string') {
551-
if (!topologyOptions.key || !topologyOptions.cert) {
552-
const contents = await fs.readFile(topologyOptions.tlsCertificateKeyFile);
553-
topologyOptions.key ??= contents;
554-
topologyOptions.cert ??= contents;
555-
}
556-
}
557-
}
558-
if (typeof topologyOptions.srvHost === 'string') {
559-
const hosts = await resolveSRVRecord(topologyOptions);
560-
561-
for (const [index, host] of hosts.entries()) {
562-
topologyOptions.hosts[index] = host;
563-
}
564-
}
565-
566-
// It is important to perform validation of hosts AFTER SRV resolution, to check the real hostname,
567-
// but BEFORE we even attempt connecting with a potentially not allowed hostname
568-
if (topologyOptions.credentials?.mechanism === AuthMechanism.MONGODB_OIDC) {
569-
const allowedHosts =
570-
topologyOptions.credentials?.mechanismProperties?.ALLOWED_HOSTS || DEFAULT_ALLOWED_HOSTS;
571-
const isServiceAuth = !!topologyOptions.credentials?.mechanismProperties?.ENVIRONMENT;
572-
if (!isServiceAuth) {
573-
for (const host of topologyOptions.hosts) {
574-
if (!hostMatchesWildcards(host.toHostPort().host, allowedHosts)) {
575-
throw new MongoInvalidArgumentError(
576-
`Host '${host}' is not valid for OIDC authentication with ALLOWED_HOSTS of '${allowedHosts.join(
577-
','
578-
)}'`
579-
);
580-
}
581-
}
582-
}
583-
}
584-
585-
this.topology = new Topology(this, topologyOptions.hosts, topologyOptions);
586-
// Events can be emitted before initialization is complete so we have to
587-
// save the reference to the topology on the client ASAP if the event handlers need to access it
588-
589-
this.topology.once(Topology.OPEN, () => this.emit('open', this));
590-
591-
for (const event of MONGO_CLIENT_EVENTS) {
592-
this.topology.on(event, (...args: any[]) => this.emit(event, ...(args as any)));
593-
}
594-
595-
return this.topology;
596-
}
597-
598520
/**
599521
* Close the client and its underlying connections
600522
*
@@ -643,7 +565,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
643565

644566
// clear out references to old topology
645567
const topology = this.topology;
646-
this.topology = undefined;
568+
// this.topology = undefined;
647569

648570
topology.close();
649571

src/operations/execute_operation.ts

Lines changed: 38 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,13 @@ import {
88
MongoExpiredSessionError,
99
MongoInvalidArgumentError,
1010
MongoNetworkError,
11-
MongoNotConnectedError,
12-
MongoRuntimeError,
1311
MongoServerError,
1412
MongoTransactionError,
1513
MongoUnexpectedServerResponseError
1614
} from '../error';
1715
import type { MongoClient } from '../mongo_client';
1816
import { ReadPreference } from '../read_preference';
19-
import { STATE_CONNECTING } from '../sdam/common';
17+
import { type Server } from '../sdam/server';
2018
import type { ServerDescription } from '../sdam/server_description';
2119
import {
2220
sameServerSelector,
@@ -27,7 +25,7 @@ import type { Topology } from '../sdam/topology';
2725
import type { ClientSession } from '../sessions';
2826
import { TimeoutContext } from '../timeout';
2927
import { supportsRetryableWrites } from '../utils';
30-
import { AbstractOperation, Aspect } from './operation';
28+
import { type AbstractOperation, Aspect } from './operation';
3129

3230
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
3331
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
@@ -61,24 +59,47 @@ export async function executeOperation<
6159
T extends AbstractOperation<TResult>,
6260
TResult = ResultTypeFromOperation<T>
6361
>(client: MongoClient, operation: T, timeoutContext?: TimeoutContext): Promise<TResult> {
64-
if (!(operation instanceof AbstractOperation)) {
65-
// TODO(NODE-3483): Extend MongoRuntimeError
66-
throw new MongoRuntimeError('This method requires a valid operation instance');
67-
}
62+
const readPreference = operation.readPreference ?? ReadPreference.primary;
63+
const inTransaction = !!operation.session?.inTransaction();
64+
65+
timeoutContext ??= TimeoutContext.create({
66+
serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS,
67+
waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS,
68+
timeoutMS: operation.options.timeoutMS
69+
});
6870

69-
const topology = await autoConnect(client);
71+
let selector: ReadPreference | ServerSelector;
72+
73+
if (operation.hasAspect(Aspect.MUST_SELECT_SAME_SERVER)) {
74+
// GetMore and KillCursor operations must always select the same server, but run through
75+
// server selection to potentially force monitor checks if the server is
76+
// in an unknown state.
77+
selector = sameServerSelector(operation.server?.description);
78+
} else if (operation.trySecondaryWrite) {
79+
// If operation should try to write to secondary use the custom server selector
80+
// otherwise provide the read preference.
81+
selector = secondaryWritableServerSelector(client.topology.commonWireVersion, readPreference);
82+
} else {
83+
selector = readPreference;
84+
}
7085

7186
// The driver sessions spec mandates that we implicitly create sessions for operations
7287
// that are not explicitly provided with a session.
7388
let session = operation.session;
7489
let owner: symbol | undefined;
7590

91+
const server = await client.topology.selectServer(selector, {
92+
session,
93+
operationName: operation.commandName,
94+
timeoutContext
95+
});
96+
7697
if (session == null) {
7798
owner = Symbol();
7899
session = client.startSession({ owner, explicit: false });
79100
} else if (session.hasEnded) {
80101
throw new MongoExpiredSessionError('Use of expired sessions is not permitted');
81-
} else if (session.snapshotEnabled && !topology.capabilities.supportsSnapshotReads) {
102+
} else if (session.snapshotEnabled && !client.topology.capabilities.supportsSnapshotReads) {
82103
throw new MongoCompatibilityError('Snapshot reads require MongoDB 5.0 or later');
83104
} else if (session.client !== client) {
84105
throw new MongoInvalidArgumentError('ClientSession must be from the same MongoClient');
@@ -89,9 +110,6 @@ export async function executeOperation<
89110
);
90111
}
91112

92-
const readPreference = operation.readPreference ?? ReadPreference.primary;
93-
const inTransaction = !!session?.inTransaction();
94-
95113
const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
96114

97115
if (
@@ -108,18 +126,13 @@ export async function executeOperation<
108126
session.unpin();
109127
}
110128

111-
timeoutContext ??= TimeoutContext.create({
112-
serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS,
113-
waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS,
114-
timeoutMS: operation.options.timeoutMS
115-
});
116-
117129
try {
118130
return await tryOperation(operation, {
119-
topology,
131+
topology: client.topology,
132+
server,
133+
selector,
120134
timeoutContext,
121-
session,
122-
readPreference
135+
session
123136
});
124137
} finally {
125138
if (session?.owner != null && session.owner === owner) {
@@ -128,34 +141,12 @@ export async function executeOperation<
128141
}
129142
}
130143

131-
/**
132-
* Connects a client if it has not yet been connected
133-
* @internal
134-
*/
135-
async function autoConnect(client: MongoClient): Promise<Topology> {
136-
if (client.topology == null) {
137-
if (client.s.hasBeenClosed) {
138-
throw new MongoNotConnectedError('Client must be connected before running operations');
139-
}
140-
await client._connectWithLock({ skipPing: true });
141-
if (client.topology == null) {
142-
throw new MongoRuntimeError(
143-
'client.connect did not create a topology but also did not throw'
144-
);
145-
}
146-
147-
// @ts-expect-error Typescript can't pick up client.conectWithLock sets topology to a non-nullish value
148-
client.topology.stateTransition(STATE_CONNECTING);
149-
return client.topology;
150-
}
151-
return client.topology;
152-
}
153-
154144
/** @internal */
155145
type RetryOptions = {
156146
session: ClientSession | undefined;
157-
readPreference: ReadPreference;
147+
server: Server;
158148
topology: Topology;
149+
selector: ServerSelector | ReadPreference;
159150
timeoutContext: TimeoutContext;
160151
};
161152

@@ -182,29 +173,8 @@ async function tryOperation<
182173
TResult = ResultTypeFromOperation<T>
183174
>(
184175
operation: T,
185-
{ topology, timeoutContext, session, readPreference }: RetryOptions
176+
{ topology, timeoutContext, session, server, selector }: RetryOptions
186177
): Promise<TResult> {
187-
let selector: ReadPreference | ServerSelector;
188-
189-
if (operation.hasAspect(Aspect.MUST_SELECT_SAME_SERVER)) {
190-
// GetMore and KillCursor operations must always select the same server, but run through
191-
// server selection to potentially force monitor checks if the server is
192-
// in an unknown state.
193-
selector = sameServerSelector(operation.server?.description);
194-
} else if (operation.trySecondaryWrite) {
195-
// If operation should try to write to secondary use the custom server selector
196-
// otherwise provide the read preference.
197-
selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference);
198-
} else {
199-
selector = readPreference;
200-
}
201-
202-
let server = await topology.selectServer(selector, {
203-
session,
204-
operationName: operation.commandName,
205-
timeoutContext
206-
});
207-
208178
const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
209179
const hasWriteAspect = operation.hasAspect(Aspect.WRITE_OPERATION);
210180
const inTransaction = session?.inTransaction() ?? false;

0 commit comments

Comments
 (0)