Skip to content

Commit a58ce34

Browse files
W-A-Jamesnbbeeken
authored andcommitted
feat(NODE-6090): Implement CSOT logic for connection checkout and server selection
1 parent 5759f72 commit a58ce34

File tree

9 files changed

+43
-37
lines changed

9 files changed

+43
-37
lines changed

src/cmap/connection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3030
import { ServerType } from '../sdam/common';
3131
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
32-
import { type TimeoutContext } from '../timeout';
32+
import { type Timeout } from '../timeout';
3333
import {
3434
BufferPool,
3535
calculateDurationInMs,
@@ -96,7 +96,7 @@ export interface CommandOptions extends BSONSerializeOptions {
9696
directConnection?: boolean;
9797

9898
/** @internal */
99-
timeoutContext?: TimeoutContext;
99+
timeout?: Timeout;
100100
}
101101

102102
/** @public */

src/operations/command.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
136136
const options = {
137137
...this.options,
138138
...this.bsonOptions,
139-
timeoutContext,
139+
timeout: this.timeout,
140140
readPreference: this.readPreference,
141141
session
142142
};

src/operations/find.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ export class FindOperation extends CommandOperation<CursorResponse> {
119119
...this.bsonOptions,
120120
documentsReturnedIn: 'firstBatch',
121121
session,
122-
timeoutContext
122+
timeout: this.timeout
123123
},
124124
this.explain ? ExplainedCursorResponse : CursorResponse
125125
);

src/operations/operation.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { type BSONSerializeOptions, type Document, resolveBSONOptions } from '..
22
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
33
import type { Server } from '../sdam/server';
44
import type { ClientSession } from '../sessions';
5-
import { type Timeout, type TimeoutContext } from '../timeout';
5+
import { type Timeout } from '../timeout';
66
import type { MongoDBNamespace } from '../utils';
77

88
export const Aspect = {
@@ -82,6 +82,8 @@ export abstract class AbstractOperation<TResult = any> {
8282
this.options = options;
8383
this.bypassPinningCheck = !!options.bypassPinningCheck;
8484
this.trySecondaryWrite = false;
85+
86+
this.timeoutMS = options.timeoutMS;
8587
}
8688

8789
/** Must match the first key of the command object sent to the server.

src/operations/run_command.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ export class RunCommandOperation<T = Document> extends AbstractOperation<T> {
4747
...this.options,
4848
readPreference: this.readPreference,
4949
session,
50-
timeoutContext
50+
timeout: this.timeout
5151
},
5252
this.options.responseType
5353
);
@@ -82,7 +82,7 @@ export class RunAdminCommandOperation<T = Document> extends AbstractOperation<T>
8282
...this.options,
8383
readPreference: this.readPreference,
8484
session,
85-
timeoutContext
85+
timeout: this.timeout
8686
});
8787
return res;
8888
}

src/sdam/topology.ts

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import { Timeout, TimeoutContext, TimeoutError } from '../timeout';
3838
import type { Transaction } from '../transactions';
3939
import {
4040
type Callback,
41+
csotMin,
4142
type EventEmitterWithState,
4243
HostAddress,
4344
List,
@@ -460,20 +461,13 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
460461
}
461462
}
462463

463-
const timeoutMS = this.client.s.options.timeoutMS;
464-
const serverSelectionTimeoutMS = this.client.s.options.serverSelectionTimeoutMS;
464+
const timeoutMS = this.client.options.timeoutMS;
465+
const timeout = timeoutMS != null ? Timeout.expires(timeoutMS) : undefined;
465466
const readPreference = options.readPreference ?? ReadPreference.primary;
466-
467-
const timeoutContext = TimeoutContext.create({
468-
timeoutMS,
469-
serverSelectionTimeoutMS,
470-
waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS
471-
});
472-
473467
const selectServerOptions = {
474468
operationName: 'ping',
475-
...options,
476-
timeoutContext
469+
timeout,
470+
...options
477471
};
478472
try {
479473
const server = await this.selectServer(
@@ -483,7 +477,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
483477

484478
const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
485479
if (!skipPingOnConnect && server && this.s.credentials) {
486-
await server.command(ns('admin.$cmd'), { ping: 1 }, { timeoutContext });
480+
await server.command(ns('admin.$cmd'), { ping: 1 }, { timeout });
487481
stateTransition(this, STATE_CONNECTED);
488482
this.emit(Topology.OPEN, this);
489483
this.emit(Topology.CONNECT, this);
@@ -572,10 +566,24 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
572566
new ServerSelectionStartedEvent(selector, this.description, options.operationName)
573567
);
574568
}
575-
let timeout;
576-
if (options.timeoutContext) timeout = options.timeoutContext.serverSelectionTimeout;
577-
else {
578-
timeout = Timeout.expires(options.serverSelectionTimeoutMS ?? 0);
569+
const serverSelectionTimeoutMS = options.serverSelectionTimeoutMS ?? 0;
570+
let timeout: Timeout | null;
571+
if (options.timeout) {
572+
// CSOT Enabled
573+
if (options.timeout.duration > 0 || serverSelectionTimeoutMS > 0) {
574+
if (
575+
options.timeout.duration === serverSelectionTimeoutMS ||
576+
csotMin(options.timeout.duration, serverSelectionTimeoutMS) < serverSelectionTimeoutMS
577+
) {
578+
timeout = options.timeout;
579+
} else {
580+
timeout = Timeout.expires(serverSelectionTimeoutMS);
581+
}
582+
} else {
583+
timeout = null;
584+
}
585+
} else {
586+
timeout = Timeout.expires(serverSelectionTimeoutMS);
579587
}
580588

581589
const isSharded = this.description.type === TopologyType.Sharded;
@@ -599,7 +607,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
599607
)
600608
);
601609
}
602-
if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear();
610+
if (timeout !== options.timeout) timeout?.clear();
603611
return transaction.server;
604612
}
605613

@@ -649,7 +657,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
649657
);
650658
}
651659

652-
if (options.timeoutContext?.csotEnabled()) {
660+
if (options.timeout) {
653661
throw new MongoOperationTimeoutError('Timed out during server selection', {
654662
cause: timeoutError
655663
});
@@ -659,7 +667,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
659667
// Other server selection error
660668
throw error;
661669
} finally {
662-
if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear();
670+
if (timeout !== options.timeout) timeout?.clear();
663671
}
664672
}
665673
/**

test/integration/client-side-operations-timeout/client_side_operations_timeout.unit.test.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,16 @@ describe('CSOT spec unit tests', function () {
3333
client = this.configuration.newClient({ timeoutMS: 1000 });
3434
// Spy on connection checkout and pull options argument
3535
const checkoutSpy = sinon.spy(ConnectionPool.prototype, 'checkOut');
36+
const selectServerSpy = sinon.spy(Topology.prototype, 'selectServer');
3637
const expiresSpy = sinon.spy(Timeout, 'expires');
3738

3839
await client.db('db').collection('collection').insertOne({ x: 1 });
3940

4041
expect(checkoutSpy).to.have.been.calledOnce;
41-
const timeoutContext = checkoutSpy.lastCall.args[0].timeoutContext;
42-
expect(timeoutContext).to.exist;
42+
expect(checkoutSpy.firstCall.args[0].timeout).to.exist;
4343
// Check that we passed through the timeout
44-
// @ts-expect-error accessing private properties
45-
expect(timeoutContext._serverSelectionTimeout).to.be.instanceOf(Timeout);
46-
// @ts-expect-error accessing private properties
47-
expect(timeoutContext._serverSelectionTimeout).to.equal(
48-
// @ts-expect-error accessing private properties
49-
timeoutContext._connectionCheckoutTimeout
44+
expect(checkoutSpy.firstCall.args[0].timeout).to.equal(
45+
selectServerSpy.lastCall.lastArg.timeout
5046
);
5147

5248
// Check that no more Timeouts are constructed after we enter checkout

test/integration/client-side-operations-timeout/node_csot.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ describe('CSOT driver tests', () => {
143143
});
144144

145145
it('throws a MongoOperationTimeoutError', {
146-
metadata: { requires: { mongodb: '>=4.4', topology: '!load-balanced' } },
146+
metadata: { requires: { mongodb: '>=4.4' } },
147147
test: async function () {
148148
const commandsStarted = [];
149149
client = this.configuration.newClient(undefined, { timeoutMS: 1, monitorCommands: true });

test/unit/cmap/connection_pool.test.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@ describe('Connection Pool', function () {
129129

130130
pool.ready();
131131

132-
const conn = await pool.checkOut({ timeoutContext });
133-
const err = await pool.checkOut({ timeoutContext }).catch(e => e);
132+
const conn = await pool.checkOut();
133+
const err = await pool.checkOut().catch(e => e);
134134
expect(err).to.exist.and.be.instanceOf(WaitQueueTimeoutError);
135135
sinon.stub(pool, 'availableConnectionCount').get(() => 0);
136136
pool.checkIn(conn);

0 commit comments

Comments
 (0)