Skip to content

Commit 9ebf3dd

Browse files
committed
Start CSOT impl
1 parent 2072985 commit 9ebf3dd

File tree

5 files changed

+59
-11
lines changed

5 files changed

+59
-11
lines changed

src/bulk/common.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../o
2020
import type { Server } from '../sdam/server';
2121
import type { Topology } from '../sdam/topology';
2222
import type { ClientSession } from '../sessions';
23+
import { type Timeout } from '../timeout';
2324
import { maybeAddIdToDocuments } from '../utils';
2425
import {
2526
applyRetryableWrites,
@@ -874,6 +875,8 @@ export interface BulkWriteOptions extends CommandOperationOptions {
874875
forceServerObjectId?: boolean;
875876
/** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */
876877
let?: Document;
878+
/** @internal */
879+
timeout?: Timeout | null;
877880
}
878881

879882
const executeCommandsAsync = promisify(executeCommands);

src/cmap/connection.ts

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
2929
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
3030
import { ServerType } from '../sdam/common';
3131
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
32-
import { type Timeout } from '../timeout';
32+
import { Timeout } from '../timeout';
3333
import {
3434
BufferPool,
3535
calculateDurationInMs,
@@ -59,6 +59,7 @@ import {
5959
type WriteProtocolMessageType
6060
} from './commands';
6161
import type { Stream } from './connect';
62+
import { type ConnectionPool } from './connection_pool';
6263
import type { ClientMetadata } from './handshake/client_metadata';
6364
import { StreamDescription, type StreamDescriptionOptions } from './stream_description';
6465
import { type CompressorName, decompressResponse } from './wire_protocol/compression';
@@ -183,6 +184,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
183184
* Once connection is established, command logging can log events (if enabled)
184185
*/
185186
public established: boolean;
187+
public pool?: ConnectionPool;
186188
/** Indicates that the connection (including underlying TCP socket) has been closed. */
187189
public closed = false;
188190

@@ -279,6 +281,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
279281
);
280282
}
281283

284+
private get minRoundTripTime(): number {
285+
return this.pool?.server.description.minRoundTripTime ?? 0;
286+
}
287+
282288
public markAvailable(): void {
283289
this.lastUseTime = now();
284290
}
@@ -343,6 +349,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
343349

344350
let clusterTime = this.clusterTime;
345351

352+
if (Timeout.is(options.timeout) && options.timeout.duration > 0) {
353+
cmd.maxTimeMS = options.timeout.getMaxTimeMS(this.minRoundTripTime);
354+
}
355+
346356
if (this.serverApi) {
347357
const { version, strict, deprecationErrors } = this.serverApi;
348358
cmd.apiVersion = version;
@@ -432,7 +442,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
432442
try {
433443
await this.writeCommand(message, {
434444
agreedCompressor: this.description.compressor ?? 'none',
435-
zlibCompressionLevel: this.description.zlibCompressionLevel
445+
zlibCompressionLevel: this.description.zlibCompressionLevel,
446+
timeout: options.timeout
436447
});
437448

438449
if (options.noResponse) {
@@ -442,7 +453,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
442453

443454
this.throwIfAborted();
444455

445-
for await (const response of this.readMany()) {
456+
for await (const response of this.readMany({ timeout: options.timeout })) {
446457
this.socket.setTimeout(0);
447458
const bson = response.parse();
448459

@@ -635,7 +646,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
635646
*/
636647
private async writeCommand(
637648
command: WriteProtocolMessageType,
638-
options: { agreedCompressor?: CompressorName; zlibCompressionLevel?: number }
649+
options: {
650+
agreedCompressor?: CompressorName;
651+
zlibCompressionLevel?: number;
652+
timeout?: Timeout | null;
653+
}
639654
): Promise<void> {
640655
const finalCommand =
641656
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
@@ -647,8 +662,15 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
647662

648663
const buffer = Buffer.concat(await finalCommand.toBin());
649664

650-
if (this.socket.write(buffer)) return;
651-
return await once(this.socket, 'drain');
665+
if (this.socket.write(buffer)) {
666+
return;
667+
}
668+
const drain = once(this.socket, 'drain');
669+
670+
if (options.timeout) {
671+
await Promise.race([drain, options.timeout]);
672+
}
673+
await drain;
652674
}
653675

654676
/**
@@ -660,9 +682,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
660682
*
661683
* Note that `for-await` loops call `return` automatically when the loop is exited.
662684
*/
663-
private async *readMany(): AsyncGenerator<OpMsgResponse | OpReply> {
685+
private async *readMany(options: {
686+
timeout?: Timeout | null;
687+
}): AsyncGenerator<OpMsgResponse | OpReply> {
664688
try {
665-
this.dataEvents = onData(this.messageStream);
689+
this.dataEvents = onData(this.messageStream, options);
666690
for await (const message of this.dataEvents) {
667691
const response = await decompressResponse(message);
668692
yield response;

src/cmap/connection_pool.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
168168
[kMetrics]: ConnectionPoolMetrics;
169169
[kProcessingWaitQueue]: boolean;
170170

171+
server: Server;
172+
171173
/**
172174
* Emitted when the connection pool is created.
173175
* @event
@@ -247,6 +249,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
247249

248250
this[kPoolState] = PoolState.paused;
249251
this[kServer] = server;
252+
this.server = server;
250253
this[kConnections] = new List();
251254
this[kPending] = 0;
252255
this[kCheckedOut] = new Set();

src/operations/execute_operation.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,9 @@ export async function executeOperation<
187187
}
188188

189189
try {
190+
if (operation.timeout) {
191+
return await Promise.race([operation.execute(server, session), operation.timeout]);
192+
}
190193
return await operation.execute(server, session);
191194
} catch (operationError) {
192195
if (willRetry && operationError instanceof MongoError) {
@@ -271,6 +274,9 @@ async function retryOperation<
271274
}
272275

273276
try {
277+
if (operation.timeout) {
278+
return await Promise.race([operation.execute(server, session), operation.timeout]);
279+
}
274280
return await operation.execute(server, session);
275281
} catch (retryError) {
276282
if (

src/timeout.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ export class CSOTError extends MongoError {
2424
}
2525
}
2626

27-
/** @internal */
27+
/** @internal
28+
* This class is an abstraction over CSOT timeouts, implementing the specification outlined in
29+
* https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/client-side-operations-timeout.md
30+
* */
2831
export class Timeout extends Promise<never> {
2932
get [Symbol.toStringTag](): 'MongoDBTimeout' {
3033
return 'MongoDBTimeout';
@@ -39,6 +42,9 @@ export class Timeout extends Promise<never> {
3942
public duration: number;
4043
public timedOut = false;
4144

45+
/**
46+
* Return the amount of time remaining until a CSOTError is thrown
47+
* */
4248
public get remainingTime(): number {
4349
if (this.duration === 0) return Infinity;
4450
if (this.timedOut) return 0;
@@ -59,13 +65,13 @@ export class Timeout extends Promise<never> {
5965
executor(noop, promiseReject);
6066
});
6167

62-
// Construct timeout error at point of Timeout instantiation to preserve stack traces
68+
// NOTE: Construct timeout error at point of Timeout instantiation to preserve stack traces
6369
this.timeoutError = new CSOTError('Timeout!');
6470

6571
this.expireTimeout = () => {
6672
this.ended = Math.trunc(performance.now());
6773
this.timedOut = true;
68-
// Wrap error here: Why?
74+
// NOTE: Wrap error here: Why?
6975
reject(CSOTError.from(this.timeoutError));
7076
};
7177

@@ -108,7 +114,12 @@ export class Timeout extends Promise<never> {
108114
}
109115
}
110116

117+
/**
118+
* Implement maxTimeMS calculation detailed in https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/client-side-operations-timeout.md#command-execution
119+
* */
111120
public getMaxTimeMS(minRoundTripTime: number): any {
121+
console.log(`remaining time: ${this.remainingTime}, minRTT: ${minRoundTripTime}`);
122+
if (!Number.isFinite(this.remainingTime)) return 0;
112123
if (minRoundTripTime < this.remainingTime) return this.remainingTime - minRoundTripTime;
113124
throw CSOTError.from(this.timeoutError);
114125
}
@@ -118,6 +129,7 @@ export class Timeout extends Promise<never> {
118129
return Timeout.expires(this.duration);
119130
}
120131

132+
/** Create a new timeout that expires in `duration` ms */
121133
public static expires(duration: number): Timeout {
122134
return new Timeout(undefined, duration);
123135
}

0 commit comments

Comments
 (0)