Skip to content

Commit dca1a9c

Browse files
committed
WIP maxTimeMS
1 parent e5b80ec commit dca1a9c

File tree

13 files changed

+158
-75
lines changed

13 files changed

+158
-75
lines changed

src/cmap/command_monitoring_events.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ export class CommandStartedEvent {
4646
command: WriteProtocolMessageType,
4747
serverConnectionId: bigint | null
4848
) {
49+
Error.captureStackTrace(this);
4950
const cmd = extractCommand(command);
5051
const commandName = extractCommandName(cmd);
5152
const { address, connectionId, serviceId } = extractConnectionDetails(connection);
@@ -110,6 +111,7 @@ export class CommandSucceededEvent {
110111
started: number,
111112
serverConnectionId: bigint | null
112113
) {
114+
Error.captureStackTrace(this);
113115
const cmd = extractCommand(command);
114116
const commandName = extractCommandName(cmd);
115117
const { address, connectionId, serviceId } = extractConnectionDetails(connection);
@@ -130,6 +132,8 @@ export class CommandSucceededEvent {
130132
}
131133
}
132134

135+
Error.stackTraceLimit = 1000;
136+
133137
/**
134138
* An event indicating the failure of a given command
135139
* @public
@@ -168,6 +172,7 @@ export class CommandFailedEvent {
168172
started: number,
169173
serverConnectionId: bigint | null
170174
) {
175+
Error.captureStackTrace(this);
171176
const cmd = extractCommand(command);
172177
const commandName = extractCommandName(cmd);
173178
const { address, connectionId, serviceId } = extractConnectionDetails(connection);

src/cmap/connection.ts

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3030
import { ServerType } from '../sdam/common';
3131
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
32-
import { type TimeoutContext } from '../timeout';
32+
import { Timeout, type TimeoutContext, TimeoutError } from '../timeout';
3333
import {
3434
BufferPool,
3535
calculateDurationInMs,
@@ -416,6 +416,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
416416
...options
417417
};
418418

419+
if (options.timeoutContext?.csotEnabled()) {
420+
const { maxTimeMS } = options.timeoutContext;
421+
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
422+
}
423+
419424
const message = this.supportsOpMsg
420425
? new OpMsgRequest(db, cmd, commandOptions)
421426
: new OpQueryRequest(db, cmd, commandOptions);
@@ -430,7 +435,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
430435
): AsyncGenerator<MongoDBResponse> {
431436
this.throwIfAborted();
432437

433-
if (typeof options.socketTimeoutMS === 'number') {
438+
if (options.timeoutContext?.csotEnabled()) {
439+
this.socket.setTimeout(0);
440+
} else if (typeof options.socketTimeoutMS === 'number') {
434441
this.socket.setTimeout(options.socketTimeoutMS);
435442
} else if (this.socketTimeoutMS !== 0) {
436443
this.socket.setTimeout(this.socketTimeoutMS);
@@ -439,7 +446,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
439446
try {
440447
await this.writeCommand(message, {
441448
agreedCompressor: this.description.compressor ?? 'none',
442-
zlibCompressionLevel: this.description.zlibCompressionLevel
449+
zlibCompressionLevel: this.description.zlibCompressionLevel,
450+
timeoutContext: options.timeoutContext
443451
});
444452

445453
if (options.noResponse) {
@@ -449,7 +457,15 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
449457

450458
this.throwIfAborted();
451459

452-
for await (const response of this.readMany()) {
460+
if (
461+
options.timeoutContext?.csotEnabled() &&
462+
options.timeoutContext.minRoundTripTime != null &&
463+
options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime
464+
) {
465+
throw new TimeoutError('Server roundtrip time is greater than the time remaining');
466+
}
467+
468+
for await (const response of this.readMany({ timeoutContext: options.timeoutContext })) {
453469
this.socket.setTimeout(0);
454470
const bson = response.parse();
455471

@@ -622,7 +638,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
622638
*/
623639
private async writeCommand(
624640
command: WriteProtocolMessageType,
625-
options: { agreedCompressor?: CompressorName; zlibCompressionLevel?: number }
641+
options: {
642+
agreedCompressor?: CompressorName;
643+
zlibCompressionLevel?: number;
644+
timeoutContext?: TimeoutContext;
645+
}
626646
): Promise<void> {
627647
const finalCommand =
628648
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
@@ -634,8 +654,23 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
634654

635655
const buffer = Buffer.concat(await finalCommand.toBin());
636656

657+
if (options.timeoutContext?.csotEnabled()) {
658+
if (
659+
options.timeoutContext.minRoundTripTime != null &&
660+
options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime
661+
) {
662+
throw new TimeoutError('Server roundtrip time is greater than the time remaining');
663+
}
664+
}
665+
637666
if (this.socket.write(buffer)) return;
638-
return await once(this.socket, 'drain');
667+
668+
const drainEvent = once<void>(this.socket, 'drain');
669+
if (options.timeoutContext?.csotEnabled()) {
670+
const timeout = Timeout.expires(options.timeoutContext.remainingTimeMS);
671+
return await Promise.race([drainEvent, timeout]);
672+
}
673+
return await drainEvent;
639674
}
640675

641676
/**
@@ -647,9 +682,16 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
647682
*
648683
* Note that `for-await` loops call `return` automatically when the loop is exited.
649684
*/
650-
private async *readMany(): AsyncGenerator<OpMsgResponse | OpReply> {
685+
private async *readMany(options: {
686+
timeoutContext?: TimeoutContext;
687+
}): AsyncGenerator<OpMsgResponse | OpReply> {
651688
try {
652-
this.dataEvents = onData(this.messageStream);
689+
const timeoutMS = options.timeoutContext?.csotEnabled()
690+
? options.timeoutContext.remainingTimeMS
691+
: 0;
692+
693+
this.dataEvents = onData(this.messageStream, { timeoutMS });
694+
653695
for await (const message of this.dataEvents) {
654696
const response = await decompressResponse(message);
655697
yield response;

src/cmap/wire_protocol/on_data.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { type EventEmitter } from 'events';
22

3+
import { Timeout } from '../../timeout';
34
import { List, promiseWithResolvers } from '../../utils';
45

56
/**
@@ -18,7 +19,7 @@ type PendingPromises = Omit<
1819
* Returns an AsyncIterator that iterates each 'data' event emitted from emitter.
1920
* It will reject upon an error event.
2021
*/
21-
export function onData(emitter: EventEmitter) {
22+
export function onData(emitter: EventEmitter, { timeoutMS }: { timeoutMS: number }) {
2223
// Setup pending events and pending promise lists
2324
/**
2425
* When the caller has not yet called .next(), we store the
@@ -87,6 +88,10 @@ export function onData(emitter: EventEmitter) {
8788
emitter.on('data', eventHandler);
8889
emitter.on('error', errorHandler);
8990

91+
if (timeoutMS > 0 && Number.isFinite(timeoutMS))
92+
// eslint-disable-next-line github/no-then
93+
Timeout.expires(timeoutMS).then(undefined, errorHandler);
94+
9095
return iterator;
9196

9297
function eventHandler(value: Buffer) {

src/sdam/topology.ts

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -460,29 +460,26 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
460460
}
461461
}
462462

463-
const timeoutMS = this.client.s.options.timeoutMS;
464-
const serverSelectionTimeoutMS = this.client.s.options.serverSelectionTimeoutMS;
465-
const readPreference = options.readPreference ?? ReadPreference.primary;
466-
467-
const timeoutContext = TimeoutContext.create({
468-
timeoutMS,
469-
serverSelectionTimeoutMS,
470-
waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS
471-
});
472-
473-
const selectServerOptions = {
474-
operationName: 'ping',
475-
...options,
476-
timeoutContext
477-
};
478463
try {
479-
const server = await this.selectServer(
480-
readPreferenceServerSelector(readPreference),
481-
selectServerOptions
482-
);
483-
484464
const skipPingOnConnect = this.s.options[Symbol.for('@@mdb.skipPingOnConnect')] === true;
485-
if (!skipPingOnConnect && server && this.s.credentials) {
465+
if (!skipPingOnConnect && this.s.credentials) {
466+
const timeoutMS = this.client.s.options.timeoutMS;
467+
const serverSelectionTimeoutMS = this.client.s.options.serverSelectionTimeoutMS;
468+
const readPreference = options.readPreference ?? ReadPreference.primary;
469+
const timeoutContext = TimeoutContext.create({
470+
timeoutMS,
471+
serverSelectionTimeoutMS,
472+
waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS
473+
});
474+
const selectServerOptions = {
475+
operationName: 'ping',
476+
...options,
477+
timeoutContext
478+
};
479+
const server = await this.selectServer(
480+
readPreferenceServerSelector(readPreference),
481+
selectServerOptions
482+
);
486483
await server.command(ns('admin.$cmd'), { ping: 1 }, { timeoutContext });
487484
stateTransition(this, STATE_CONNECTED);
488485
this.emit(Topology.OPEN, this);
@@ -623,7 +620,11 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
623620

624621
try {
625622
timeout?.throwIfExpired();
626-
return await (timeout ? Promise.race([serverPromise, timeout]) : serverPromise);
623+
const server = await (timeout ? Promise.race([serverPromise, timeout]) : serverPromise);
624+
if (options.timeoutContext?.csotEnabled() && server.description.minRoundTripTime !== 0) {
625+
options.timeoutContext.minRoundTripTime = server.description.minRoundTripTime;
626+
}
627+
return server;
627628
} catch (error) {
628629
if (TimeoutError.is(error)) {
629630
// Timeout

src/timeout.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ export class Timeout extends Promise<never> {
5151
}
5252

5353
/** Create a new timeout that expires in `duration` ms */
54-
private constructor(executor: Executor = () => null, duration: number, unref = false) {
54+
private constructor(executor: Executor = () => null, duration: number, unref = true) {
5555
let reject!: Reject;
5656

5757
if (duration < 0) {
@@ -150,6 +150,11 @@ function isCSOTTimeoutContextOptions(v: unknown): v is CSOTTimeoutContextOptions
150150

151151
/** @internal */
152152
export abstract class TimeoutContext {
153+
start: number;
154+
constructor() {
155+
this.start = Math.trunc(performance.now());
156+
}
157+
153158
static create(options: TimeoutContextOptions): TimeoutContext {
154159
if (isCSOTTimeoutContextOptions(options)) return new CSOTTimeoutContext(options);
155160
else if (isLegacyTimeoutContextOptions(options)) return new LegacyTimeoutContext(options);
@@ -180,6 +185,7 @@ export class CSOTTimeoutContext extends TimeoutContext {
180185

181186
private _serverSelectionTimeout?: Timeout | null;
182187
private _connectionCheckoutTimeout?: Timeout | null;
188+
public minRoundTripTime = 0;
183189

184190
constructor(options: CSOTTimeoutContextOptions) {
185191
super();
@@ -194,11 +200,12 @@ export class CSOTTimeoutContext extends TimeoutContext {
194200
}
195201

196202
get maxTimeMS(): number {
197-
return this._maxTimeMS ?? -1;
203+
return this.remainingTimeMS - this.minRoundTripTime;
198204
}
199205

200-
set maxTimeMS(v: number) {
201-
this._maxTimeMS = v;
206+
get remainingTimeMS() {
207+
const timePassed = Math.trunc(performance.now()) - this.start;
208+
return this.timeoutMS <= 0 ? Infinity : this.timeoutMS - timePassed;
202209
}
203210

204211
csotEnabled(): this is CSOTTimeoutContext {

test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,40 @@ import { join } from 'path';
33
import { loadSpecTests } from '../../spec';
44
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';
55

6-
// TODO(NODE-5823): Implement unified runner operations and options support for CSOT
7-
describe.skip('CSOT spec tests', function () {
8-
runUnifiedSuite(loadSpecTests(join('client-side-operations-timeout')));
6+
const enabled = [
7+
'override-collection-timeoutMS',
8+
'override-database-timeoutMS',
9+
'override-operation-timeoutMS'
10+
];
11+
12+
const cursorOperations = [
13+
'aggregate',
14+
'countDocuments',
15+
'listIndexes',
16+
'createChangeStream',
17+
'listCollections',
18+
'listCollectionNames'
19+
];
20+
21+
describe('CSOT spec tests', function () {
22+
const specs = loadSpecTests(join('client-side-operations-timeout'));
23+
for (const spec of specs) {
24+
for (const test of spec.tests) {
25+
// not one of the test suites listed in kickoff
26+
if (!enabled.includes(spec.name)) test.skipReason = 'Not working yet';
27+
28+
// Cursor operation
29+
if (test.operations.find(operation => cursorOperations.includes(operation.name)))
30+
test.skipReason = 'Not working yet';
31+
32+
// runCommand only uses options directly passed to it
33+
if (
34+
test.operations.find(
35+
operation => operation.name === 'runCommand' && operation.arguments.timeoutMS == null
36+
)
37+
)
38+
test.skipReason = 'Not working yet';
39+
}
40+
}
41+
runUnifiedSuite(specs);
942
});

test/integration/client-side-operations-timeout/node_csot.test.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ describe('CSOT driver tests', () => {
4848
afterEach(async () => {
4949
await cursor?.close();
5050
await session?.endSession();
51-
await session.endSession();
5251
});
5352

5453
it('throws an error', async () => {

test/integration/node-specific/db.test.js

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,22 +45,12 @@ describe('Db', function () {
4545
});
4646
});
4747

48-
it('shouldCorrectlyHandleFailedConnection', {
49-
metadata: {
50-
requires: { topology: ['single', 'replicaset', 'sharded'] }
51-
},
52-
53-
test: function (done) {
54-
var configuration = this.configuration;
55-
var fs_client = configuration.newClient('mongodb://127.0.0.1:25117/test', {
56-
serverSelectionTimeoutMS: 10
57-
});
58-
59-
fs_client.connect(function (err) {
60-
test.ok(err != null);
61-
done();
62-
});
63-
}
48+
it('should correctly handle failed connection', async function () {
49+
const client = this.configuration.newClient('mongodb://iLoveJS', {
50+
serverSelectionTimeoutMS: 10
51+
});
52+
const error = await client.connect().catch(error => error);
53+
expect(error).to.be.instanceOf(Error);
6454
});
6555

6656
it('shouldCorrectlyGetErrorDroppingNonExistingDb', {

0 commit comments

Comments
 (0)