Skip to content

Commit 6f7dc0c

Browse files
W-A-Jamesnbbeeken
authored andcommitted
refactor(NODE-6187): refactor to use TimeoutContext abstraction (#4131)
1 parent 4d2a0c8 commit 6f7dc0c

File tree

11 files changed

+91
-126
lines changed

11 files changed

+91
-126
lines changed

src/cmap/connection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3030
import { ServerType } from '../sdam/common';
3131
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
32-
import { type Timeout } from '../timeout';
32+
import { type TimeoutContext } from '../timeout';
3333
import {
3434
BufferPool,
3535
calculateDurationInMs,
@@ -96,7 +96,7 @@ export interface CommandOptions extends BSONSerializeOptions {
9696
directConnection?: boolean;
9797

9898
/** @internal */
99-
timeout?: Timeout;
99+
timeoutContext?: TimeoutContext;
100100
}
101101

102102
/** @public */

src/cmap/connection_pool.ts

Lines changed: 6 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import {
2727
} from '../error';
2828
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import type { Server } from '../sdam/server';
30-
import { Timeout, TimeoutError } from '../timeout';
31-
import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils';
30+
import { type TimeoutContext, TimeoutError } from '../timeout';
31+
import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils';
3232
import { connect } from './connect';
3333
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
3434
import {
@@ -354,41 +354,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
354354
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
355355
* explicitly destroyed by the new owner.
356356
*/
357-
async checkOut(options?: { timeout?: Timeout }): Promise<Connection> {
357+
async checkOut(options: { timeoutContext: TimeoutContext }): Promise<Connection> {
358358
this.emitAndLog(
359359
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
360360
new ConnectionCheckOutStartedEvent(this)
361361
);
362362

363-
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
364-
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;
365-
366363
const { promise, resolve, reject } = promiseWithResolvers<Connection>();
367364

368-
let timeout: Timeout | null = null;
369-
if (options?.timeout) {
370-
// CSOT enabled
371-
// Determine if we're using the timeout passed in or a new timeout
372-
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
373-
// This check determines whether or not Topology.selectServer used the configured
374-
// `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
375-
if (
376-
options.timeout.duration === serverSelectionTimeoutMS ||
377-
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
378-
) {
379-
// server selection used `timeoutMS`, so we should use the existing timeout as the timeout
380-
// here
381-
timeout = options.timeout;
382-
} else {
383-
// server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
384-
// the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
385-
// cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
386-
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
387-
}
388-
}
389-
} else {
390-
timeout = Timeout.expires(waitQueueTimeoutMS);
391-
}
365+
const timeout = options.timeoutContext.connectionCheckoutTimeout;
392366

393367
const waitQueueMember: WaitQueueMember = {
394368
resolve,
@@ -416,7 +390,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
416390
: 'Timed out while checking out a connection from connection pool',
417391
this.address
418392
);
419-
if (options?.timeout) {
393+
if (options.timeoutContext.csotEnabled()) {
420394
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
421395
cause: timeoutError
422396
});
@@ -425,7 +399,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
425399
}
426400
throw error;
427401
} finally {
428-
if (timeout !== options?.timeout) timeout?.clear();
402+
if (options.timeoutContext.clearConnectionCheckoutTimeout) timeout?.clear();
429403
}
430404
}
431405

src/operations/command.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
136136
const options = {
137137
...this.options,
138138
...this.bsonOptions,
139-
timeout: this.timeout,
139+
timeoutContext,
140140
readPreference: this.readPreference,
141141
session
142142
};

src/operations/execute_operation.ts

Lines changed: 48 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import {
2424
} from '../sdam/server_selection';
2525
import type { Topology } from '../sdam/topology';
2626
import type { ClientSession } from '../sessions';
27-
import { Timeout } from '../timeout';
27+
import { TimeoutContext } from '../timeout';
2828
import { squashError, supportsRetryableWrites } from '../utils';
2929
import { AbstractOperation, Aspect } from './operation';
3030

@@ -88,6 +88,12 @@ export async function executeOperation<
8888
);
8989
}
9090

91+
timeoutContext ??= TimeoutContext.create({
92+
serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS,
93+
waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS,
94+
timeoutMS: operation.options.timeoutMS
95+
});
96+
9197
const readPreference = operation.readPreference ?? ReadPreference.primary;
9298
const inTransaction = !!session?.inTransaction();
9399

@@ -200,18 +206,31 @@ async function tryOperation<
200206
selector = readPreference;
201207
}
202208

203-
const timeout = operation.timeoutMS != null ? Timeout.expires(operation.timeoutMS) : undefined;
204-
operation.timeout = timeout;
205-
206209
const server = await topology.selectServer(selector, {
207210
session,
208211
operationName: operation.commandName,
209-
timeout
212+
timeoutContext
210213
});
211214

212-
const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
213-
const hasWriteAspect = operation.hasAspect(Aspect.WRITE_OPERATION);
214-
const inTransaction = session?.inTransaction() ?? false;
215+
if (session == null) {
216+
// No session also means it is not retryable, early exit
217+
return await operation.execute(server, undefined, timeoutContext);
218+
}
219+
220+
if (!operation.hasAspect(Aspect.RETRYABLE)) {
221+
// non-retryable operation, early exit
222+
try {
223+
return await operation.execute(server, session, timeoutContext);
224+
} finally {
225+
if (session?.owner != null && session.owner === owner) {
226+
try {
227+
await session.endSession();
228+
} catch (error) {
229+
squashError(error);
230+
}
231+
}
232+
}
233+
}
215234

216235
const willRetryRead = topology.s.options.retryReads && !inTransaction && operation.canRetryRead;
217236

@@ -231,42 +250,16 @@ async function tryOperation<
231250
session.incrementTransactionNumber();
232251
}
233252

234-
// TODO(NODE-6231): implement infinite retry within CSOT timeout here
235-
const maxTries = willRetry ? 2 : 1;
236-
let previousOperationError: MongoError | undefined;
237-
let previousServer: ServerDescription | undefined;
238-
239-
// TODO(NODE-6231): implement infinite retry within CSOT timeout here
240-
for (let tries = 0; tries < maxTries; tries++) {
241-
if (previousOperationError) {
242-
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
243-
throw new MongoServerError({
244-
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
245-
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
246-
originalError: previousOperationError
247-
});
248-
}
249-
250-
if (hasWriteAspect && !isRetryableWriteError(previousOperationError))
251-
throw previousOperationError;
252-
253-
if (hasReadAspect && !isRetryableReadError(previousOperationError))
254-
throw previousOperationError;
255-
256-
if (
257-
previousOperationError instanceof MongoNetworkError &&
258-
operation.hasAspect(Aspect.CURSOR_CREATING) &&
259-
session != null &&
260-
session.isPinned &&
261-
!session.inTransaction()
262-
) {
263-
session.unpin({ force: true, forceClear: true });
264-
}
265-
266-
server = await topology.selectServer(selector, {
253+
try {
254+
return await operation.execute(server, session, timeoutContext);
255+
} catch (operationError) {
256+
if (willRetry && operationError instanceof MongoError) {
257+
return await retryOperation(operation, operationError, {
267258
session,
268-
operationName: operation.commandName,
269-
previousServer
259+
topology,
260+
selector,
261+
previousServer: server.description,
262+
timeoutContext
270263
});
271264

272265
if (hasWriteAspect && !supportsRetryableWrites(server)) {
@@ -276,18 +269,22 @@ async function tryOperation<
276269
}
277270
}
278271

279-
try {
280-
return await operation.execute(server, session, timeoutContext);
281-
} catch (operationError) {
282-
if (!(operationError instanceof MongoError)) throw operationError;
272+
/** @internal */
273+
type RetryOptions = {
274+
session: ClientSession;
275+
topology: Topology;
276+
selector: ReadPreference | ServerSelector;
277+
previousServer: ServerDescription;
278+
timeoutContext: TimeoutContext;
279+
};
283280

284281
async function retryOperation<
285282
T extends AbstractOperation<TResult>,
286283
TResult = ResultTypeFromOperation<T>
287284
>(
288285
operation: T,
289286
originalError: MongoError,
290-
{ session, topology, selector, previousServer }: RetryOptions
287+
{ session, topology, selector, previousServer, timeoutContext }: RetryOptions
291288
): Promise<TResult> {
292289
const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION);
293290
const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION);
@@ -323,9 +320,9 @@ async function retryOperation<
323320
// select a new server, and attempt to retry the operation
324321
const server = await topology.selectServer(selector, {
325322
session,
326-
timeout: operation.timeout,
327323
operationName: operation.commandName,
328-
previousServer
324+
previousServer,
325+
timeoutContext
329326
});
330327

331328
if (isWriteOperation && !supportsRetryableWrites(server)) {
@@ -335,7 +332,7 @@ async function retryOperation<
335332
}
336333

337334
try {
338-
return await operation.execute(server, session);
335+
return await operation.execute(server, session, timeoutContext);
339336
} catch (retryError) {
340337
if (
341338
retryError instanceof MongoError &&

src/operations/find.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ export class FindOperation extends CommandOperation<CursorResponse> {
119119
...this.bsonOptions,
120120
documentsReturnedIn: 'firstBatch',
121121
session,
122-
timeout: this.timeout
122+
timeoutContext
123123
},
124124
this.explain ? ExplainedCursorResponse : CursorResponse
125125
);

src/operations/operation.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '..
22
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
33
import type { Server } from '../sdam/server';
44
import type { ClientSession } from '../sessions';
5-
import { type Timeout } from '../timeout';
5+
import { type Timeout, type TimeoutContext } from '../timeout';
66
import type { MongoDBNamespace } from '../utils';
77

88
export const Aspect = {
@@ -82,8 +82,6 @@ export abstract class AbstractOperation<TResult = any> {
8282
this.options = options;
8383
this.bypassPinningCheck = !!options.bypassPinningCheck;
8484
this.trySecondaryWrite = false;
85-
86-
this.timeoutMS = options.timeoutMS;
8785
}
8886

8987
/** Must match the first key of the command object sent to the server.

src/operations/run_command.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {
4747
...this.options,
4848
readPreference: this.readPreference,
4949
session,
50-
timeout: this.timeout
50+
timeoutContext
5151
},
5252
this.options.responseType
5353
);
@@ -82,7 +82,7 @@ export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T>
8282
...this.options,
8383
readPreference: this.readPreference,
8484
session,
85-
timeout: this.timeout
85+
timeoutContext
8686
});
8787
return res;
8888
}

0 commit comments

Comments
 (0)