Skip to content

Commit 3f4313e

Browse files
W-A-Jamesnbbeeken
authored andcommitted
feat(NODE-6090): Implement CSOT logic for connection checkout and server selection
1 parent 7b4a1fb commit 3f4313e

File tree

2 files changed

+97
-15
lines changed

2 files changed

+97
-15
lines changed

src/cmap/connection_pool.ts

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -362,9 +362,35 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
362362
new ConnectionCheckOutStartedEvent(this)
363363
);
364364

365+
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
366+
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;
367+
365368
const { promise, resolve, reject } = promiseWithResolvers<Connection>();
366369

367-
const timeout = options.timeoutContext.connectionCheckoutTimeout;
370+
let timeout: Timeout | null = null;
371+
if (options?.timeout) {
372+
// CSOT enabled
373+
// Determine if we're using the timeout passed in or a new timeout
374+
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
375+
// This check determines whether or not Topology.selectServer used the configured
376+
// `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
377+
if (
378+
options.timeout.duration === serverSelectionTimeoutMS ||
379+
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
380+
) {
381+
// server selection used `timeoutMS`, so we should use the existing timeout as the timeout
382+
// here
383+
timeout = options.timeout;
384+
} else {
385+
// server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
386+
// the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
387+
// cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
388+
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
389+
}
390+
}
391+
} else {
392+
timeout = Timeout.expires(waitQueueTimeoutMS);
393+
}
368394

369395
const waitQueueMember: WaitQueueMember = {
370396
resolve,
@@ -393,7 +419,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
393419
: 'Timed out while checking out a connection from connection pool',
394420
this.address
395421
);
396-
if (options.timeoutContext.csotEnabled()) {
422+
if (options?.timeout) {
397423
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
398424
cause: timeoutError
399425
});
@@ -402,7 +428,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
402428
}
403429
throw error;
404430
} finally {
405-
if (options.timeoutContext.clearConnectionCheckoutTimeout) timeout?.clear();
431+
if (timeout !== options?.timeout) timeout?.clear();
406432
}
407433
}
408434

src/operations/execute_operation.ts

Lines changed: 68 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import {
2424
} from '../sdam/server_selection';
2525
import type { Topology } from '../sdam/topology';
2626
import type { ClientSession } from '../sessions';
27-
import { TimeoutContext } from '../timeout';
28-
import { supportsRetryableWrites } from '../utils';
27+
import { Timeout } from '../timeout';
28+
import { squashError, supportsRetryableWrites } from '../utils';
2929
import { AbstractOperation, Aspect } from './operation';
3030

3131
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
@@ -200,10 +200,13 @@ async function tryOperation<
200200
selector = readPreference;
201201
}
202202

203-
let server = await topology.selectServer(selector, {
203+
const timeout = operation.timeoutMS != null ? Timeout.expires(operation.timeoutMS) : undefined;
204+
operation.timeout = timeout;
205+
206+
const server = await topology.selectServer(selector, {
204207
session,
205208
operationName: operation.commandName,
206-
timeoutContext
209+
timeout
207210
});
208211

209212
const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
@@ -278,14 +281,67 @@ async function tryOperation<
278281
} catch (operationError) {
279282
if (!(operationError instanceof MongoError)) throw operationError;
280283

281-
if (
282-
previousOperationError != null &&
283-
operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
284-
) {
285-
throw previousOperationError;
286-
}
287-
previousServer = server.description;
288-
previousOperationError = operationError;
284+
async function retryOperation<
285+
T extends AbstractOperation<TResult>,
286+
TResult = ResultTypeFromOperation<T>
287+
>(
288+
operation: T,
289+
originalError: MongoError,
290+
{ session, topology, selector, previousServer }: RetryOptions
291+
): Promise<TResult> {
292+
const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION);
293+
const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION);
294+
295+
if (isWriteOperation && originalError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
296+
throw new MongoServerError({
297+
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
298+
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
299+
originalError
300+
});
301+
}
302+
303+
if (isWriteOperation && !isRetryableWriteError(originalError)) {
304+
throw originalError;
305+
}
306+
307+
if (isReadOperation && !isRetryableReadError(originalError)) {
308+
throw originalError;
309+
}
310+
311+
if (
312+
originalError instanceof MongoNetworkError &&
313+
session.isPinned &&
314+
!session.inTransaction() &&
315+
operation.hasAspect(Aspect.CURSOR_CREATING)
316+
) {
317+
// If we have a cursor and the initial command fails with a network error,
318+
// we can retry it on another connection. So we need to check it back in, clear the
319+
// pool for the service id, and retry again.
320+
session.unpin({ force: true, forceClear: true });
321+
}
322+
323+
// select a new server, and attempt to retry the operation
324+
const server = await topology.selectServer(selector, {
325+
session,
326+
timeout: operation.timeout,
327+
operationName: operation.commandName,
328+
previousServer
329+
});
330+
331+
if (isWriteOperation && !supportsRetryableWrites(server)) {
332+
throw new MongoUnexpectedServerResponseError(
333+
'Selected server does not support retryable writes'
334+
);
335+
}
336+
337+
try {
338+
return await operation.execute(server, session);
339+
} catch (retryError) {
340+
if (
341+
retryError instanceof MongoError &&
342+
retryError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
343+
) {
344+
throw originalError;
289345
}
290346
}
291347

0 commit comments

Comments
 (0)