Skip to content

Commit 3e9c86b

Browse files
W-A-Jamesnbbeeken
authored andcommitted
WIP - connect logic refactor
1 parent 2955c6b commit 3e9c86b

File tree

4 files changed

+125
-164
lines changed

4 files changed

+125
-164
lines changed

src/collection.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,7 @@ export class Collection<TSchema extends Document = Document> {
674674
);
675675
return true;
676676
} catch {
677+
// TODO: Make sure we throw on MongoOperationTimeoutError
677678
return false;
678679
}
679680
}

src/mongo_client.ts

Lines changed: 70 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { once } from 'events';
12
import { promises as fs } from 'fs';
23
import type { TcpNetConnectOpts } from 'net';
34
import type { ConnectionOptions as TLSConnectionOptions, TLSSocketOptions } from 'tls';
@@ -20,7 +21,7 @@ import { parseOptions, resolveSRVRecord } from './connection_string';
2021
import { MONGO_CLIENT_EVENTS } from './constants';
2122
import { Db, type DbOptions } from './db';
2223
import type { Encrypter } from './encrypter';
23-
import { MongoInvalidArgumentError } from './error';
24+
import { MongoInvalidArgumentError, MongoNetworkTimeoutError } from './error';
2425
import { MongoClientAuthProviders } from './mongo_client_auth_providers';
2526
import {
2627
type LogComponentSeveritiesClientOptions,
@@ -34,12 +35,15 @@ import { executeOperation } from './operations/execute_operation';
3435
import { RunAdminCommandOperation } from './operations/run_command';
3536
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
3637
import { ReadPreference, type ReadPreferenceMode } from './read_preference';
38+
import { STATE_CONNECTED, STATE_CONNECTING } from './sdam/common';
3739
import type { ServerMonitoringMode } from './sdam/monitor';
40+
import { Server } from './sdam/server';
3841
import type { TagSet } from './sdam/server_description';
3942
import { readPreferenceServerSelector } from './sdam/server_selection';
4043
import type { SrvPoller } from './sdam/srv_polling';
4144
import { Topology, type TopologyEvents } from './sdam/topology';
4245
import { ClientSession, type ClientSessionOptions, ServerSessionPool } from './sessions';
46+
import { Timeout } from './timeout';
4347
import {
4448
COSMOS_DB_CHECK,
4549
COSMOS_DB_MSG,
@@ -469,12 +473,20 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
469473
* @see docs.mongodb.org/manual/reference/connection-string/
470474
*/
471475
async connect(): Promise<this> {
476+
return await this._connectWithLock();
477+
}
478+
479+
/** @internal */
480+
async _connectWithLock(options?: {
481+
skipPing: boolean;
482+
readPreference?: ReadPreference;
483+
}): Promise<this> {
472484
if (this.connectionLock) {
473485
return await this.connectionLock;
474486
}
475487

476488
try {
477-
this.connectionLock = this._connect();
489+
this.connectionLock = this._connect(options);
478490
await this.connectionLock;
479491
} finally {
480492
// release
@@ -490,44 +502,78 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
490502
*
491503
* @internal
492504
*/
493-
private async _connect(): Promise<this> {
505+
private async _connect(options?: {
506+
skipPing: boolean;
507+
readPreference?: ReadPreference;
508+
}): Promise<this> {
509+
const topology = await this.initTopology();
510+
const readPreference = options?.readPreference ?? this.readPreference;
511+
const { encrypter } = this[kOptions];
512+
const topologyConnect = async () => {
513+
try {
514+
const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
515+
if (!skipPingOnConnect && !options?.skipPing && topology.s.credentials != null) {
516+
await this.db().admin().ping({ readPreference }); // performs server selection and sends ping
517+
topology.stateTransition(STATE_CONNECTED);
518+
topology.emit(Topology.OPEN, topology);
519+
topology.emit(Topology.CONNECT, topology);
520+
}
521+
} catch (error) {
522+
topology.close();
523+
throw error;
524+
}
525+
};
526+
527+
if (this.autoEncrypter) {
528+
await this.autoEncrypter?.init();
529+
await topologyConnect();
530+
await encrypter.connectInternalClient();
531+
} else {
532+
await topologyConnect();
533+
}
534+
535+
return this;
536+
}
537+
538+
/** @internal */
539+
async initTopology() {
494540
if (this.topology && this.topology.isConnected()) {
495-
return this;
541+
return this.topology;
496542
}
497543

498-
const options = this[kOptions];
544+
const topologyOptions = this[kOptions];
499545

500-
if (options.tls) {
501-
if (typeof options.tlsCAFile === 'string') {
502-
options.ca ??= await fs.readFile(options.tlsCAFile);
546+
if (topologyOptions.tls) {
547+
if (typeof topologyOptions.tlsCAFile === 'string') {
548+
topologyOptions.ca ??= await fs.readFile(topologyOptions.tlsCAFile);
503549
}
504-
if (typeof options.tlsCRLFile === 'string') {
505-
options.crl ??= await fs.readFile(options.tlsCRLFile);
550+
if (typeof topologyOptions.tlsCRLFile === 'string') {
551+
topologyOptions.crl ??= await fs.readFile(topologyOptions.tlsCRLFile);
506552
}
507-
if (typeof options.tlsCertificateKeyFile === 'string') {
508-
if (!options.key || !options.cert) {
509-
const contents = await fs.readFile(options.tlsCertificateKeyFile);
510-
options.key ??= contents;
511-
options.cert ??= contents;
553+
if (typeof topologyOptions.tlsCertificateKeyFile === 'string') {
554+
if (!topologyOptions.key || !topologyOptions.cert) {
555+
const contents = await fs.readFile(topologyOptions.tlsCertificateKeyFile);
556+
topologyOptions.key ??= contents;
557+
topologyOptions.cert ??= contents;
512558
}
513559
}
514560
}
515-
if (typeof options.srvHost === 'string') {
516-
const hosts = await resolveSRVRecord(options);
561+
if (typeof topologyOptions.srvHost === 'string') {
562+
const hosts = await resolveSRVRecord(topologyOptions);
517563

518564
for (const [index, host] of hosts.entries()) {
519-
options.hosts[index] = host;
565+
topologyOptions.hosts[index] = host;
520566
}
521567
}
522568

523569
// It is important to perform validation of hosts AFTER SRV resolution, to check the real hostname,
524570
// but BEFORE we even attempt connecting with a potentially not allowed hostname
525-
if (options.credentials?.mechanism === AuthMechanism.MONGODB_OIDC) {
571+
if (topologyOptions.credentials?.mechanism === AuthMechanism.MONGODB_OIDC) {
526572
const allowedHosts =
527-
options.credentials?.mechanismProperties?.ALLOWED_HOSTS || DEFAULT_ALLOWED_HOSTS;
528-
const isServiceAuth = !!options.credentials?.mechanismProperties?.ENVIRONMENT;
573+
topologyOptions.credentials?.mechanismProperties?.ALLOWED_HOSTS || DEFAULT_ALLOWED_HOSTS;
574+
const isServiceAuth = !!topologyOptions.credentials?.mechanismProperties?.ENVIRONMENT;
529575
if (!isServiceAuth) {
530-
for (const host of options.hosts) {
576+
for (const host of topologyOptions.hosts) {
531577
if (!hostMatchesWildcards(host.toHostPort().host, allowedHosts)) {
532578
throw new MongoInvalidArgumentError(
533579
`Host '${host}' is not valid for OIDC authentication with ALLOWED_HOSTS of '${allowedHosts.join(
@@ -539,7 +585,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
539585
}
540586
}
541587

542-
this.topology = new Topology(this, options.hosts, options);
588+
this.topology = new Topology(this, topologyOptions.hosts, topologyOptions);
543589
// Events can be emitted before initialization is complete so we have to
544590
// save the reference to the topology on the client ASAP if the event handlers need to access it
545591

@@ -549,24 +595,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
549595
this.topology.on(event, (...args: any[]) => this.emit(event, ...(args as any)));
550596
}
551597

552-
const topologyConnect = async () => {
553-
try {
554-
await this.topology?.connect(options);
555-
} catch (error) {
556-
this.topology?.close();
557-
throw error;
558-
}
559-
};
560-
561-
if (this.autoEncrypter) {
562-
await this.autoEncrypter?.init();
563-
await topologyConnect();
564-
await options.encrypter.connectInternalClient();
565-
} else {
566-
await topologyConnect();
567-
}
568-
569-
return this;
598+
return this.topology;
570599
}
571600

572601
/**

src/operations/execute_operation.ts

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
} from '../error';
1717
import type { MongoClient } from '../mongo_client';
1818
import { ReadPreference } from '../read_preference';
19+
import { STATE_CONNECTING } from '../sdam/common';
1920
import type { ServerDescription } from '../sdam/server_description';
2021
import {
2122
sameServerSelector,
@@ -136,18 +137,16 @@ async function autoConnect(client: MongoClient): Promise<Topology> {
136137
if (client.s.hasBeenClosed) {
137138
throw new MongoNotConnectedError('Client must be connected before running operations');
138139
}
139-
client.s.options[Symbol.for('@@mdb.skipPingOnConnect')] = true;
140-
try {
141-
await client.connect();
142-
if (client.topology == null) {
143-
throw new MongoRuntimeError(
144-
'client.connect did not create a topology but also did not throw'
145-
);
146-
}
147-
return client.topology;
148-
} finally {
149-
delete client.s.options[Symbol.for('@@mdb.skipPingOnConnect')];
140+
await client._connectWithLock({ skipPing: true });
141+
if (client.topology == null) {
142+
throw new MongoRuntimeError(
143+
'client.connect did not create a topology but also did not throw'
144+
);
150145
}
146+
147+
// @ts-expect-error Typescript can't pick up client.conectWithLock sets topology to a non-nullish value
148+
client.topology.stateTransition(STATE_CONNECTING);
149+
return client.topology;
151150
}
152151
return client.topology;
153152
}

0 commit comments

Comments
 (0)