Skip to content

Commit a0604d0

Browse files
authored
refactor(NODE-6187): refactor to use TimeoutContext abstraction (#4131)
1 parent 72dc791 commit a0604d0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+829
-216
lines changed

src/bulk/common.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../o
2020
import type { Server } from '../sdam/server';
2121
import type { Topology } from '../sdam/topology';
2222
import type { ClientSession } from '../sessions';
23+
import { type TimeoutContext } from '../timeout';
2324
import { maybeAddIdToDocuments } from '../utils';
2425
import {
2526
applyRetryableWrites,
@@ -874,6 +875,9 @@ export interface BulkWriteOptions extends CommandOperationOptions {
874875
forceServerObjectId?: boolean;
875876
/** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */
876877
let?: Document;
878+
879+
/** @internal */
880+
timeoutContext?: TimeoutContext;
877881
}
878882

879883
const executeCommandsAsync = promisify(executeCommands);

src/cmap/connection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3030
import { ServerType } from '../sdam/common';
3131
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
32-
import { type Timeout } from '../timeout';
32+
import { type TimeoutContext } from '../timeout';
3333
import {
3434
BufferPool,
3535
calculateDurationInMs,
@@ -95,7 +95,7 @@ export interface CommandOptions extends BSONSerializeOptions {
9595
directConnection?: boolean;
9696

9797
/** @internal */
98-
timeout?: Timeout;
98+
timeoutContext?: TimeoutContext;
9999
}
100100

101101
/** @public */

src/cmap/connection_pool.ts

Lines changed: 7 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import {
2727
} from '../error';
2828
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import type { Server } from '../sdam/server';
30-
import { Timeout, TimeoutError } from '../timeout';
31-
import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils';
30+
import { type TimeoutContext, TimeoutError } from '../timeout';
31+
import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils';
3232
import { connect } from './connect';
3333
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
3434
import {
@@ -354,41 +354,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
354354
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
355355
* explicitly destroyed by the new owner.
356356
*/
357-
async checkOut(options?: { timeout?: Timeout }): Promise<Connection> {
357+
async checkOut(options: { timeoutContext: TimeoutContext }): Promise<Connection> {
358358
this.emitAndLog(
359359
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
360360
new ConnectionCheckOutStartedEvent(this)
361361
);
362362

363-
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
364-
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;
365-
366363
const { promise, resolve, reject } = promiseWithResolvers<Connection>();
367364

368-
let timeout: Timeout | null = null;
369-
if (options?.timeout) {
370-
// CSOT enabled
371-
// Determine if we're using the timeout passed in or a new timeout
372-
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
373-
// This check determines whether or not Topology.selectServer used the configured
374-
// `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
375-
if (
376-
options.timeout.duration === serverSelectionTimeoutMS ||
377-
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
378-
) {
379-
// server selection used `timeoutMS`, so we should use the existing timeout as the timeout
380-
// here
381-
timeout = options.timeout;
382-
} else {
383-
// server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
384-
// the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
385-
// cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
386-
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
387-
}
388-
}
389-
} else {
390-
timeout = Timeout.expires(waitQueueTimeoutMS);
391-
}
365+
const timeout = options.timeoutContext.connectionCheckoutTimeout;
392366

393367
const waitQueueMember: WaitQueueMember = {
394368
resolve,
@@ -403,6 +377,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
403377
return await (timeout ? Promise.race([promise, timeout]) : promise);
404378
} catch (error) {
405379
if (TimeoutError.is(error)) {
380+
timeout?.clear();
406381
waitQueueMember[kCancelled] = true;
407382

408383
this.emitAndLog(
@@ -415,7 +390,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
415390
: 'Timed out while checking out a connection from connection pool',
416391
this.address
417392
);
418-
if (options?.timeout) {
393+
if (options.timeoutContext.csotEnabled()) {
419394
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
420395
cause: timeoutError
421396
});
@@ -424,7 +399,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
424399
}
425400
throw error;
426401
} finally {
427-
if (timeout !== options?.timeout) timeout?.clear();
402+
if (options.timeoutContext.clearConnectionCheckoutTimeout) timeout?.clear();
428403
}
429404
}
430405

src/index.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,13 @@ export type {
526526
RTTSampler,
527527
ServerMonitoringMode
528528
} from './sdam/monitor';
529-
export type { Server, ServerEvents, ServerOptions, ServerPrivate } from './sdam/server';
529+
export type {
530+
Server,
531+
ServerCommandOptions,
532+
ServerEvents,
533+
ServerOptions,
534+
ServerPrivate
535+
} from './sdam/server';
530536
export type {
531537
ServerDescription,
532538
ServerDescriptionOptions,
@@ -557,7 +563,15 @@ export type {
557563
WithTransactionCallback
558564
} from './sessions';
559565
export type { Sort, SortDirection, SortDirectionForCmd, SortForCmd } from './sort';
560-
export type { Timeout } from './timeout';
566+
export type {
567+
CSOTTimeoutContext,
568+
CSOTTimeoutContextOptions,
569+
LegacyTimeoutContext,
570+
LegacyTimeoutContextOptions,
571+
Timeout,
572+
TimeoutContext,
573+
TimeoutContextOptions
574+
} from './timeout';
561575
export type { Transaction, TransactionOptions, TxnState } from './transactions';
562576
export type {
563577
BufferPool,

src/operations/aggregate.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { MongoInvalidArgumentError } from '../error';
33
import { type TODO_NODE_3286 } from '../mongo_types';
44
import type { Server } from '../sdam/server';
55
import type { ClientSession } from '../sessions';
6+
import { type TimeoutContext } from '../timeout';
67
import { maxWireVersion, type MongoDBNamespace } from '../utils';
78
import { WriteConcern } from '../write_concern';
89
import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command';
@@ -94,7 +95,11 @@ export class AggregateOperation<T = Document> extends CommandOperation<T> {
9495
this.pipeline.push(stage);
9596
}
9697

97-
override async execute(server: Server, session: ClientSession | undefined): Promise<T> {
98+
override async execute(
99+
server: Server,
100+
session: ClientSession | undefined,
101+
timeoutContext: TimeoutContext
102+
): Promise<T> {
98103
const options: AggregateOptions = this.options;
99104
const serverWireVersion = maxWireVersion(server);
100105
const command: Document = { aggregate: this.target, pipeline: this.pipeline };
@@ -134,7 +139,12 @@ export class AggregateOperation<T = Document> extends CommandOperation<T> {
134139
command.cursor.batchSize = options.batchSize;
135140
}
136141

137-
const res: TODO_NODE_3286 = await super.executeCommand(server, session, command);
142+
const res: TODO_NODE_3286 = await super.executeCommand(
143+
server,
144+
session,
145+
command,
146+
timeoutContext
147+
);
138148
return res;
139149
}
140150
}

src/operations/bulk_write.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type {
77
import type { Collection } from '../collection';
88
import type { Server } from '../sdam/server';
99
import type { ClientSession } from '../sessions';
10+
import { type TimeoutContext } from '../timeout';
1011
import { AbstractOperation, Aspect, defineAspects } from './operation';
1112

1213
/** @internal */
@@ -32,11 +33,17 @@ export class BulkWriteOperation extends AbstractOperation<BulkWriteResult> {
3233

3334
override async execute(
3435
server: Server,
35-
session: ClientSession | undefined
36+
session: ClientSession | undefined,
37+
timeoutContext: TimeoutContext
3638
): Promise<BulkWriteResult> {
3739
const coll = this.collection;
3840
const operations = this.operations;
39-
const options = { ...this.options, ...this.bsonOptions, readPreference: this.readPreference };
41+
const options = {
42+
...this.options,
43+
...this.bsonOptions,
44+
readPreference: this.readPreference,
45+
timeoutContext
46+
};
4047

4148
// Create the bulk operation
4249
const bulk: BulkOperationBase =

src/operations/command.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type { ReadPreference } from '../read_preference';
66
import type { Server } from '../sdam/server';
77
import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection';
88
import type { ClientSession } from '../sessions';
9+
import { type TimeoutContext } from '../timeout';
910
import {
1011
commandSupportsReadConcern,
1112
decorateWithExplain,
@@ -110,15 +111,16 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
110111
async executeCommand(
111112
server: Server,
112113
session: ClientSession | undefined,
113-
cmd: Document
114+
cmd: Document,
115+
timeoutContext: TimeoutContext
114116
): Promise<Document> {
115117
// TODO: consider making this a non-enumerable property
116118
this.server = server;
117119

118120
const options = {
119121
...this.options,
120122
...this.bsonOptions,
121-
timeout: this.timeout,
123+
timeoutContext,
122124
readPreference: this.readPreference,
123125
session
124126
};

src/operations/count.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { Document } from '../bson';
22
import type { Collection } from '../collection';
33
import type { Server } from '../sdam/server';
44
import type { ClientSession } from '../sessions';
5+
import { type TimeoutContext } from '../timeout';
56
import type { MongoDBNamespace } from '../utils';
67
import { CommandOperation, type CommandOperationOptions } from './command';
78
import { Aspect, defineAspects } from './operation';
@@ -36,7 +37,11 @@ export class CountOperation extends CommandOperation<number> {
3637
return 'count' as const;
3738
}
3839

39-
override async execute(server: Server, session: ClientSession | undefined): Promise<number> {
40+
override async execute(
41+
server: Server,
42+
session: ClientSession | undefined,
43+
timeoutContext: TimeoutContext
44+
): Promise<number> {
4045
const options = this.options;
4146
const cmd: Document = {
4247
count: this.collectionName,
@@ -59,7 +64,7 @@ export class CountOperation extends CommandOperation<number> {
5964
cmd.maxTimeMS = options.maxTimeMS;
6065
}
6166

62-
const result = await super.executeCommand(server, session, cmd);
67+
const result = await super.executeCommand(server, session, cmd, timeoutContext);
6368
return result ? result.n : 0;
6469
}
6570
}

src/operations/count_documents.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { Document } from '../bson';
22
import type { Collection } from '../collection';
33
import type { Server } from '../sdam/server';
44
import type { ClientSession } from '../sessions';
5+
import { type TimeoutContext } from '../timeout';
56
import { AggregateOperation, type AggregateOptions } from './aggregate';
67

78
/** @public */
@@ -31,8 +32,12 @@ export class CountDocumentsOperation extends AggregateOperation<number> {
3132
super(collection.s.namespace, pipeline, options);
3233
}
3334

34-
override async execute(server: Server, session: ClientSession | undefined): Promise<number> {
35-
const result = await super.execute(server, session);
35+
override async execute(
36+
server: Server,
37+
session: ClientSession | undefined,
38+
timeoutContext: TimeoutContext
39+
): Promise<number> {
40+
const result = await super.execute(server, session, timeoutContext);
3641

3742
// NOTE: We're avoiding creating a cursor here to reduce the callstack.
3843
const response = result as unknown as Document;

src/operations/create_collection.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { MongoCompatibilityError } from '../error';
99
import type { PkFactory } from '../mongo_client';
1010
import type { Server } from '../sdam/server';
1111
import type { ClientSession } from '../sessions';
12+
import { type TimeoutContext } from '../timeout';
1213
import { CommandOperation, type CommandOperationOptions } from './command';
1314
import { CreateIndexesOperation } from './indexes';
1415
import { Aspect, defineAspects } from './operation';
@@ -124,7 +125,11 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
124125
return 'create' as const;
125126
}
126127

127-
override async execute(server: Server, session: ClientSession | undefined): Promise<Collection> {
128+
override async execute(
129+
server: Server,
130+
session: ClientSession | undefined,
131+
timeoutContext: TimeoutContext
132+
): Promise<Collection> {
128133
const db = this.db;
129134
const name = this.name;
130135
const options = this.options;
@@ -155,15 +160,15 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
155160
unique: true
156161
}
157162
});
158-
await createOp.executeWithoutEncryptedFieldsCheck(server, session);
163+
await createOp.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext);
159164
}
160165

161166
if (!options.encryptedFields) {
162167
this.options = { ...this.options, encryptedFields };
163168
}
164169
}
165170

166-
const coll = await this.executeWithoutEncryptedFieldsCheck(server, session);
171+
const coll = await this.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext);
167172

168173
if (encryptedFields) {
169174
// Create the required index for queryable encryption support.
@@ -173,15 +178,16 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
173178
{ __safeContent__: 1 },
174179
{}
175180
);
176-
await createIndexOp.execute(server, session);
181+
await createIndexOp.execute(server, session, timeoutContext);
177182
}
178183

179184
return coll;
180185
}
181186

182187
private async executeWithoutEncryptedFieldsCheck(
183188
server: Server,
184-
session: ClientSession | undefined
189+
session: ClientSession | undefined,
190+
timeoutContext: TimeoutContext
185191
): Promise<Collection> {
186192
const db = this.db;
187193
const name = this.name;
@@ -198,7 +204,7 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
198204
}
199205
}
200206
// otherwise just execute the command
201-
await super.executeCommand(server, session, cmd);
207+
await super.executeCommand(server, session, cmd, timeoutContext);
202208
return new Collection(db, name, options);
203209
}
204210
}

0 commit comments

Comments
 (0)