Skip to content

Commit dc2eab7

Browse files
W-A-Jamesnbbeeken
authored andcommitted
WIP
1 parent 6df82a5 commit dc2eab7

File tree

3 files changed

+100
-285
lines changed

3 files changed

+100
-285
lines changed

src/cmap/connection_pool.ts

Lines changed: 14 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ import {
1717
} from '../constants';
1818
import {
1919
type AnyError,
20-
MONGODB_ERROR_CODES,
21-
MongoError,
20+
type MongoError,
2221
MongoInvalidArgumentError,
2322
MongoMissingCredentialsError,
2423
MongoNetworkError,
@@ -107,7 +106,8 @@ export interface ConnectionPoolOptions extends Omit<ConnectionOptions, 'id' | 'g
107106

108107
/** @internal */
109108
export interface WaitQueueMember {
110-
callback: Callback<Connection>;
109+
resolve: (conn: Connection) => void;
110+
reject: (err: AnyError) => void;
111111
timeoutController: TimeoutController;
112112
[kCancelled]?: boolean;
113113
}
@@ -357,16 +357,18 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
357357
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
358358
* explicitly destroyed by the new owner.
359359
*/
360-
checkOut(callback: Callback<Connection>): void {
360+
async checkOut(): Promise<Connection> {
361361
this.emitAndLog(
362362
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
363363
new ConnectionCheckOutStartedEvent(this)
364364
);
365365

366366
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
367367

368+
const { promise, resolve, reject } = promiseWithResolvers<Connection>();
368369
const waitQueueMember: WaitQueueMember = {
369-
callback,
370+
resolve,
371+
reject,
370372
timeoutController: new TimeoutController(waitQueueTimeoutMS)
371373
};
372374
waitQueueMember.timeoutController.signal.addEventListener('abort', () => {
@@ -377,7 +379,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
377379
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
378380
new ConnectionCheckOutFailedEvent(this, 'timeout')
379381
);
380-
waitQueueMember.callback(
382+
waitQueueMember.reject(
381383
new WaitQueueTimeoutError(
382384
this.loadBalanced
383385
? this.waitQueueErrorMetrics()
@@ -389,6 +391,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
389391

390392
this[kWaitQueue].push(waitQueueMember);
391393
process.nextTick(() => this.processWaitQueue());
394+
395+
return promise;
392396
}
393397

394398
/**
@@ -540,168 +544,6 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
540544
);
541545
}
542546

543-
/**
544-
* Runs a lambda with an implicitly checked out connection, checking that connection back in when the lambda
545-
* has completed by calling back.
546-
*
547-
* NOTE: please note the required signature of `fn`
548-
*
549-
* @remarks When in load balancer mode, connections can be pinned to cursors or transactions.
550-
* In these cases we pass the connection in to this method to ensure it is used and a new
551-
* connection is not checked out.
552-
*
553-
* @param conn - A pinned connection for use in load balancing mode.
554-
* @param fn - A function which operates on a managed connection
555-
* @param callback - The original callback
556-
*/
557-
withConnection(
558-
conn: Connection | undefined,
559-
fn: WithConnectionCallback,
560-
callback: Callback<Connection>
561-
): void {
562-
if (conn) {
563-
// use the provided connection, and do _not_ check it in after execution
564-
fn(undefined, conn, (fnErr, result) => {
565-
if (fnErr) {
566-
return this.withReauthentication(fnErr, conn, fn, callback);
567-
}
568-
callback(undefined, result);
569-
});
570-
return;
571-
}
572-
573-
this.checkOut((err, conn) => {
574-
// don't callback with `err` here, we might want to act upon it inside `fn`
575-
fn(err as MongoError, conn, (fnErr, result) => {
576-
if (fnErr) {
577-
if (conn) {
578-
this.withReauthentication(fnErr, conn, fn, callback);
579-
} else {
580-
callback(fnErr);
581-
}
582-
} else {
583-
callback(undefined, result);
584-
}
585-
586-
if (conn) {
587-
this.checkIn(conn);
588-
}
589-
});
590-
});
591-
}
592-
593-
private withReauthentication(
594-
fnErr: AnyError,
595-
conn: Connection,
596-
fn: WithConnectionCallback,
597-
callback: Callback<Connection>
598-
) {
599-
if (fnErr instanceof MongoError && fnErr.code === MONGODB_ERROR_CODES.Reauthenticate) {
600-
this.reauthenticate(conn, fn, (error, res) => {
601-
if (error) {
602-
return callback(error);
603-
}
604-
callback(undefined, res);
605-
});
606-
} else {
607-
callback(fnErr);
608-
}
609-
}
610-
611-
/**
612-
* Reauthenticate on the same connection and then retry the operation.
613-
*/
614-
private reauthenticate(
615-
connection: Connection,
616-
fn: WithConnectionCallback,
617-
callback: Callback
618-
): void {
619-
const authContext = connection.authContext;
620-
if (!authContext) {
621-
return callback(new MongoRuntimeError('No auth context found on connection.'));
622-
}
623-
const credentials = authContext.credentials;
624-
if (!credentials) {
625-
return callback(
626-
new MongoMissingCredentialsError(
627-
'Connection is missing credentials when asked to reauthenticate'
628-
)
629-
);
630-
}
631-
const resolvedCredentials = credentials.resolveAuthMechanism(connection.hello);
632-
const provider = this[kServer].topology.client.s.authProviders.getOrCreateProvider(
633-
resolvedCredentials.mechanism
634-
);
635-
if (!provider) {
636-
return callback(
637-
new MongoMissingCredentialsError(
638-
`Reauthenticate failed due to no auth provider for ${credentials.mechanism}`
639-
)
640-
);
641-
}
642-
provider.reauth(authContext).then(
643-
() => {
644-
fn(undefined, connection, (fnErr, fnResult) => {
645-
if (fnErr) {
646-
return callback(fnErr);
647-
}
648-
callback(undefined, fnResult);
649-
});
650-
},
651-
error => callback(error)
652-
);
653-
}
654-
655-
async withConnectionAsync(
656-
conn: Connection | undefined,
657-
fn: AsyncWithConnectionCallback
658-
): Promise<Connection> {
659-
let result;
660-
if (conn) {
661-
// use the provided connection, and do _not_ check it in after execution
662-
try {
663-
result = await fn(undefined, conn);
664-
return result;
665-
} catch (fnErr) {
666-
if (fnErr instanceof MongoError && fnErr.code === MONGODB_ERROR_CODES.Reauthenticate) {
667-
conn = this.reauthenticateAsync(conn);
668-
} else {
669-
throw fnErr;
670-
}
671-
}
672-
} else {
673-
const { promise, resolve, reject } = promiseWithResolvers<Connection>();
674-
this.checkOut((err, conn) => {
675-
fn(err as MongoError, conn).then(
676-
() => {
677-
if (conn) this.checkIn(conn);
678-
},
679-
fnErr => {
680-
if (conn) {
681-
this.withReauthenticationAsync(fnErr, conn, fn).then(resolve, reject);
682-
} else {
683-
reject(fnErr);
684-
}
685-
}
686-
);
687-
});
688-
return promise;
689-
}
690-
}
691-
692-
async withReauthenticationAsync(
693-
fnErr: AnyError,
694-
conn: Connection,
695-
fn: AsyncWithConnectionCallback
696-
): Promise<Connection> {
697-
if (fnErr instanceof MongoError && fnErr.code === MONGODB_ERROR_CODES.Reauthenticate) {
698-
conn = await this.reauthenticateAsync(conn);
699-
return fn(undefined, conn);
700-
} else {
701-
throw fnErr;
702-
}
703-
}
704-
705547
async reauthenticateAsync(connection: Connection): Promise<Connection> {
706548
const authContext = connection.authContext;
707549
if (!authContext) {
@@ -924,7 +766,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
924766
);
925767
waitQueueMember.timeoutController.clear();
926768
this[kWaitQueue].shift();
927-
waitQueueMember.callback(error);
769+
waitQueueMember.reject(error);
928770
continue;
929771
}
930772

@@ -946,7 +788,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
946788
waitQueueMember.timeoutController.clear();
947789

948790
this[kWaitQueue].shift();
949-
waitQueueMember.callback(undefined, connection);
791+
waitQueueMember.resolve(connection);
950792
}
951793
}
952794

@@ -972,16 +814,17 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
972814
// TODO(NODE-5192): Remove this cast
973815
new ConnectionCheckOutFailedEvent(this, 'connectionError', err as MongoError)
974816
);
817+
waitQueueMember.reject(err);
975818
} else if (connection) {
976819
this[kCheckedOut].add(connection);
977820
this.emitAndLog(
978821
ConnectionPool.CONNECTION_CHECKED_OUT,
979822
new ConnectionCheckedOutEvent(this, connection)
980823
);
824+
waitQueueMember.resolve(connection);
981825
}
982826

983827
waitQueueMember.timeoutController.clear();
984-
waitQueueMember.callback(err, connection);
985828
}
986829
process.nextTick(() => this.processWaitQueue());
987830
});
@@ -1003,8 +846,3 @@ export type WithConnectionCallback = (
1003846
connection: Connection | undefined,
1004847
callback: Callback<Connection>
1005848
) => void;
1006-
1007-
type AsyncWithConnectionCallback = (
1008-
error: MongoError | undefined,
1009-
connection: Connection | undefined
1010-
) => Promise<Connection>;

0 commit comments

Comments
 (0)