Skip to content

Commit 4aa6575

Browse files
W-A-Jamesnbbeeken
authored andcommitted
refactor(NODE-6187): refactor to use TimeoutContext abstraction (#4131)
1 parent 3f4313e commit 4aa6575

File tree

11 files changed

+88
-123
lines changed

11 files changed

+88
-123
lines changed

src/cmap/connection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3030
import { ServerType } from '../sdam/common';
3131
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
32-
import { type Timeout } from '../timeout';
32+
import { type TimeoutContext } from '../timeout';
3333
import {
3434
BufferPool,
3535
calculateDurationInMs,
@@ -96,7 +96,7 @@ export interface CommandOptions extends BSONSerializeOptions {
9696
directConnection?: boolean;
9797

9898
/** @internal */
99-
timeout?: Timeout;
99+
timeoutContext?: TimeoutContext;
100100
}
101101

102102
/** @public */

src/cmap/connection_pool.ts

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -362,35 +362,9 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
362362
new ConnectionCheckOutStartedEvent(this)
363363
);
364364

365-
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
366-
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;
367-
368365
const { promise, resolve, reject } = promiseWithResolvers<Connection>();
369366

370-
let timeout: Timeout | null = null;
371-
if (options?.timeout) {
372-
// CSOT enabled
373-
// Determine if we're using the timeout passed in or a new timeout
374-
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
375-
// This check determines whether or not Topology.selectServer used the configured
376-
// `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
377-
if (
378-
options.timeout.duration === serverSelectionTimeoutMS ||
379-
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
380-
) {
381-
// server selection used `timeoutMS`, so we should use the existing timeout as the timeout
382-
// here
383-
timeout = options.timeout;
384-
} else {
385-
// server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
386-
// the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
387-
// cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
388-
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
389-
}
390-
}
391-
} else {
392-
timeout = Timeout.expires(waitQueueTimeoutMS);
393-
}
367+
const timeout = options.timeoutContext.connectionCheckoutTimeout;
394368

395369
const waitQueueMember: WaitQueueMember = {
396370
resolve,
@@ -419,7 +393,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
419393
: 'Timed out while checking out a connection from connection pool',
420394
this.address
421395
);
422-
if (options?.timeout) {
396+
if (options.timeoutContext.csotEnabled()) {
423397
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
424398
cause: timeoutError
425399
});
@@ -428,7 +402,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
428402
}
429403
throw error;
430404
} finally {
431-
if (timeout !== options?.timeout) timeout?.clear();
405+
if (options.timeoutContext.clearConnectionCheckoutTimeout) timeout?.clear();
432406
}
433407
}
434408

src/operations/command.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
136136
const options = {
137137
...this.options,
138138
...this.bsonOptions,
139-
timeout: this.timeout,
139+
timeoutContext,
140140
readPreference: this.readPreference,
141141
session
142142
};

src/operations/execute_operation.ts

Lines changed: 48 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import {
2424
} from '../sdam/server_selection';
2525
import type { Topology } from '../sdam/topology';
2626
import type { ClientSession } from '../sessions';
27-
import { Timeout } from '../timeout';
27+
import { TimeoutContext } from '../timeout';
2828
import { squashError, supportsRetryableWrites } from '../utils';
2929
import { AbstractOperation, Aspect } from './operation';
3030

@@ -88,6 +88,12 @@ export async function executeOperation<
8888
);
8989
}
9090

91+
timeoutContext ??= TimeoutContext.create({
92+
serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS,
93+
waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS,
94+
timeoutMS: operation.options.timeoutMS
95+
});
96+
9197
const readPreference = operation.readPreference ?? ReadPreference.primary;
9298
const inTransaction = !!session?.inTransaction();
9399

@@ -200,18 +206,31 @@ async function tryOperation<
200206
selector = readPreference;
201207
}
202208

203-
const timeout = operation.timeoutMS != null ? Timeout.expires(operation.timeoutMS) : undefined;
204-
operation.timeout = timeout;
205-
206209
const server = await topology.selectServer(selector, {
207210
session,
208211
operationName: operation.commandName,
209-
timeout
212+
timeoutContext
210213
});
211214

212-
const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
213-
const hasWriteAspect = operation.hasAspect(Aspect.WRITE_OPERATION);
214-
const inTransaction = session?.inTransaction() ?? false;
215+
if (session == null) {
216+
// No session also means it is not retryable, early exit
217+
return await operation.execute(server, undefined, timeoutContext);
218+
}
219+
220+
if (!operation.hasAspect(Aspect.RETRYABLE)) {
221+
// non-retryable operation, early exit
222+
try {
223+
return await operation.execute(server, session, timeoutContext);
224+
} finally {
225+
if (session?.owner != null && session.owner === owner) {
226+
try {
227+
await session.endSession();
228+
} catch (error) {
229+
squashError(error);
230+
}
231+
}
232+
}
233+
}
215234

216235
const willRetryRead = topology.s.options.retryReads && !inTransaction && operation.canRetryRead;
217236

@@ -231,42 +250,16 @@ async function tryOperation<
231250
session.incrementTransactionNumber();
232251
}
233252

234-
// TODO(NODE-6231): implement infinite retry within CSOT timeout here
235-
const maxTries = willRetry ? 2 : 1;
236-
let previousOperationError: MongoError | undefined;
237-
let previousServer: ServerDescription | undefined;
238-
239-
// TODO(NODE-6231): implement infinite retry within CSOT timeout here
240-
for (let tries = 0; tries < maxTries; tries++) {
241-
if (previousOperationError) {
242-
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
243-
throw new MongoServerError({
244-
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
245-
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
246-
originalError: previousOperationError
247-
});
248-
}
249-
250-
if (hasWriteAspect && !isRetryableWriteError(previousOperationError))
251-
throw previousOperationError;
252-
253-
if (hasReadAspect && !isRetryableReadError(previousOperationError))
254-
throw previousOperationError;
255-
256-
if (
257-
previousOperationError instanceof MongoNetworkError &&
258-
operation.hasAspect(Aspect.CURSOR_CREATING) &&
259-
session != null &&
260-
session.isPinned &&
261-
!session.inTransaction()
262-
) {
263-
session.unpin({ force: true, forceClear: true });
264-
}
265-
266-
server = await topology.selectServer(selector, {
253+
try {
254+
return await operation.execute(server, session, timeoutContext);
255+
} catch (operationError) {
256+
if (willRetry && operationError instanceof MongoError) {
257+
return await retryOperation(operation, operationError, {
267258
session,
268-
operationName: operation.commandName,
269-
previousServer
259+
topology,
260+
selector,
261+
previousServer: server.description,
262+
timeoutContext
270263
});
271264

272265
if (hasWriteAspect && !supportsRetryableWrites(server)) {
@@ -276,18 +269,22 @@ async function tryOperation<
276269
}
277270
}
278271

279-
try {
280-
return await operation.execute(server, session, timeoutContext);
281-
} catch (operationError) {
282-
if (!(operationError instanceof MongoError)) throw operationError;
272+
/** @internal */
273+
type RetryOptions = {
274+
session: ClientSession;
275+
topology: Topology;
276+
selector: ReadPreference | ServerSelector;
277+
previousServer: ServerDescription;
278+
timeoutContext: TimeoutContext;
279+
};
283280

284281
async function retryOperation<
285282
T extends AbstractOperation<TResult>,
286283
TResult = ResultTypeFromOperation<T>
287284
>(
288285
operation: T,
289286
originalError: MongoError,
290-
{ session, topology, selector, previousServer }: RetryOptions
287+
{ session, topology, selector, previousServer, timeoutContext }: RetryOptions
291288
): Promise<TResult> {
292289
const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION);
293290
const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION);
@@ -323,9 +320,9 @@ async function retryOperation<
323320
// select a new server, and attempt to retry the operation
324321
const server = await topology.selectServer(selector, {
325322
session,
326-
timeout: operation.timeout,
327323
operationName: operation.commandName,
328-
previousServer
324+
previousServer,
325+
timeoutContext
329326
});
330327

331328
if (isWriteOperation && !supportsRetryableWrites(server)) {
@@ -335,7 +332,7 @@ async function retryOperation<
335332
}
336333

337334
try {
338-
return await operation.execute(server, session);
335+
return await operation.execute(server, session, timeoutContext);
339336
} catch (retryError) {
340337
if (
341338
retryError instanceof MongoError &&

src/operations/find.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ export class FindOperation extends CommandOperation<CursorResponse> {
119119
...this.bsonOptions,
120120
documentsReturnedIn: 'firstBatch',
121121
session,
122-
timeout: this.timeout
122+
timeoutContext
123123
},
124124
this.explain ? ExplainedCursorResponse : CursorResponse
125125
);

src/operations/operation.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '..
22
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
33
import type { Server } from '../sdam/server';
44
import type { ClientSession } from '../sessions';
5-
import { type Timeout } from '../timeout';
5+
import { type Timeout, type TimeoutContext } from '../timeout';
66
import type { MongoDBNamespace } from '../utils';
77

88
export const Aspect = {
@@ -82,8 +82,6 @@ export abstract class AbstractOperation<TResult = any> {
8282
this.options = options;
8383
this.bypassPinningCheck = !!options.bypassPinningCheck;
8484
this.trySecondaryWrite = false;
85-
86-
this.timeoutMS = options.timeoutMS;
8785
}
8886

8987
/** Must match the first key of the command object sent to the server.

src/operations/run_command.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {
4747
...this.options,
4848
readPreference: this.readPreference,
4949
session,
50-
timeout: this.timeout
50+
timeoutContext
5151
},
5252
this.options.responseType
5353
);
@@ -82,7 +82,7 @@ export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T>
8282
...this.options,
8383
readPreference: this.readPreference,
8484
session,
85-
timeout: this.timeout
85+
timeoutContext
8686
});
8787
return res;
8888
}

src/sdam/topology.ts

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import { Timeout, TimeoutContext, TimeoutError } from '../timeout';
3838
import type { Transaction } from '../transactions';
3939
import {
4040
type Callback,
41-
csotMin,
4241
type EventEmitterWithState,
4342
HostAddress,
4443
List,
@@ -461,13 +460,20 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
461460
}
462461
}
463462

464-
const timeoutMS = this.client.options.timeoutMS;
465-
const timeout = timeoutMS != null ? Timeout.expires(timeoutMS) : undefined;
463+
const timeoutMS = this.client.s.options.timeoutMS;
464+
const serverSelectionTimeoutMS = this.client.s.options.serverSelectionTimeoutMS;
466465
const readPreference = options.readPreference ?? ReadPreference.primary;
466+
467+
const timeoutContext = TimeoutContext.create({
468+
timeoutMS,
469+
serverSelectionTimeoutMS,
470+
waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS
471+
});
472+
467473
const selectServerOptions = {
468474
operationName: 'ping',
469-
timeout,
470-
...options
475+
...options,
476+
timeoutContext
471477
};
472478
try {
473479
const server = await this.selectServer(
@@ -477,7 +483,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
477483

478484
const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
479485
if (!skipPingOnConnect && server && this.s.credentials) {
480-
await server.command(ns('admin.$cmd'), { ping: 1 }, { timeout });
486+
await server.command(ns('admin.$cmd'), { ping: 1 }, { timeoutContext });
481487
stateTransition(this, STATE_CONNECTED);
482488
this.emit(Topology.OPEN, this);
483489
this.emit(Topology.CONNECT, this);
@@ -566,24 +572,10 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
566572
new ServerSelectionStartedEvent(selector, this.description, options.operationName)
567573
);
568574
}
569-
const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS ?? 0;
570-
let timeout: Timeout | null;
571-
if (options.timeout) {
572-
// CSOT Enabled
573-
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
574-
if (
575-
options.timeout.duration === serverSelectionTimeoutMS ||
576-
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
577-
) {
578-
timeout = options.timeout;
579-
} else {
580-
timeout = Timeout.expires(serverSelectionTimeoutMS);
581-
}
582-
} else {
583-
timeout = null;
584-
}
585-
} else {
586-
timeout = Timeout.expires(serverSelectionTimeoutMS);
575+
let timeout;
576+
if (options.timeoutContext) timeout = options.timeoutContext.serverSelectionTimeout;
577+
else {
578+
timeout = Timeout.expires(options.serverSelectionTimeoutMS ?? 0);
587579
}
588580

589581
const isSharded = this.description.type === TopologyType.Sharded;
@@ -607,7 +599,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
607599
)
608600
);
609601
}
610-
if (timeout !== options.timeout) timeout?.clear();
602+
if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear();
611603
return transaction.server;
612604
}
613605

@@ -657,7 +649,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
657649
);
658650
}
659651

660-
if (options.timeout) {
652+
if (options.timeoutContext?.csotEnabled()) {
661653
throw new MongoOperationTimeoutError('Timed out during server selection', {
662654
cause: timeoutError
663655
});
@@ -667,7 +659,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
667659
// Other server selection error
668660
throw error;
669661
} finally {
670-
if (timeout !== options.timeout) timeout?.clear();
662+
if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear();
671663
}
672664
}
673665
/**

0 commit comments

Comments
 (0)