Skip to content

Commit 290b7e9

Browse files
W-A-Jamesnbbeeken
authored andcommitted
WIP
1 parent 9a19244 commit 290b7e9

File tree

19 files changed

+377
-188
lines changed

19 files changed

+377
-188
lines changed

src/cmap/connection_pool.ts

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,14 @@ import {
2727
} from '../error';
2828
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import type { Server } from '../sdam/server';
30-
import { type Callback, eachAsync, List, makeCounter, TimeoutController } from '../utils';
30+
import {
31+
type Callback,
32+
eachAsync,
33+
List,
34+
makeCounter,
35+
promiseWithResolvers,
36+
TimeoutController
37+
} from '../utils';
3138
import { connect } from './connect';
3239
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
3340
import {
@@ -645,6 +652,56 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
645652
);
646653
}
647654

655+
async withConnectionAsync(
656+
conn: Connection | undefined,
657+
fn: AsyncWithConnectionCallback
658+
): Promise<Connection> {
659+
let result;
660+
if (conn) {
661+
// use the provided connection, and do _not_ check it in after execution
662+
try {
663+
result = await fn(undefined, conn);
664+
return result;
665+
} catch (fnErr) {
666+
if (fnErr instanceof MongoError && fnErr.code === MONGODB_ERROR_CODES.Reauthenticate) {
667+
conn = this.reauthenticateAsync(conn);
668+
} else {
669+
throw fnErr;
670+
}
671+
}
672+
} else {
673+
const { promise, resolve, reject } = promiseWithResolvers<Connection>();
674+
this.checkOut((err, conn) => {
675+
fn(err as MongoError, conn).then(
676+
() => {
677+
if (conn) this.checkIn(conn);
678+
},
679+
fnErr => {
680+
if (conn) {
681+
this.withReauthenticationAsync(fnErr, conn, fn).then(resolve, reject);
682+
} else {
683+
reject(fnErr);
684+
}
685+
}
686+
);
687+
});
688+
return promise;
689+
}
690+
}
691+
692+
async withReauthenticationAsync(
693+
fnErr: AnyError,
694+
conn: Connection,
695+
fn: AsyncWithConnectionCallback
696+
): Promise<Connection> {
697+
if (fnErr instanceof MongoError && fnErr.code === MONGODB_ERROR_CODES.Reauthenticate) {
698+
conn = await this.reauthenticateAsync(conn);
699+
return fn(undefined, conn);
700+
} else {
701+
throw fnErr;
702+
}
703+
}
704+
648705
async reauthenticateAsync(connection: Connection): Promise<Connection> {
649706
const authContext = connection.authContext;
650707
if (!authContext) {
@@ -667,6 +724,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
667724
}
668725

669726
await provider.reauth(authContext);
727+
670728
return connection;
671729
}
672730

@@ -945,3 +1003,8 @@ export type WithConnectionCallback = (
9451003
connection: Connection | undefined,
9461004
callback: Callback<Connection>
9471005
) => void;
1006+
1007+
type AsyncWithConnectionCallback = (
1008+
error: MongoError | undefined,
1009+
connection: Connection | undefined
1010+
) => Promise<Connection>;

src/operations/command.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,6 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
152152
cmd = decorateWithExplain(cmd, this.explain);
153153
}
154154

155-
return server.commandAsync(this.ns, cmd, options);
155+
return server.command(this.ns, cmd, options);
156156
}
157157
}

src/operations/find.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ export class FindOperation extends CommandOperation<Document> {
111111
findCommand = decorateWithExplain(findCommand, this.explain);
112112
}
113113

114-
return server.commandAsync(this.ns, findCommand, {
114+
return server.command(this.ns, findCommand, {
115115
...this.options,
116116
...this.bsonOptions,
117117
documentsReturnedIn: 'firstBatch',

src/operations/get_more.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ export class GetMoreOperation extends AbstractOperation {
9696
...this.options
9797
};
9898

99-
return server.commandAsync(this.ns, getMoreCmd, commandOptions);
99+
return server.command(this.ns, getMoreCmd, commandOptions);
100100
}
101101
}
102102

src/operations/kill_cursors.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ export class KillCursorsOperation extends AbstractOperation {
4646
cursors: [this.cursorId]
4747
};
4848
try {
49-
await server.commandAsync(this.ns, killCursorsCommand, { session });
49+
await server.command(this.ns, killCursorsCommand, { session });
5050
} catch {
5151
// The driver should never emit errors from killCursors, this is spec-ed behavior
5252
}

src/operations/run_command.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {
2828

2929
override async execute(server: Server, session: ClientSession | undefined): Promise<T> {
3030
this.server = server;
31-
return server.commandAsync(this.ns, this.command, {
31+
return server.command(this.ns, this.command, {
3232
...this.options,
3333
readPreference: this.readPreference,
3434
session
@@ -54,7 +54,7 @@ export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T>
5454

5555
override async execute(server: Server, session: ClientSession | undefined): Promise<T> {
5656
this.server = server;
57-
return server.commandAsync(this.ns, this.command, {
57+
return server.command(this.ns, this.command, {
5858
...this.options,
5959
readPreference: this.readPreference,
6060
session

src/operations/search_indexes/create.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ export class CreateSearchIndexesOperation extends AbstractOperation<string[]> {
3636
indexes: this.descriptions
3737
};
3838

39-
const res = await server.commandAsync(namespace, command, { session });
39+
const res = await server.command(namespace, command, { session });
4040

4141
const indexesCreated: Array<{ name: string }> = res?.indexesCreated ?? [];
4242
return indexesCreated.map(({ name }) => name);

src/operations/search_indexes/drop.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ export class DropSearchIndexOperation extends AbstractOperation<void> {
2828
}
2929

3030
try {
31-
await server.commandAsync(namespace, command, { session });
31+
await server.command(namespace, command, { session });
3232
} catch (error) {
3333
const isNamespaceNotFoundError =
3434
error instanceof MongoServerError && error.code === MONGODB_ERROR_CODES.NamespaceNotFound;

src/operations/search_indexes/update.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ export class UpdateSearchIndexOperation extends AbstractOperation<void> {
2727
definition: this.definition
2828
};
2929

30-
await server.commandAsync(namespace, command, { session });
30+
await server.command(namespace, command, { session });
3131
return;
3232
}
3333
}

0 commit comments

Comments
 (0)