Skip to content

Commit 4d2a0c8

Browse files
W-A-Jamesnbbeeken
authored andcommitted
feat(NODE-6090): Implement CSOT logic for connection checkout and server selection
1 parent 58d4e7b commit 4d2a0c8

File tree

11 files changed

+143
-55
lines changed

11 files changed

+143
-55
lines changed

src/cmap/connection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3030
import { ServerType } from '../sdam/common';
3131
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
32-
import { type TimeoutContext } from '../timeout';
32+
import { type Timeout } from '../timeout';
3333
import {
3434
BufferPool,
3535
calculateDurationInMs,
@@ -96,7 +96,7 @@ export interface CommandOptions extends BSONSerializeOptions {
9696
directConnection?: boolean;
9797

9898
/** @internal */
99-
timeoutContext?: TimeoutContext;
99+
timeout?: Timeout;
100100
}
101101

102102
/** @public */

src/cmap/connection_pool.ts

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import {
2727
} from '../error';
2828
import { CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import type { Server } from '../sdam/server';
30-
import { type TimeoutContext, TimeoutError } from '../timeout';
31-
import { type Callback, List, makeCounter, promiseWithResolvers } from '../utils';
30+
import { Timeout, TimeoutError } from '../timeout';
31+
import { type Callback, csotMin, List, makeCounter, promiseWithResolvers } from '../utils';
3232
import { connect } from './connect';
3333
import { Connection, type ConnectionEvents, type ConnectionOptions } from './connection';
3434
import {
@@ -354,15 +354,41 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
354354
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
355355
* explicitly destroyed by the new owner.
356356
*/
357-
async checkOut(options: { timeoutContext: TimeoutContext }): Promise<Connection> {
357+
async checkOut(options?: { timeout?: Timeout }): Promise<Connection> {
358358
this.emitAndLog(
359359
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
360360
new ConnectionCheckOutStartedEvent(this)
361361
);
362362

363+
const waitQueueTimeoutMS = this.options.waitQueueTimeoutMS;
364+
const serverSelectionTimeoutMS = this[kServer].topology.s.serverSelectionTimeoutMS;
365+
363366
const { promise, resolve, reject } = promiseWithResolvers<Connection>();
364367

365-
const timeout = options.timeoutContext.connectionCheckoutTimeout;
368+
let timeout: Timeout | null = null;
369+
if (options?.timeout) {
370+
// CSOT enabled
371+
// Determine if we're using the timeout passed in or a new timeout
372+
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
373+
// This check determines whether or not Topology.selectServer used the configured
374+
// `timeoutMS` or `serverSelectionTimeoutMS` value for its timeout
375+
if (
376+
options.timeout.duration === serverSelectionTimeoutMS ||
377+
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
378+
) {
379+
// server selection used `timeoutMS`, so we should use the existing timeout as the timeout
380+
// here
381+
timeout = options.timeout;
382+
} else {
383+
// server selection used `serverSelectionTimeoutMS`, so we construct a new timeout with
384+
// the time remaining to ensure that Topology.selectServer and ConnectionPool.checkOut
385+
// cumulatively don't spend more than `serverSelectionTimeoutMS` blocking
386+
timeout = Timeout.expires(serverSelectionTimeoutMS - options.timeout.timeElapsed);
387+
}
388+
}
389+
} else {
390+
timeout = Timeout.expires(waitQueueTimeoutMS);
391+
}
366392

367393
const waitQueueMember: WaitQueueMember = {
368394
resolve,
@@ -390,7 +416,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
390416
: 'Timed out while checking out a connection from connection pool',
391417
this.address
392418
);
393-
if (options.timeoutContext.csotEnabled()) {
419+
if (options?.timeout) {
394420
throw new MongoOperationTimeoutError('Timed out during connection checkout', {
395421
cause: timeoutError
396422
});
@@ -399,7 +425,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
399425
}
400426
throw error;
401427
} finally {
402-
if (options.timeoutContext.clearConnectionCheckoutTimeout) timeout?.clear();
428+
if (timeout !== options?.timeout) timeout?.clear();
403429
}
404430
}
405431

src/operations/command.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
136136
const options = {
137137
...this.options,
138138
...this.bsonOptions,
139-
timeoutContext,
139+
timeout: this.timeout,
140140
readPreference: this.readPreference,
141141
session
142142
};

src/operations/execute_operation.ts

Lines changed: 68 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import {
2424
} from '../sdam/server_selection';
2525
import type { Topology } from '../sdam/topology';
2626
import type { ClientSession } from '../sessions';
27-
import { TimeoutContext } from '../timeout';
28-
import { supportsRetryableWrites } from '../utils';
27+
import { Timeout } from '../timeout';
28+
import { squashError, supportsRetryableWrites } from '../utils';
2929
import { AbstractOperation, Aspect } from './operation';
3030

3131
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
@@ -200,10 +200,13 @@ async function tryOperation<
200200
selector = readPreference;
201201
}
202202

203-
let server = await topology.selectServer(selector, {
203+
const timeout = operation.timeoutMS != null ? Timeout.expires(operation.timeoutMS) : undefined;
204+
operation.timeout = timeout;
205+
206+
const server = await topology.selectServer(selector, {
204207
session,
205208
operationName: operation.commandName,
206-
timeoutContext
209+
timeout
207210
});
208211

209212
const hasReadAspect = operation.hasAspect(Aspect.READ_OPERATION);
@@ -278,14 +281,67 @@ async function tryOperation<
278281
} catch (operationError) {
279282
if (!(operationError instanceof MongoError)) throw operationError;
280283

281-
if (
282-
previousOperationError != null &&
283-
operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
284-
) {
285-
throw previousOperationError;
286-
}
287-
previousServer = server.description;
288-
previousOperationError = operationError;
284+
async function retryOperation<
285+
T extends AbstractOperation<TResult>,
286+
TResult = ResultTypeFromOperation<T>
287+
>(
288+
operation: T,
289+
originalError: MongoError,
290+
{ session, topology, selector, previousServer }: RetryOptions
291+
): Promise<TResult> {
292+
const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION);
293+
const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION);
294+
295+
if (isWriteOperation && originalError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
296+
throw new MongoServerError({
297+
message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
298+
errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
299+
originalError
300+
});
301+
}
302+
303+
if (isWriteOperation && !isRetryableWriteError(originalError)) {
304+
throw originalError;
305+
}
306+
307+
if (isReadOperation && !isRetryableReadError(originalError)) {
308+
throw originalError;
309+
}
310+
311+
if (
312+
originalError instanceof MongoNetworkError &&
313+
session.isPinned &&
314+
!session.inTransaction() &&
315+
operation.hasAspect(Aspect.CURSOR_CREATING)
316+
) {
317+
// If we have a cursor and the initial command fails with a network error,
318+
// we can retry it on another connection. So we need to check it back in, clear the
319+
// pool for the service id, and retry again.
320+
session.unpin({ force: true, forceClear: true });
321+
}
322+
323+
// select a new server, and attempt to retry the operation
324+
const server = await topology.selectServer(selector, {
325+
session,
326+
timeout: operation.timeout,
327+
operationName: operation.commandName,
328+
previousServer
329+
});
330+
331+
if (isWriteOperation && !supportsRetryableWrites(server)) {
332+
throw new MongoUnexpectedServerResponseError(
333+
'Selected server does not support retryable writes'
334+
);
335+
}
336+
337+
try {
338+
return await operation.execute(server, session);
339+
} catch (retryError) {
340+
if (
341+
retryError instanceof MongoError &&
342+
retryError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
343+
) {
344+
throw originalError;
289345
}
290346
}
291347

src/operations/find.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ export class FindOperation extends CommandOperation<CursorResponse> {
119119
...this.bsonOptions,
120120
documentsReturnedIn: 'firstBatch',
121121
session,
122-
timeoutContext
122+
timeout: this.timeout
123123
},
124124
this.explain ? ExplainedCursorResponse : CursorResponse
125125
);

src/operations/operation.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '..
22
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
33
import type { Server } from '../sdam/server';
44
import type { ClientSession } from '../sessions';
5-
import { type Timeout, type TimeoutContext } from '../timeout';
5+
import { type Timeout } from '../timeout';
66
import type { MongoDBNamespace } from '../utils';
77

88
export const Aspect = {
@@ -82,6 +82,8 @@ export abstract class AbstractOperation<TResult = any> {
8282
this.options = options;
8383
this.bypassPinningCheck = !!options.bypassPinningCheck;
8484
this.trySecondaryWrite = false;
85+
86+
this.timeoutMS = options.timeoutMS;
8587
}
8688

8789
/** Must match the first key of the command object sent to the server.

src/operations/run_command.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {
4747
...this.options,
4848
readPreference: this.readPreference,
4949
session,
50-
timeoutContext
50+
timeout: this.timeout
5151
},
5252
this.options.responseType
5353
);
@@ -82,7 +82,7 @@ export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T>
8282
...this.options,
8383
readPreference: this.readPreference,
8484
session,
85-
timeoutContext
85+
timeout: this.timeout
8686
});
8787
return res;
8888
}

src/sdam/topology.ts

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import { Timeout, TimeoutContext, TimeoutError } from '../timeout';
3838
import type { Transaction } from '../transactions';
3939
import {
4040
type Callback,
41+
csotMin,
4142
type EventEmitterWithState,
4243
HostAddress,
4344
List,
@@ -460,20 +461,13 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
460461
}
461462
}
462463

463-
const timeoutMS = this.client.s.options.timeoutMS;
464-
const serverSelectionTimeoutMS = this.client.s.options.serverSelectionTimeoutMS;
464+
const timeoutMS = this.client.options.timeoutMS;
465+
const timeout = timeoutMS != null ? Timeout.expires(timeoutMS) : undefined;
465466
const readPreference = options.readPreference ?? ReadPreference.primary;
466-
467-
const timeoutContext = TimeoutContext.create({
468-
timeoutMS,
469-
serverSelectionTimeoutMS,
470-
waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS
471-
});
472-
473467
const selectServerOptions = {
474468
operationName: 'ping',
475-
...options,
476-
timeoutContext
469+
timeout,
470+
...options
477471
};
478472
try {
479473
const server = await this.selectServer(
@@ -483,7 +477,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
483477

484478
const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
485479
if (!skipPingOnConnect && server && this.s.credentials) {
486-
await server.command(ns('admin.$cmd'), { ping: 1 }, { timeoutContext });
480+
await server.command(ns('admin.$cmd'), { ping: 1 }, { timeout });
487481
stateTransition(this, STATE_CONNECTED);
488482
this.emit(Topology.OPEN, this);
489483
this.emit(Topology.CONNECT, this);
@@ -572,10 +566,24 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
572566
new ServerSelectionStartedEvent(selector, this.description, options.operationName)
573567
);
574568
}
575-
let timeout;
576-
if (options.timeoutContext) timeout = options.timeoutContext.serverSelectionTimeout;
577-
else {
578-
timeout = Timeout.expires(options.serverSelectionTimeoutMS ?? 0);
569+
const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS ?? 0;
570+
let timeout: Timeout | null;
571+
if (options.timeout) {
572+
// CSOT Enabled
573+
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
574+
if (
575+
options.timeout.duration === serverSelectionTimeoutMS ||
576+
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
577+
) {
578+
timeout = options.timeout;
579+
} else {
580+
timeout = Timeout.expires(serverSelectionTimeoutMS);
581+
}
582+
} else {
583+
timeout = null;
584+
}
585+
} else {
586+
timeout = Timeout.expires(serverSelectionTimeoutMS);
579587
}
580588

581589
const isSharded = this.description.type === TopologyType.Sharded;
@@ -599,7 +607,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
599607
)
600608
);
601609
}
602-
if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear();
610+
if (timeout !== options.timeout) timeout?.clear();
603611
return transaction.server;
604612
}
605613

@@ -649,7 +657,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
649657
);
650658
}
651659

652-
if (options.timeoutContext?.csotEnabled()) {
660+
if (options.timeout) {
653661
throw new MongoOperationTimeoutError('Timed out during server selection', {
654662
cause: timeoutError
655663
});
@@ -659,7 +667,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
659667
// Other server selection error
660668
throw error;
661669
} finally {
662-
if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear();
670+
if (timeout !== options.timeout) timeout?.clear();
663671
}
664672
}
665673
/**

test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,16 @@ describe('CSOT spec unit tests', function () {
3333
client = this.configuration.newClient({ timeoutMS: 1000 });
3434
// Spy on connection checkout and pull options argument
3535
const checkoutSpy = sinon.spy(ConnectionPool.prototype, 'checkOut');
36+
const selectServerSpy = sinon.spy(Topology.prototype, 'selectServer');
3637
const expiresSpy = sinon.spy(Timeout, 'expires');
3738

3839
await client.db('db').collection('collection').insertOne({ x: 1 });
3940

4041
expect(checkoutSpy).to.have.been.calledOnce;
41-
const timeoutContext = checkoutSpy.lastCall.args[0].timeoutContext;
42-
expect(timeoutContext).to.exist;
42+
expect(checkoutSpy.firstCall.args[0].timeout).to.exist;
4343
// Check that we passed through the timeout
44-
// @ts-expect-error accessing private properties
45-
expect(timeoutContext._serverSelectionTimeout).to.be.instanceOf(Timeout);
46-
// @ts-expect-error accessing private properties
47-
expect(timeoutContext._serverSelectionTimeout).to.equal(
48-
// @ts-expect-error accessing private properties
49-
timeoutContext._connectionCheckoutTimeout
44+
expect(checkoutSpy.firstCall.args[0].timeout).to.equal(
45+
selectServerSpy.lastCall.lastArg.timeout
5046
);
5147

5248
// Check that no more Timeouts are constructed after we enter checkout

test/integration/client-side-operations-timeout/node_csot.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ describe('CSOT driver tests', () => {
143143
});
144144

145145
it('throws a MongoOperationTimeoutError', {
146-
metadata: { requires: { mongodb: '>=4.4', topology: '!load-balanced' } },
146+
metadata: { requires: { mongodb: '>=4.4' } },
147147
test: async function () {
148148
const commandsStarted = [];
149149
client = this.configuration.newClient(undefined, { timeoutMS: 1, monitorCommands: true });

test/unit/cmap/connection_pool.test.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@ describe('Connection Pool', function () {
129129

130130
pool.ready();
131131

132-
const conn = await pool.checkOut({ timeoutContext });
133-
const err = await pool.checkOut({ timeoutContext }).catch(e => e);
132+
const conn = await pool.checkOut();
133+
const err = await pool.checkOut().catch(e => e);
134134
expect(err).to.exist.and.be.instanceOf(WaitQueueTimeoutError);
135135
sinon.stub(pool, 'availableConnectionCount').get(() => 0);
136136
pool.checkIn(conn);

0 commit comments

Comments
 (0)