Skip to content

Commit c6f4bcf

Browse files
fix test and lint
1 parent 7a12914 commit c6f4bcf

File tree

9 files changed

+318
-60
lines changed

9 files changed

+318
-60
lines changed

src/cmap/connection.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -422,9 +422,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
422422
...options
423423
};
424424

425-
if (!options.omitMaxTimeMS && options.timeoutContext?.csotEnabled()) {
426-
const { maxTimeMS } = options.timeoutContext;
427-
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
425+
if (!options.omitMaxTimeMS) {
426+
const maxTimeMS = options.timeoutContext?.maxTimeMS;
427+
if (maxTimeMS && maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
428428
}
429429

430430
const message = this.supportsOpMsg

src/cursor/abstract_cursor.ts

Lines changed: 103 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
2121
import { type AsyncDisposable, configureResourceManagement } from '../resource_management';
2222
import type { Server } from '../sdam/server';
2323
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
24-
import { TimeoutContext } from '../timeout';
24+
import { type CSOTTimeoutContext, type Timeout, TimeoutContext } from '../timeout';
2525
import { type MongoDBNamespace, squashError } from '../utils';
2626

2727
/**
@@ -119,6 +119,14 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
119119
timeoutMS?: number;
120120
/** @internal TODO(NODE-5688): make this public */
121121
timeoutMode?: CursorTimeoutMode;
122+
123+
/**
124+
* @internal
125+
*
126+
* A timeout context to govern the total time the cursor can live. If provided, the cursor
127+
* cannot be used in ITERATION mode.
128+
*/
129+
timeoutContext?: CursorTimeoutContext;
122130
}
123131

124132
/** @internal */
@@ -171,7 +179,7 @@ export abstract class AbstractCursor<
171179
/** @internal */
172180
protected readonly cursorOptions: InternalAbstractCursorOptions;
173181
/** @internal */
174-
protected timeoutContext?: TimeoutContext;
182+
protected timeoutContext?: CursorTimeoutContext;
175183

176184
/** @event */
177185
static readonly CLOSE = 'close' as const;
@@ -205,20 +213,12 @@ export abstract class AbstractCursor<
205213
};
206214
this.cursorOptions.timeoutMS = options.timeoutMS;
207215
if (this.cursorOptions.timeoutMS != null) {
208-
if (options.timeoutMode == null) {
209-
if (options.tailable) {
210-
this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION;
211-
} else {
212-
this.cursorOptions.timeoutMode = CursorTimeoutMode.LIFETIME;
213-
}
214-
} else {
215-
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
216-
throw new MongoInvalidArgumentError(
217-
"Cannot set tailable cursor's timeoutMode to LIFETIME"
218-
);
219-
}
220-
this.cursorOptions.timeoutMode = options.timeoutMode;
216+
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
217+
throw new MongoInvalidArgumentError("Cannot set tailable cursor's timeoutMode to LIFETIME");
221218
}
219+
this.cursorOptions.timeoutMode =
220+
options.timeoutMode ??
221+
(options.tailable ? CursorTimeoutMode.ITERATION : CursorTimeoutMode.LIFETIME);
222222
} else {
223223
if (options.timeoutMode != null)
224224
throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS');
@@ -264,6 +264,17 @@ export abstract class AbstractCursor<
264264
utf8: options?.enableUtf8Validation === false ? false : true
265265
}
266266
};
267+
268+
if (
269+
options.timeoutContext != null &&
270+
options.timeoutMS != null &&
271+
this.cursorOptions.timeoutMode !== CursorTimeoutMode.LIFETIME
272+
) {
273+
throw new MongoAPIError(
274+
`cannot create a cursor with an externally provided timeout context that doesn't use timeoutMode=CURSOR_LIFETIME.`
275+
);
276+
}
277+
this.timeoutContext = options.timeoutContext;
267278
}
268279

269280
/**
@@ -721,6 +732,9 @@ export abstract class AbstractCursor<
721732
* if the resultant data has already been retrieved by this cursor.
722733
*/
723734
rewind(): void {
735+
if (this.timeoutContext && this.timeoutContext.owner !== this) {
736+
throw new MongoAPIError(`Cannot rewind cursor that does not own its timeout context.`);
737+
}
724738
if (!this.initialized) {
725739
return;
726740
}
@@ -790,10 +804,13 @@ export abstract class AbstractCursor<
790804
*/
791805
private async cursorInit(): Promise<void> {
792806
if (this.cursorOptions.timeoutMS != null) {
793-
this.timeoutContext = TimeoutContext.create({
794-
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
795-
timeoutMS: this.cursorOptions.timeoutMS
796-
});
807+
this.timeoutContext ??= new CursorTimeoutContext(
808+
TimeoutContext.create({
809+
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
810+
timeoutMS: this.cursorOptions.timeoutMS
811+
}),
812+
this
813+
);
797814
}
798815
try {
799816
const state = await this._initialize(this.cursorSession);
@@ -872,6 +889,20 @@ export abstract class AbstractCursor<
872889
private async cleanup(timeoutMS?: number, error?: Error) {
873890
this.isClosed = true;
874891
const session = this.cursorSession;
892+
const timeoutContextForKillCursors = (): CursorTimeoutContext | undefined => {
893+
if (timeoutMS != null) {
894+
this.timeoutContext?.clear();
895+
return new CursorTimeoutContext(
896+
TimeoutContext.create({
897+
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
898+
timeoutMS
899+
}),
900+
this
901+
);
902+
} else {
903+
return this.timeoutContext?.refreshed();
904+
}
905+
};
875906
try {
876907
if (
877908
!this.isKilled &&
@@ -884,23 +915,13 @@ export abstract class AbstractCursor<
884915
this.isKilled = true;
885916
const cursorId = this.cursorId;
886917
this.cursorId = Long.ZERO;
887-
let timeoutContext: TimeoutContext | undefined;
888-
if (timeoutMS != null) {
889-
this.timeoutContext?.clear();
890-
timeoutContext = TimeoutContext.create({
891-
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
892-
timeoutMS
893-
});
894-
} else {
895-
this.timeoutContext?.refresh();
896-
timeoutContext = this.timeoutContext;
897-
}
918+
898919
await executeOperation(
899920
this.cursorClient,
900921
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
901922
session
902923
}),
903-
timeoutContext
924+
timeoutContextForKillCursors()
904925
);
905926
}
906927
} catch (error) {
@@ -1042,3 +1063,54 @@ class ReadableCursorStream extends Readable {
10421063
}
10431064

10441065
configureResourceManagement(AbstractCursor.prototype);
1066+
1067+
/**
1068+
* @internal
1069+
* The cursor timeout context is a wrapper around a timeout context
1070+
* that keeps track of the "owner" of the cursor. For timeout contexts
1071+
* instantiated inside a cursor, the owner will be the cursor.
1072+
*
1073+
* All timeout behavior is exactly the same as the wrapped timeout context's.
1074+
*/
1075+
export class CursorTimeoutContext extends TimeoutContext {
1076+
constructor(
1077+
public timeoutContext: TimeoutContext,
1078+
public owner: symbol | AbstractCursor
1079+
) {
1080+
super();
1081+
}
1082+
override get serverSelectionTimeout(): Timeout | null {
1083+
return this.timeoutContext.serverSelectionTimeout;
1084+
}
1085+
override get connectionCheckoutTimeout(): Timeout | null {
1086+
return this.timeoutContext.connectionCheckoutTimeout;
1087+
}
1088+
override get clearServerSelectionTimeout(): boolean {
1089+
return this.timeoutContext.clearServerSelectionTimeout;
1090+
}
1091+
override get clearConnectionCheckoutTimeout(): boolean {
1092+
return this.timeoutContext.clearConnectionCheckoutTimeout;
1093+
}
1094+
override get timeoutForSocketWrite(): Timeout | null {
1095+
return this.timeoutContext.timeoutForSocketWrite;
1096+
}
1097+
override get timeoutForSocketRead(): Timeout | null {
1098+
return this.timeoutContext.timeoutForSocketRead;
1099+
}
1100+
override csotEnabled(): this is CSOTTimeoutContext {
1101+
return this.timeoutContext.csotEnabled();
1102+
}
1103+
override refresh(): void {
1104+
return this.timeoutContext.refresh();
1105+
}
1106+
override clear(): void {
1107+
return this.timeoutContext.clear();
1108+
}
1109+
override get maxTimeMS(): number | null {
1110+
return this.timeoutContext.maxTimeMS;
1111+
}
1112+
1113+
override refreshed(): CursorTimeoutContext {
1114+
return new CursorTimeoutContext(this.timeoutContext.refreshed(), this.owner);
1115+
}
1116+
}

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ export type {
358358
CursorStreamOptions
359359
} from './cursor/abstract_cursor';
360360
export type {
361+
CursorTimeoutContext,
361362
InitialCursorResponse,
362363
InternalAbstractCursorOptions
363364
} from './cursor/abstract_cursor';

src/operations/find.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { Document } from '../bson';
22
import { CursorResponse, ExplainedCursorResponse } from '../cmap/wire_protocol/responses';
3-
import { type CursorTimeoutMode } from '../cursor/abstract_cursor';
3+
import { type AbstractCursorOptions, type CursorTimeoutMode } from '../cursor/abstract_cursor';
44
import { MongoInvalidArgumentError } from '../error';
55
import { ReadConcern } from '../read_concern';
66
import type { Server } from '../sdam/server';
@@ -17,7 +17,8 @@ import { Aspect, defineAspects, type Hint } from './operation';
1717
*/
1818
// eslint-disable-next-line @typescript-eslint/no-unused-vars
1919
export interface FindOptions<TSchema extends Document = Document>
20-
extends Omit<CommandOperationOptions, 'writeConcern'> {
20+
extends Omit<CommandOperationOptions, 'writeConcern'>,
21+
AbstractCursorOptions {
2122
/** Sets the limit of documents returned in the query. */
2223
limit?: number;
2324
/** Set to sort the documents coming back from the query. Array of indexes, `[['a', 1]]` etc. */

src/timeout.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,8 @@ export abstract class TimeoutContext {
178178
else throw new MongoRuntimeError('Unrecognized options');
179179
}
180180

181+
abstract get maxTimeMS(): number | null;
182+
181183
abstract get serverSelectionTimeout(): Timeout | null;
182184

183185
abstract get connectionCheckoutTimeout(): Timeout | null;
@@ -195,6 +197,9 @@ export abstract class TimeoutContext {
195197
abstract refresh(): void;
196198

197199
abstract clear(): void;
200+
201+
/** Returns a new instance of the TimeoutContext, with all timeouts refreshed and restarted. */
202+
abstract refreshed(): TimeoutContext;
198203
}
199204

200205
/** @internal */
@@ -305,6 +310,10 @@ export class CSOTTimeoutContext extends TimeoutContext {
305310
this._serverSelectionTimeout?.clear();
306311
this._connectionCheckoutTimeout?.clear();
307312
}
313+
314+
override refreshed(): CSOTTimeoutContext {
315+
return new CSOTTimeoutContext(this);
316+
}
308317
}
309318

310319
/** @internal */
@@ -351,4 +360,12 @@ export class LegacyTimeoutContext extends TimeoutContext {
351360
clear(): void {
352361
return;
353362
}
363+
364+
get maxTimeMS() {
365+
return null;
366+
}
367+
368+
override refreshed(): LegacyTimeoutContext {
369+
return new LegacyTimeoutContext(this.options);
370+
}
354371
}

test/integration/client-side-operations-timeout/node_csot.test.ts

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/* Anything javascript specific relating to timeouts */
2+
import { on } from 'node:events';
23
import { setTimeout } from 'node:timers/promises';
34

45
import { expect } from 'chai';
@@ -25,6 +26,23 @@ import { type FailPoint } from '../../tools/utils';
2526

2627
const metadata = { requires: { mongodb: '>=4.4' } };
2728

29+
async function waitForConnections(client: MongoClient, count: number): Promise<void> {
30+
let connectionCount = 0;
31+
32+
const promise = (async () => {
33+
for await (const _event of on(client, 'connectionCreated')) {
34+
connectionCount++;
35+
if (connectionCount === count) {
36+
break;
37+
}
38+
}
39+
})();
40+
41+
await client.connect();
42+
43+
return promise;
44+
}
45+
2846
describe('CSOT driver tests', metadata, () => {
2947
describe('timeoutMS inheritance', () => {
3048
let client: MongoClient;
@@ -357,7 +375,7 @@ describe('CSOT driver tests', metadata, () => {
357375
};
358376

359377
beforeEach(async function () {
360-
internalClient = this.configuration.newClient();
378+
internalClient = this.configuration.newClient({});
361379
await internalClient
362380
.db('db')
363381
.dropCollection('coll')
@@ -373,7 +391,11 @@ describe('CSOT driver tests', metadata, () => {
373391

374392
await internalClient.db().admin().command(failpoint);
375393

376-
client = this.configuration.newClient(undefined, { monitorCommands: true });
394+
client = this.configuration.newClient(undefined, { monitorCommands: true, minPoolSize: 10 });
395+
396+
// wait for a handful of connections to have been established
397+
await waitForConnections(client, 5);
398+
377399
commandStarted = [];
378400
commandSucceeded = [];
379401
client.on('commandStarted', ev => commandStarted.push(ev));
@@ -487,7 +509,13 @@ describe('CSOT driver tests', metadata, () => {
487509

488510
await internalClient.db().admin().command(failpoint);
489511

490-
client = this.configuration.newClient(undefined, { monitorCommands: true });
512+
client = this.configuration.newClient(undefined, {
513+
monitorCommands: true,
514+
minPoolSize: 10
515+
});
516+
// wait for a handful of connections to have been established
517+
await waitForConnections(client, 5);
518+
491519
commandStarted = [];
492520
commandSucceeded = [];
493521
client.on('commandStarted', ev => commandStarted.push(ev));

0 commit comments

Comments
 (0)