Skip to content

Commit e5531f2

Browse files
baileympearsonnbbeeken
authored andcommitted
refactor(NODE-6411): AbstractCursor accepts an external timeout context (#4264)
1 parent c71a450 commit e5531f2

File tree

10 files changed

+330
-75
lines changed

10 files changed

+330
-75
lines changed

src/cmap/connection.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -427,9 +427,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
427427
...options
428428
};
429429

430-
if (!options.omitMaxTimeMS && options.timeoutContext?.csotEnabled()) {
431-
const { maxTimeMS } = options.timeoutContext;
432-
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
430+
if (!options.omitMaxTimeMS) {
431+
const maxTimeMS = options.timeoutContext?.maxTimeMS;
432+
if (maxTimeMS && maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
433433
}
434434

435435
const message = this.supportsOpMsg

src/cursor/abstract_cursor.ts

Lines changed: 103 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
2121
import { type AsyncDisposable, configureResourceManagement } from '../resource_management';
2222
import type { Server } from '../sdam/server';
2323
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
24-
import { TimeoutContext } from '../timeout';
24+
import { type CSOTTimeoutContext, type Timeout, TimeoutContext } from '../timeout';
2525
import { type MongoDBNamespace, squashError } from '../utils';
2626

2727
/**
@@ -119,6 +119,14 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
119119
timeoutMS?: number;
120120
/** @internal TODO(NODE-5688): make this public */
121121
timeoutMode?: CursorTimeoutMode;
122+
123+
/**
124+
* @internal
125+
*
126+
* A timeout context to govern the total time the cursor can live. If provided, the cursor
127+
* cannot be used in ITERATION mode.
128+
*/
129+
timeoutContext?: CursorTimeoutContext;
122130
}
123131

124132
/** @internal */
@@ -171,7 +179,7 @@ export abstract class AbstractCursor<
171179
/** @internal */
172180
protected readonly cursorOptions: InternalAbstractCursorOptions;
173181
/** @internal */
174-
protected timeoutContext?: TimeoutContext;
182+
protected timeoutContext?: CursorTimeoutContext;
175183

176184
/** @event */
177185
static readonly CLOSE = 'close' as const;
@@ -205,20 +213,12 @@ export abstract class AbstractCursor<
205213
};
206214
this.cursorOptions.timeoutMS = options.timeoutMS;
207215
if (this.cursorOptions.timeoutMS != null) {
208-
if (options.timeoutMode == null) {
209-
if (options.tailable) {
210-
this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION;
211-
} else {
212-
this.cursorOptions.timeoutMode = CursorTimeoutMode.LIFETIME;
213-
}
214-
} else {
215-
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
216-
throw new MongoInvalidArgumentError(
217-
"Cannot set tailable cursor's timeoutMode to LIFETIME"
218-
);
219-
}
220-
this.cursorOptions.timeoutMode = options.timeoutMode;
216+
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
217+
throw new MongoInvalidArgumentError("Cannot set tailable cursor's timeoutMode to LIFETIME");
221218
}
219+
this.cursorOptions.timeoutMode =
220+
options.timeoutMode ??
221+
(options.tailable ? CursorTimeoutMode.ITERATION : CursorTimeoutMode.LIFETIME);
222222
} else {
223223
if (options.timeoutMode != null)
224224
throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS');
@@ -264,6 +264,17 @@ export abstract class AbstractCursor<
264264
utf8: options?.enableUtf8Validation === false ? false : true
265265
}
266266
};
267+
268+
if (
269+
options.timeoutContext != null &&
270+
options.timeoutMS != null &&
271+
this.cursorOptions.timeoutMode !== CursorTimeoutMode.LIFETIME
272+
) {
273+
throw new MongoAPIError(
274+
`cannot create a cursor with an externally provided timeout context that doesn't use timeoutMode=CURSOR_LIFETIME.`
275+
);
276+
}
277+
this.timeoutContext = options.timeoutContext;
267278
}
268279

269280
/**
@@ -721,6 +732,9 @@ export abstract class AbstractCursor<
721732
* if the resultant data has already been retrieved by this cursor.
722733
*/
723734
rewind(): void {
735+
if (this.timeoutContext && this.timeoutContext.owner !== this) {
736+
throw new MongoAPIError(`Cannot rewind cursor that does not own its timeout context.`);
737+
}
724738
if (!this.initialized) {
725739
return;
726740
}
@@ -790,10 +804,13 @@ export abstract class AbstractCursor<
790804
*/
791805
private async cursorInit(): Promise<void> {
792806
if (this.cursorOptions.timeoutMS != null) {
793-
this.timeoutContext = TimeoutContext.create({
794-
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
795-
timeoutMS: this.cursorOptions.timeoutMS
796-
});
807+
this.timeoutContext ??= new CursorTimeoutContext(
808+
TimeoutContext.create({
809+
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
810+
timeoutMS: this.cursorOptions.timeoutMS
811+
}),
812+
this
813+
);
797814
}
798815
try {
799816
const state = await this._initialize(this.cursorSession);
@@ -872,6 +889,20 @@ export abstract class AbstractCursor<
872889
private async cleanup(timeoutMS?: number, error?: Error) {
873890
this.isClosed = true;
874891
const session = this.cursorSession;
892+
const timeoutContextForKillCursors = (): CursorTimeoutContext | undefined => {
893+
if (timeoutMS != null) {
894+
this.timeoutContext?.clear();
895+
return new CursorTimeoutContext(
896+
TimeoutContext.create({
897+
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
898+
timeoutMS
899+
}),
900+
this
901+
);
902+
} else {
903+
return this.timeoutContext?.refreshed();
904+
}
905+
};
875906
try {
876907
if (
877908
!this.isKilled &&
@@ -884,23 +915,13 @@ export abstract class AbstractCursor<
884915
this.isKilled = true;
885916
const cursorId = this.cursorId;
886917
this.cursorId = Long.ZERO;
887-
let timeoutContext: TimeoutContext | undefined;
888-
if (timeoutMS != null) {
889-
this.timeoutContext?.clear();
890-
timeoutContext = TimeoutContext.create({
891-
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
892-
timeoutMS
893-
});
894-
} else {
895-
this.timeoutContext?.refresh();
896-
timeoutContext = this.timeoutContext;
897-
}
918+
898919
await executeOperation(
899920
this.cursorClient,
900921
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
901922
session
902923
}),
903-
timeoutContext
924+
timeoutContextForKillCursors()
904925
);
905926
}
906927
} catch (error) {
@@ -1042,3 +1063,54 @@ class ReadableCursorStream extends Readable {
10421063
}
10431064

10441065
configureResourceManagement(AbstractCursor.prototype);
1066+
1067+
/**
1068+
* @internal
1069+
* The cursor timeout context is a wrapper around a timeout context
1070+
* that keeps track of the "owner" of the cursor. For timeout contexts
1071+
* instantiated inside a cursor, the owner will be the cursor.
1072+
*
1073+
* All timeout behavior is exactly the same as the wrapped timeout context's.
1074+
*/
1075+
export class CursorTimeoutContext extends TimeoutContext {
1076+
constructor(
1077+
public timeoutContext: TimeoutContext,
1078+
public owner: symbol | AbstractCursor
1079+
) {
1080+
super();
1081+
}
1082+
override get serverSelectionTimeout(): Timeout | null {
1083+
return this.timeoutContext.serverSelectionTimeout;
1084+
}
1085+
override get connectionCheckoutTimeout(): Timeout | null {
1086+
return this.timeoutContext.connectionCheckoutTimeout;
1087+
}
1088+
override get clearServerSelectionTimeout(): boolean {
1089+
return this.timeoutContext.clearServerSelectionTimeout;
1090+
}
1091+
override get clearConnectionCheckoutTimeout(): boolean {
1092+
return this.timeoutContext.clearConnectionCheckoutTimeout;
1093+
}
1094+
override get timeoutForSocketWrite(): Timeout | null {
1095+
return this.timeoutContext.timeoutForSocketWrite;
1096+
}
1097+
override get timeoutForSocketRead(): Timeout | null {
1098+
return this.timeoutContext.timeoutForSocketRead;
1099+
}
1100+
override csotEnabled(): this is CSOTTimeoutContext {
1101+
return this.timeoutContext.csotEnabled();
1102+
}
1103+
override refresh(): void {
1104+
return this.timeoutContext.refresh();
1105+
}
1106+
override clear(): void {
1107+
return this.timeoutContext.clear();
1108+
}
1109+
override get maxTimeMS(): number | null {
1110+
return this.timeoutContext.maxTimeMS;
1111+
}
1112+
1113+
override refreshed(): CursorTimeoutContext {
1114+
return new CursorTimeoutContext(this.timeoutContext.refreshed(), this.owner);
1115+
}
1116+
}

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,7 @@ export type {
359359
CursorStreamOptions
360360
} from './cursor/abstract_cursor';
361361
export type {
362+
CursorTimeoutContext,
362363
InitialCursorResponse,
363364
InternalAbstractCursorOptions
364365
} from './cursor/abstract_cursor';

src/operations/find.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { Document } from '../bson';
22
import { CursorResponse, ExplainedCursorResponse } from '../cmap/wire_protocol/responses';
3-
import { type CursorTimeoutMode } from '../cursor/abstract_cursor';
3+
import { type AbstractCursorOptions, type CursorTimeoutMode } from '../cursor/abstract_cursor';
44
import { MongoInvalidArgumentError } from '../error';
55
import { type ExplainOptions } from '../explain';
66
import { ReadConcern } from '../read_concern';
@@ -18,7 +18,8 @@ import { Aspect, defineAspects, type Hint } from './operation';
1818
*/
1919
// eslint-disable-next-line @typescript-eslint/no-unused-vars
2020
export interface FindOptions<TSchema extends Document = Document>
21-
extends Omit<CommandOperationOptions, 'writeConcern' | 'explain'> {
21+
extends Omit<CommandOperationOptions, 'writeConcern' | 'explain'>,
22+
AbstractCursorOptions {
2223
/** Sets the limit of documents returned in the query. */
2324
limit?: number;
2425
/** Set to sort the documents coming back from the query. Array of indexes, `[['a', 1]]` etc. */

src/timeout.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,8 @@ export abstract class TimeoutContext {
178178
else throw new MongoRuntimeError('Unrecognized options');
179179
}
180180

181+
abstract get maxTimeMS(): number | null;
182+
181183
abstract get serverSelectionTimeout(): Timeout | null;
182184

183185
abstract get connectionCheckoutTimeout(): Timeout | null;
@@ -195,6 +197,9 @@ export abstract class TimeoutContext {
195197
abstract refresh(): void;
196198

197199
abstract clear(): void;
200+
201+
/** Returns a new instance of the TimeoutContext, with all timeouts refreshed and restarted. */
202+
abstract refreshed(): TimeoutContext;
198203
}
199204

200205
/** @internal */
@@ -317,6 +322,10 @@ export class CSOTTimeoutContext extends TimeoutContext {
317322
throw new MongoOperationTimeoutError(message ?? `Expired after ${this.timeoutMS}ms`);
318323
return remainingTimeMS;
319324
}
325+
326+
override refreshed(): CSOTTimeoutContext {
327+
return new CSOTTimeoutContext(this);
328+
}
320329
}
321330

322331
/** @internal */
@@ -363,4 +372,12 @@ export class LegacyTimeoutContext extends TimeoutContext {
363372
clear(): void {
364373
return;
365374
}
375+
376+
get maxTimeMS() {
377+
return null;
378+
}
379+
380+
override refreshed(): LegacyTimeoutContext {
381+
return new LegacyTimeoutContext(this.options);
382+
}
366383
}

test/integration/client-side-operations-timeout/node_csot.test.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import {
2626
MongoServerError,
2727
ObjectId
2828
} from '../../mongodb';
29-
import { type FailPoint } from '../../tools/utils';
29+
import { type FailPoint, waitUntilPoolsFilled } from '../../tools/utils';
3030

3131
const metadata = { requires: { mongodb: '>=4.4' } };
3232

@@ -362,7 +362,7 @@ describe('CSOT driver tests', metadata, () => {
362362
};
363363

364364
beforeEach(async function () {
365-
internalClient = this.configuration.newClient();
365+
internalClient = this.configuration.newClient({});
366366
await internalClient
367367
.db('db')
368368
.dropCollection('coll')
@@ -378,7 +378,11 @@ describe('CSOT driver tests', metadata, () => {
378378

379379
await internalClient.db().admin().command(failpoint);
380380

381-
client = this.configuration.newClient(undefined, { monitorCommands: true });
381+
client = this.configuration.newClient(undefined, { monitorCommands: true, minPoolSize: 10 });
382+
383+
// wait for a handful of connections to have been established
384+
await waitUntilPoolsFilled(client, AbortSignal.timeout(30_000), 5);
385+
382386
commandStarted = [];
383387
commandSucceeded = [];
384388
client.on('commandStarted', ev => commandStarted.push(ev));
@@ -492,7 +496,13 @@ describe('CSOT driver tests', metadata, () => {
492496

493497
await internalClient.db().admin().command(failpoint);
494498

495-
client = this.configuration.newClient(undefined, { monitorCommands: true });
499+
client = this.configuration.newClient(undefined, {
500+
monitorCommands: true,
501+
minPoolSize: 10
502+
});
503+
// wait for a handful of connections to have been established
504+
await waitUntilPoolsFilled(client, AbortSignal.timeout(30_000), 5);
505+
496506
commandStarted = [];
497507
commandSucceeded = [];
498508
client.on('commandStarted', ev => commandStarted.push(ev));

0 commit comments

Comments
 (0)