Skip to content

Commit 2207e49

Browse files
committed
refactor(NODE-6187): refactor to use TimeoutContext abstraction (#4131)
1 parent 9e1d550 commit 2207e49

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+796
-202
lines changed

src/bulk/common.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../o
2020
import type { Server } from '../sdam/server';
2121
import type { Topology } from '../sdam/topology';
2222
import type { ClientSession } from '../sessions';
23+
import { type TimeoutContext } from '../timeout';
2324
import { maybeAddIdToDocuments } from '../utils';
2425
import {
2526
applyRetryableWrites,
@@ -874,6 +875,9 @@ export interface BulkWriteOptions extends CommandOperationOptions {
874875
forceServerObjectId?: boolean;
875876
/** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */
876877
let?: Document;
878+
879+
/** @internal */
880+
timeoutContext?: TimeoutContext;
877881
}
878882

879883
const executeCommandsAsync = promisify(executeCommands);

src/cmap/connection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3030
import { ServerType } from '../sdam/common';
3131
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
32-
import { type Timeout } from '../timeout';
32+
import { type TimeoutContext } from '../timeout';
3333
import {
3434
BufferPool,
3535
calculateDurationInMs,
@@ -96,7 +96,7 @@ export interface CommandOptions extends BSONSerializeOptions {
9696
directConnection?: boolean;
9797

9898
/** @internal */
99-
timeout?: Timeout;
99+
timeoutContext?: TimeoutContext;
100100
}
101101

102102
/** @public */

src/cmap/connection_pool.ts

Lines changed: 7 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import {
2727
} from '../error';
2828
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import type { Server } from '../sdam/server';
30-
import { Timeout, TimeoutError } from '../timeout';
31-
import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils';
30+
import { type TimeoutContext, TimeoutError } from '../timeout';
31+
import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils';
3232
import { connect } from './connect';
3333
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
3434
import {
@@ -354,41 +354,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
354354
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
355355
* explicitly destroyed by the new owner.
356356
*/
357-
async checkOut(options?: { timeout?: Timeout }): Promise<Connection> {
357+
async checkOut(options: { timeoutContext: TimeoutContext }): Promise<Connection> {
358358
this.emitAndLog(
359359
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
360360
new ConnectionCheckOutStartedEvent(this)
361361
);
362362

363-
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
364-
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;
365-
366363
const { promise, resolve, reject } = promiseWithResolvers<Connection>();
367364

368-
let timeout: Timeout | null = null;
369-
if (options?.timeout) {
370-
// CSOT enabled
371-
// Determine if we're using the timeout passed in or a new timeout
372-
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
373-
// This check determines whether or not Topology.selectServer used the configured
374-
// `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
375-
if (
376-
options.timeout.duration === serverSelectionTimeoutMS ||
377-
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
378-
) {
379-
// server selection used `timeoutMS`, so we should use the existing timeout as the timeout
380-
// here
381-
timeout = options.timeout;
382-
} else {
383-
// server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
384-
// the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
385-
// cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
386-
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
387-
}
388-
}
389-
} else {
390-
timeout = Timeout.expires(waitQueueTimeoutMS);
391-
}
365+
const timeout = options.timeoutContext.connectionCheckoutTimeout;
392366

393367
const waitQueueMember: WaitQueueMember = {
394368
resolve,
@@ -403,6 +377,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
403377
return await (timeout ? Promise.race([promise, timeout]) : promise);
404378
} catch (error) {
405379
if (TimeoutError.is(error)) {
380+
timeout?.clear();
406381
waitQueueMember[kCancelled] = true;
407382

408383
this.emitAndLog(
@@ -415,7 +390,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
415390
: 'Timed out while checking out a connection from connection pool',
416391
this.address
417392
);
418-
if (options?.timeout) {
393+
if (options.timeoutContext.csotEnabled()) {
419394
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
420395
cause: timeoutError
421396
});
@@ -424,7 +399,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
424399
}
425400
throw error;
426401
} finally {
427-
if (timeout !== options?.timeout) timeout?.clear();
402+
if (options.timeoutContext.clearConnectionCheckoutTimeout) timeout?.clear();
428403
}
429404
}
430405

src/index.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,13 @@ export type {
534534
RTTSampler,
535535
ServerMonitoringMode
536536
} from './sdam/monitor';
537-
export type { Server, ServerEvents, ServerOptions, ServerPrivate } from './sdam/server';
537+
export type {
538+
Server,
539+
ServerCommandOptions,
540+
ServerEvents,
541+
ServerOptions,
542+
ServerPrivate
543+
} from './sdam/server';
538544
export type {
539545
ServerDescription,
540546
ServerDescriptionOptions,
@@ -565,7 +571,15 @@ export type {
565571
WithTransactionCallback
566572
} from './sessions';
567573
export type { Sort, SortDirection, SortDirectionForCmd, SortForCmd } from './sort';
568-
export type { Timeout } from './timeout';
574+
export type {
575+
CSOTTimeoutContext,
576+
CSOTTimeoutContextOptions,
577+
LegacyTimeoutContext,
578+
LegacyTimeoutContextOptions,
579+
Timeout,
580+
TimeoutContext,
581+
TimeoutContextOptions
582+
} from './timeout';
569583
export type { Transaction, TransactionOptions, TxnState } from './transactions';
570584
export type {
571585
BufferPool,

src/operations/aggregate.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { CursorResponse, ExplainedCursorResponse } from '../cmap/wire_protocol/r
33
import { MongoInvalidArgumentError } from '../error';
44
import type { Server } from '../sdam/server';
55
import type { ClientSession } from '../sessions';
6+
import { type TimeoutContext } from '../timeout';
67
import { maxWireVersion, type MongoDBNamespace } from '../utils';
78
import { WriteConcern } from '../write_concern';
89
import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command';
@@ -96,7 +97,8 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {
9697

9798
override async execute(
9899
server: Server,
99-
session: ClientSession | undefined
100+
session: ClientSession | undefined,
101+
timeoutContext: TimeoutContext
100102
): Promise<CursorResponse> {
101103
const options: AggregateOptions = this.options;
102104
const serverWireVersion = maxWireVersion(server);
@@ -141,6 +143,7 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {
141143
server,
142144
session,
143145
command,
146+
timeoutContext,
144147
this.explain ? ExplainedCursorResponse : CursorResponse
145148
);
146149
}

src/operations/bulk_write.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type {
77
import type { Collection } from '../collection';
88
import type { Server } from '../sdam/server';
99
import type { ClientSession } from '../sessions';
10+
import { type TimeoutContext } from '../timeout';
1011
import { AbstractOperation, Aspect, defineAspects } from './operation';
1112

1213
/** @internal */
@@ -32,11 +33,17 @@ export class BulkWriteOperation extends AbstractOperation<BulkWriteResult> {
3233

3334
override async execute(
3435
server: Server,
35-
session: ClientSession | undefined
36+
session: ClientSession | undefined,
37+
timeoutContext: TimeoutContext
3638
): Promise<BulkWriteResult> {
3739
const coll = this.collection;
3840
const operations = this.operations;
39-
const options = { ...this.options, ...this.bsonOptions, readPreference: this.readPreference };
41+
const options = {
42+
...this.options,
43+
...this.bsonOptions,
44+
readPreference: this.readPreference,
45+
timeoutContext
46+
};
4047

4148
// Create the bulk operation
4249
const bulk: BulkOperationBase =

src/operations/command.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type { ReadPreference } from '../read_preference';
77
import type { Server } from '../sdam/server';
88
import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection';
99
import type { ClientSession } from '../sessions';
10+
import { type TimeoutContext } from '../timeout';
1011
import {
1112
commandSupportsReadConcern,
1213
decorateWithExplain,
@@ -112,27 +113,30 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
112113
server: Server,
113114
session: ClientSession | undefined,
114115
cmd: Document,
116+
timeoutContext: TimeoutContext,
115117
responseType: T | undefined
116118
): Promise<typeof responseType extends undefined ? Document : InstanceType<T>>;
117119

118120
public async executeCommand(
119121
server: Server,
120122
session: ClientSession | undefined,
121-
cmd: Document
123+
cmd: Document,
124+
timeoutContext: TimeoutContext
122125
): Promise<Document>;
123126

124127
async executeCommand(
125128
server: Server,
126129
session: ClientSession | undefined,
127130
cmd: Document,
131+
timeoutContext: TimeoutContext,
128132
responseType?: MongoDBResponseConstructor
129133
): Promise<Document> {
130134
this.server = server;
131135

132136
const options = {
133137
...this.options,
134138
...this.bsonOptions,
135-
timeout: this.timeout,
139+
timeoutContext,
136140
readPreference: this.readPreference,
137141
session
138142
};

src/operations/count.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { Document } from '../bson';
22
import type { Collection } from '../collection';
33
import type { Server } from '../sdam/server';
44
import type { ClientSession } from '../sessions';
5+
import { type TimeoutContext } from '../timeout';
56
import type { MongoDBNamespace } from '../utils';
67
import { CommandOperation, type CommandOperationOptions } from './command';
78
import { Aspect, defineAspects } from './operation';
@@ -36,7 +37,11 @@ export class CountOperation extends CommandOperation<number> {
3637
return 'count' as const;
3738
}
3839

39-
override async execute(server: Server, session: ClientSession | undefined): Promise<number> {
40+
override async execute(
41+
server: Server,
42+
session: ClientSession | undefined,
43+
timeoutContext: TimeoutContext
44+
): Promise<number> {
4045
const options = this.options;
4146
const cmd: Document = {
4247
count: this.collectionName,
@@ -59,7 +64,7 @@ export class CountOperation extends CommandOperation<number> {
5964
cmd.maxTimeMS = options.maxTimeMS;
6065
}
6166

62-
const result = await super.executeCommand(server, session, cmd);
67+
const result = await super.executeCommand(server, session, cmd, timeoutContext);
6368
return result ? result.n : 0;
6469
}
6570
}

src/operations/create_collection.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { MongoCompatibilityError } from '../error';
99
import type { PkFactory } from '../mongo_client';
1010
import type { Server } from '../sdam/server';
1111
import type { ClientSession } from '../sessions';
12+
import { type TimeoutContext } from '../timeout';
1213
import { CommandOperation, type CommandOperationOptions } from './command';
1314
import { CreateIndexesOperation } from './indexes';
1415
import { Aspect, defineAspects } from './operation';
@@ -124,7 +125,11 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
124125
return 'create' as const;
125126
}
126127

127-
override async execute(server: Server, session: ClientSession | undefined): Promise<Collection> {
128+
override async execute(
129+
server: Server,
130+
session: ClientSession | undefined,
131+
timeoutContext: TimeoutContext
132+
): Promise<Collection> {
128133
const db = this.db;
129134
const name = this.name;
130135
const options = this.options;
@@ -155,15 +160,15 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
155160
unique: true
156161
}
157162
});
158-
await createOp.executeWithoutEncryptedFieldsCheck(server, session);
163+
await createOp.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext);
159164
}
160165

161166
if (!options.encryptedFields) {
162167
this.options = { ...this.options, encryptedFields };
163168
}
164169
}
165170

166-
const coll = await this.executeWithoutEncryptedFieldsCheck(server, session);
171+
const coll = await this.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext);
167172

168173
if (encryptedFields) {
169174
// Create the required index for queryable encryption support.
@@ -173,15 +178,16 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
173178
{ __safeContent__: 1 },
174179
{}
175180
);
176-
await createIndexOp.execute(server, session);
181+
await createIndexOp.execute(server, session, timeoutContext);
177182
}
178183

179184
return coll;
180185
}
181186

182187
private async executeWithoutEncryptedFieldsCheck(
183188
server: Server,
184-
session: ClientSession | undefined
189+
session: ClientSession | undefined,
190+
timeoutContext: TimeoutContext
185191
): Promise<Collection> {
186192
const db = this.db;
187193
const name = this.name;
@@ -198,7 +204,7 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
198204
}
199205
}
200206
// otherwise just execute the command
201-
await super.executeCommand(server, session, cmd);
207+
await super.executeCommand(server, session, cmd, timeoutContext);
202208
return new Collection(db, name, options);
203209
}
204210
}

src/operations/delete.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { MongoCompatibilityError, MongoServerError } from '../error';
44
import { type TODO_NODE_3286 } from '../mongo_types';
55
import type { Server } from '../sdam/server';
66
import type { ClientSession } from '../sessions';
7+
import { type TimeoutContext } from '../timeout';
78
import { type MongoDBNamespace } from '../utils';
89
import { type WriteConcernOptions } from '../write_concern';
910
import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command';
@@ -67,7 +68,8 @@ export class DeleteOperation extends CommandOperation<DeleteResult> {
6768

6869
override async execute(
6970
server: Server,
70-
session: ClientSession | undefined
71+
session: ClientSession | undefined,
72+
timeoutContext: TimeoutContext
7173
): Promise<DeleteResult> {
7274
const options = this.options ?? {};
7375
const ordered = typeof options.ordered === 'boolean' ? options.ordered : true;
@@ -95,7 +97,12 @@ export class DeleteOperation extends CommandOperation<DeleteResult> {
9597
}
9698
}
9799

98-
const res: TODO_NODE_3286 = await super.executeCommand(server, session, command);
100+
const res: TODO_NODE_3286 = await super.executeCommand(
101+
server,
102+
session,
103+
command,
104+
timeoutContext
105+
);
99106
return res;
100107
}
101108
}
@@ -107,9 +114,10 @@ export class DeleteOneOperation extends DeleteOperation {
107114

108115
override async execute(
109116
server: Server,
110-
session: ClientSession | undefined
117+
session: ClientSession | undefined,
118+
timeoutContext: TimeoutContext
111119
): Promise<DeleteResult> {
112-
const res: TODO_NODE_3286 = await super.execute(server, session);
120+
const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext);
113121
if (this.explain) return res;
114122
if (res.code) throw new MongoServerError(res);
115123
if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]);
@@ -127,9 +135,10 @@ export class DeleteManyOperation extends DeleteOperation {
127135

128136
override async execute(
129137
server: Server,
130-
session: ClientSession | undefined
138+
session: ClientSession | undefined,
139+
timeoutContext: TimeoutContext
131140
): Promise<DeleteResult> {
132-
const res: TODO_NODE_3286 = await super.execute(server, session);
141+
const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext);
133142
if (this.explain) return res;
134143
if (res.code) throw new MongoServerError(res);
135144
if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]);

0 commit comments

Comments
 (0)