Skip to content

Commit a47e280

Browse files
W-A-Jamesnbbeeken
authored andcommitted
refactor(NODE-6187): refactor to use TimeoutContext abstraction (#4131)
1 parent a1206a0 commit a47e280

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+796
-202
lines changed

src/bulk/common.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../o
1919
import type { Server } from '../sdam/server';
2020
import type { Topology } from '../sdam/topology';
2121
import type { ClientSession } from '../sessions';
22+
import { type TimeoutContext } from '../timeout';
2223
import {
2324
applyRetryableWrites,
2425
getTopology,
@@ -842,6 +843,9 @@ export interface BulkWriteOptions extends CommandOperationOptions {
842843
forceServerObjectId?: boolean;
843844
/** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */
844845
let?: Document;
846+
847+
/** @internal */
848+
timeoutContext?: TimeoutContext;
845849
}
846850

847851
/**

src/cmap/connection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
3030
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3131
import { ServerType } from '../sdam/common';
3232
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
33-
import { type Timeout } from '../timeout';
33+
import { type TimeoutContext } from '../timeout';
3434
import {
3535
BufferPool,
3636
calculateDurationInMs,
@@ -97,7 +97,7 @@ export interface CommandOptions extends BSONSerializeOptions {
9797
directConnection?: boolean;
9898

9999
/** @internal */
100-
timeout?: Timeout;
100+
timeoutContext?: TimeoutContext;
101101
}
102102

103103
/** @public */

src/cmap/connection_pool.ts

Lines changed: 7 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import {
2727
} from '../error';
2828
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import type { Server } from '../sdam/server';
30-
import { Timeout, TimeoutError } from '../timeout';
31-
import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils';
30+
import { type TimeoutContext, TimeoutError } from '../timeout';
31+
import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils';
3232
import { connect } from './connect';
3333
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
3434
import {
@@ -355,41 +355,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
355355
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
356356
* explicitly destroyed by the new owner.
357357
*/
358-
async checkOut(options?: { timeout?: Timeout }): Promise<Connection> {
358+
async checkOut(options: { timeoutContext: TimeoutContext }): Promise<Connection> {
359359
this.emitAndLog(
360360
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
361361
new ConnectionCheckOutStartedEvent(this)
362362
);
363363

364-
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
365-
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;
366-
367364
const { promise, resolve, reject } = promiseWithResolvers<Connection>();
368365

369-
let timeout: Timeout | null = null;
370-
if (options?.timeout) {
371-
// CSOT enabled
372-
// Determine if we're using the timeout passed in or a new timeout
373-
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
374-
// This check determines whether or not Topology.selectServer used the configured
375-
// `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
376-
if (
377-
options.timeout.duration === serverSelectionTimeoutMS ||
378-
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
379-
) {
380-
// server selection used `timeoutMS`, so we should use the existing timeout as the timeout
381-
// here
382-
timeout = options.timeout;
383-
} else {
384-
// server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
385-
// the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
386-
// cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
387-
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
388-
}
389-
}
390-
} else {
391-
timeout = Timeout.expires(waitQueueTimeoutMS);
392-
}
366+
const timeout = options.timeoutContext.connectionCheckoutTimeout;
393367

394368
const waitQueueMember: WaitQueueMember = {
395369
resolve,
@@ -404,6 +378,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
404378
return await (timeout ? Promise.race([promise, timeout]) : promise);
405379
} catch (error) {
406380
if (TimeoutError.is(error)) {
381+
timeout?.clear();
407382
waitQueueMember[kCancelled] = true;
408383

409384
this.emitAndLog(
@@ -416,7 +391,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
416391
: 'Timed out while checking out a connection from connection pool',
417392
this.address
418393
);
419-
if (options?.timeout) {
394+
if (options.timeoutContext.csotEnabled()) {
420395
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
421396
cause: timeoutError
422397
});
@@ -425,7 +400,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
425400
}
426401
throw error;
427402
} finally {
428-
if (timeout !== options?.timeout) timeout?.clear();
403+
if (options.timeoutContext.clearConnectionCheckoutTimeout) timeout?.clear();
429404
}
430405
}
431406

src/index.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,13 @@ export type {
566566
RTTSampler,
567567
ServerMonitoringMode
568568
} from './sdam/monitor';
569-
export type { Server, ServerEvents, ServerOptions, ServerPrivate } from './sdam/server';
569+
export type {
570+
Server,
571+
ServerCommandOptions,
572+
ServerEvents,
573+
ServerOptions,
574+
ServerPrivate
575+
} from './sdam/server';
570576
export type {
571577
ServerDescription,
572578
ServerDescriptionOptions,
@@ -597,7 +603,15 @@ export type {
597603
WithTransactionCallback
598604
} from './sessions';
599605
export type { Sort, SortDirection, SortDirectionForCmd, SortForCmd } from './sort';
600-
export type { Timeout } from './timeout';
606+
export type {
607+
CSOTTimeoutContext,
608+
CSOTTimeoutContextOptions,
609+
LegacyTimeoutContext,
610+
LegacyTimeoutContextOptions,
611+
Timeout,
612+
TimeoutContext,
613+
TimeoutContextOptions
614+
} from './timeout';
601615
export type { Transaction, TransactionOptions, TxnState } from './transactions';
602616
export type {
603617
BufferPool,

src/operations/aggregate.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { MongoInvalidArgumentError } from '../error';
44
import { type ExplainOptions } from '../explain';
55
import type { Server } from '../sdam/server';
66
import type { ClientSession } from '../sessions';
7+
import { type TimeoutContext } from '../timeout';
78
import { maxWireVersion, type MongoDBNamespace } from '../utils';
89
import { WriteConcern } from '../write_concern';
910
import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command';
@@ -105,7 +106,8 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {
105106

106107
override async execute(
107108
server: Server,
108-
session: ClientSession | undefined
109+
session: ClientSession | undefined,
110+
timeoutContext: TimeoutContext
109111
): Promise<CursorResponse> {
110112
const options: AggregateOptions = this.options;
111113
const serverWireVersion = maxWireVersion(server);
@@ -150,6 +152,7 @@ export class AggregateOperation extends CommandOperation<CursorResponse> {
150152
server,
151153
session,
152154
command,
155+
timeoutContext,
153156
this.explain ? ExplainedCursorResponse : CursorResponse
154157
);
155158
}

src/operations/bulk_write.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type {
77
import type { Collection } from '../collection';
88
import type { Server } from '../sdam/server';
99
import type { ClientSession } from '../sessions';
10+
import { type TimeoutContext } from '../timeout';
1011
import { AbstractOperation, Aspect, defineAspects } from './operation';
1112

1213
/** @internal */
@@ -32,11 +33,17 @@ export class BulkWriteOperation extends AbstractOperation<BulkWriteResult> {
3233

3334
override async execute(
3435
server: Server,
35-
session: ClientSession | undefined
36+
session: ClientSession | undefined,
37+
timeoutContext: TimeoutContext
3638
): Promise<BulkWriteResult> {
3739
const coll = this.collection;
3840
const operations = this.operations;
39-
const options = { ...this.options, ...this.bsonOptions, readPreference: this.readPreference };
41+
const options = {
42+
...this.options,
43+
...this.bsonOptions,
44+
readPreference: this.readPreference,
45+
timeoutContext
46+
};
4047

4148
// Create the bulk operation
4249
const bulk: BulkOperationBase =

src/operations/command.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type { ReadPreference } from '../read_preference';
77
import type { Server } from '../sdam/server';
88
import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection';
99
import type { ClientSession } from '../sessions';
10+
import { type TimeoutContext } from '../timeout';
1011
import {
1112
commandSupportsReadConcern,
1213
decorateWithExplain,
@@ -112,27 +113,30 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
112113
server: Server,
113114
session: ClientSession | undefined,
114115
cmd: Document,
116+
timeoutContext: TimeoutContext,
115117
responseType: T | undefined
116118
): Promise<typeof responseType extends undefined ? Document : InstanceType<T>>;
117119

118120
public async executeCommand(
119121
server: Server,
120122
session: ClientSession | undefined,
121-
cmd: Document
123+
cmd: Document,
124+
timeoutContext: TimeoutContext
122125
): Promise<Document>;
123126

124127
async executeCommand(
125128
server: Server,
126129
session: ClientSession | undefined,
127130
cmd: Document,
131+
timeoutContext: TimeoutContext,
128132
responseType?: MongoDBResponseConstructor
129133
): Promise<Document> {
130134
this.server = server;
131135

132136
const options = {
133137
...this.options,
134138
...this.bsonOptions,
135-
timeout: this.timeout,
139+
timeoutContext,
136140
readPreference: this.readPreference,
137141
session
138142
};

src/operations/count.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { Document } from '../bson';
22
import type { Collection } from '../collection';
33
import type { Server } from '../sdam/server';
44
import type { ClientSession } from '../sessions';
5+
import { type TimeoutContext } from '../timeout';
56
import type { MongoDBNamespace } from '../utils';
67
import { CommandOperation, type CommandOperationOptions } from './command';
78
import { Aspect, defineAspects } from './operation';
@@ -36,7 +37,11 @@ export class CountOperation extends CommandOperation<number> {
3637
return 'count' as const;
3738
}
3839

39-
override async execute(server: Server, session: ClientSession | undefined): Promise<number> {
40+
override async execute(
41+
server: Server,
42+
session: ClientSession | undefined,
43+
timeoutContext: TimeoutContext
44+
): Promise<number> {
4045
const options = this.options;
4146
const cmd: Document = {
4247
count: this.collectionName,
@@ -59,7 +64,7 @@ export class CountOperation extends CommandOperation<number> {
5964
cmd.maxTimeMS = options.maxTimeMS;
6065
}
6166

62-
const result = await super.executeCommand(server, session, cmd);
67+
const result = await super.executeCommand(server, session, cmd, timeoutContext);
6368
return result ? result.n : 0;
6469
}
6570
}

src/operations/create_collection.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { MongoCompatibilityError } from '../error';
99
import type { PkFactory } from '../mongo_client';
1010
import type { Server } from '../sdam/server';
1111
import type { ClientSession } from '../sessions';
12+
import { type TimeoutContext } from '../timeout';
1213
import { CommandOperation, type CommandOperationOptions } from './command';
1314
import { CreateIndexesOperation } from './indexes';
1415
import { Aspect, defineAspects } from './operation';
@@ -124,7 +125,11 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
124125
return 'create' as const;
125126
}
126127

127-
override async execute(server: Server, session: ClientSession | undefined): Promise<Collection> {
128+
override async execute(
129+
server: Server,
130+
session: ClientSession | undefined,
131+
timeoutContext: TimeoutContext
132+
): Promise<Collection> {
128133
const db = this.db;
129134
const name = this.name;
130135
const options = this.options;
@@ -155,15 +160,15 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
155160
unique: true
156161
}
157162
});
158-
await createOp.executeWithoutEncryptedFieldsCheck(server, session);
163+
await createOp.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext);
159164
}
160165

161166
if (!options.encryptedFields) {
162167
this.options = { ...this.options, encryptedFields };
163168
}
164169
}
165170

166-
const coll = await this.executeWithoutEncryptedFieldsCheck(server, session);
171+
const coll = await this.executeWithoutEncryptedFieldsCheck(server, session, timeoutContext);
167172

168173
if (encryptedFields) {
169174
// Create the required index for queryable encryption support.
@@ -173,15 +178,16 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
173178
{ __safeContent__: 1 },
174179
{}
175180
);
176-
await createIndexOp.execute(server, session);
181+
await createIndexOp.execute(server, session, timeoutContext);
177182
}
178183

179184
return coll;
180185
}
181186

182187
private async executeWithoutEncryptedFieldsCheck(
183188
server: Server,
184-
session: ClientSession | undefined
189+
session: ClientSession | undefined,
190+
timeoutContext: TimeoutContext
185191
): Promise<Collection> {
186192
const db = this.db;
187193
const name = this.name;
@@ -198,7 +204,7 @@ export class CreateCollectionOperation extends CommandOperation<Collection> {
198204
}
199205
}
200206
// otherwise just execute the command
201-
await super.executeCommand(server, session, cmd);
207+
await super.executeCommand(server, session, cmd, timeoutContext);
202208
return new Collection(db, name, options);
203209
}
204210
}

src/operations/delete.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { MongoCompatibilityError, MongoServerError } from '../error';
44
import { type TODO_NODE_3286 } from '../mongo_types';
55
import type { Server } from '../sdam/server';
66
import type { ClientSession } from '../sessions';
7+
import { type TimeoutContext } from '../timeout';
78
import { type MongoDBNamespace } from '../utils';
89
import { type WriteConcernOptions } from '../write_concern';
910
import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command';
@@ -67,7 +68,8 @@ export class DeleteOperation extends CommandOperation<DeleteResult> {
6768

6869
override async execute(
6970
server: Server,
70-
session: ClientSession | undefined
71+
session: ClientSession | undefined,
72+
timeoutContext: TimeoutContext
7173
): Promise<DeleteResult> {
7274
const options = this.options ?? {};
7375
const ordered = typeof options.ordered === 'boolean' ? options.ordered : true;
@@ -95,7 +97,12 @@ export class DeleteOperation extends CommandOperation<DeleteResult> {
9597
}
9698
}
9799

98-
const res: TODO_NODE_3286 = await super.executeCommand(server, session, command);
100+
const res: TODO_NODE_3286 = await super.executeCommand(
101+
server,
102+
session,
103+
command,
104+
timeoutContext
105+
);
99106
return res;
100107
}
101108
}
@@ -107,9 +114,10 @@ export class DeleteOneOperation extends DeleteOperation {
107114

108115
override async execute(
109116
server: Server,
110-
session: ClientSession | undefined
117+
session: ClientSession | undefined,
118+
timeoutContext: TimeoutContext
111119
): Promise<DeleteResult> {
112-
const res: TODO_NODE_3286 = await super.execute(server, session);
120+
const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext);
113121
if (this.explain) return res;
114122
if (res.code) throw new MongoServerError(res);
115123
if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]);
@@ -127,9 +135,10 @@ export class DeleteManyOperation extends DeleteOperation {
127135

128136
override async execute(
129137
server: Server,
130-
session: ClientSession | undefined
138+
session: ClientSession | undefined,
139+
timeoutContext: TimeoutContext
131140
): Promise<DeleteResult> {
132-
const res: TODO_NODE_3286 = await super.execute(server, session);
141+
const res: TODO_NODE_3286 = await super.execute(server, session, timeoutContext);
133142
if (this.explain) return res;
134143
if (res.code) throw new MongoServerError(res);
135144
if (res.writeErrors) throw new MongoServerError(res.writeErrors[0]);

0 commit comments

Comments
 (0)