Skip to content

Commit a4d15ab

Browse files
authored
feat(NODE-6304): add CSOT support for non-tailable cursors (#4195)
1 parent 1c48970 commit a4d15ab

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+2002
-228
lines changed

src/cmap/connection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ export interface CommandOptions extends BSONSerializeOptions {
8585
documentsReturnedIn?: string;
8686
noResponse?: boolean;
8787
omitReadPreference?: boolean;
88+
omitMaxTimeMS?: boolean;
8889

8990
// TODO(NODE-2802): Currently the CommandOptions take a property willRetryWrite which is a hint
9091
// from executeOperation that the txnNum should be applied to this command.
@@ -418,7 +419,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
418419
...options
419420
};
420421

421-
if (options.timeoutContext?.csotEnabled()) {
422+
if (!options.omitMaxTimeMS && options.timeoutContext?.csotEnabled()) {
422423
const { maxTimeMS } = options.timeoutContext;
423424
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
424425
}
@@ -614,7 +615,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
614615
for await (const document of this.sendCommand(ns, command, options, responseType)) {
615616
if (options.timeoutContext?.csotEnabled()) {
616617
if (MongoDBResponse.is(document)) {
617-
// TODO(NODE-5684): test coverage to be added once cursors are enabling CSOT
618618
if (document.isMaxTimeExpiredError) {
619619
throw new MongoOperationTimeoutError('Server reported a timeout error', {
620620
cause: new MongoServerError(document.toObject())

src/collection.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
type ListSearchIndexesOptions
1212
} from './cursor/list_search_indexes_cursor';
1313
import type { Db } from './db';
14-
import { MongoInvalidArgumentError } from './error';
14+
import { MongoInvalidArgumentError, MongoOperationTimeoutError } from './error';
1515
import type { MongoClient, PkFactory } from './mongo_client';
1616
import type {
1717
Filter,
@@ -677,7 +677,9 @@ export class Collection<TSchema extends Document = Document> {
677677
new DropIndexOperation(this as TODO_NODE_3286, '*', resolveOptions(this, options))
678678
);
679679
return true;
680-
} catch {
680+
} catch (error) {
681+
if (error instanceof MongoOperationTimeoutError) throw error; // TODO: Check the spec for index management behaviour/file a drivers ticket for this
682+
// Seems like we should throw all errors
681683
return false;
682684
}
683685
}

src/cursor/abstract_cursor.ts

Lines changed: 118 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
2020
import { type AsyncDisposable, configureResourceManagement } from '../resource_management';
2121
import type { Server } from '../sdam/server';
2222
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
23+
import { TimeoutContext } from '../timeout';
2324
import { type MongoDBNamespace, squashError } from '../utils';
2425

2526
/**
@@ -59,6 +60,17 @@ export interface CursorStreamOptions {
5960
/** @public */
6061
export type CursorFlag = (typeof CURSOR_FLAGS)[number];
6162

63+
/** @public*/
64+
export const CursorTimeoutMode = Object.freeze({
65+
ITERATION: 'iteration',
66+
LIFETIME: 'cursorLifetime'
67+
} as const);
68+
69+
/** @public
70+
* TODO(NODE-5688): Document and release
71+
* */
72+
export type CursorTimeoutMode = (typeof CursorTimeoutMode)[keyof typeof CursorTimeoutMode];
73+
6274
/** @public */
6375
export interface AbstractCursorOptions extends BSONSerializeOptions {
6476
session?: ClientSession;
@@ -104,6 +116,8 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
104116
noCursorTimeout?: boolean;
105117
/** @internal TODO(NODE-5688): make this public */
106118
timeoutMS?: number;
119+
/** @internal TODO(NODE-5688): make this public */
120+
timeoutMode?: CursorTimeoutMode;
107121
}
108122

109123
/** @internal */
@@ -116,6 +130,8 @@ export type InternalAbstractCursorOptions = Omit<AbstractCursorOptions, 'readPre
116130
oplogReplay?: boolean;
117131
exhaust?: boolean;
118132
partial?: boolean;
133+
134+
omitMaxTimeMS?: boolean;
119135
};
120136

121137
/** @public */
@@ -153,6 +169,8 @@ export abstract class AbstractCursor<
153169
private isKilled: boolean;
154170
/** @internal */
155171
protected readonly cursorOptions: InternalAbstractCursorOptions;
172+
/** @internal */
173+
protected timeoutContext?: TimeoutContext;
156174

157175
/** @event */
158176
static readonly CLOSE = 'close' as const;
@@ -182,6 +200,30 @@ export abstract class AbstractCursor<
182200
...pluckBSONSerializeOptions(options)
183201
};
184202
this.cursorOptions.timeoutMS = options.timeoutMS;
203+
if (this.cursorOptions.timeoutMS != null) {
204+
if (options.timeoutMode == null) {
205+
if (options.tailable) {
206+
this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION;
207+
} else {
208+
this.cursorOptions.timeoutMode = CursorTimeoutMode.LIFETIME;
209+
}
210+
} else {
211+
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
212+
throw new MongoInvalidArgumentError(
213+
"Cannot set tailable cursor's timeoutMode to LIFETIME"
214+
);
215+
}
216+
this.cursorOptions.timeoutMode = options.timeoutMode;
217+
}
218+
} else {
219+
if (options.timeoutMode != null)
220+
throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS');
221+
}
222+
this.cursorOptions.omitMaxTimeMS =
223+
this.cursorOptions.timeoutMS != null &&
224+
((this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION &&
225+
!this.cursorOptions.tailable) ||
226+
(this.cursorOptions.tailable && !this.cursorOptions.awaitData));
185227

186228
const readConcern = ReadConcern.fromOptions(options);
187229
if (readConcern) {
@@ -389,12 +431,21 @@ export abstract class AbstractCursor<
389431
return false;
390432
}
391433

392-
do {
393-
if ((this.documents?.length ?? 0) !== 0) {
394-
return true;
434+
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
435+
this.timeoutContext?.refresh();
436+
}
437+
try {
438+
do {
439+
if ((this.documents?.length ?? 0) !== 0) {
440+
return true;
441+
}
442+
await this.fetchBatch();
443+
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
444+
} finally {
445+
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
446+
this.timeoutContext?.clear();
395447
}
396-
await this.fetchBatch();
397-
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
448+
}
398449

399450
return false;
400451
}
@@ -404,15 +455,24 @@ export abstract class AbstractCursor<
404455
if (this.cursorId === Long.ZERO) {
405456
throw new MongoCursorExhaustedError();
406457
}
458+
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
459+
this.timeoutContext?.refresh();
460+
}
407461

408-
do {
409-
const doc = this.documents?.shift(this.cursorOptions);
410-
if (doc != null) {
411-
if (this.transform != null) return await this.transformDocument(doc);
412-
return doc;
462+
try {
463+
do {
464+
const doc = this.documents?.shift(this.cursorOptions);
465+
if (doc != null) {
466+
if (this.transform != null) return await this.transformDocument(doc);
467+
return doc;
468+
}
469+
await this.fetchBatch();
470+
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
471+
} finally {
472+
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
473+
this.timeoutContext?.clear();
413474
}
414-
await this.fetchBatch();
415-
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
475+
}
416476

417477
return null;
418478
}
@@ -425,18 +485,27 @@ export abstract class AbstractCursor<
425485
throw new MongoCursorExhaustedError();
426486
}
427487

428-
let doc = this.documents?.shift(this.cursorOptions);
429-
if (doc != null) {
430-
if (this.transform != null) return await this.transformDocument(doc);
431-
return doc;
488+
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
489+
this.timeoutContext?.refresh();
432490
}
491+
try {
492+
let doc = this.documents?.shift(this.cursorOptions);
493+
if (doc != null) {
494+
if (this.transform != null) return await this.transformDocument(doc);
495+
return doc;
496+
}
433497

434-
await this.fetchBatch();
498+
await this.fetchBatch();
435499

436-
doc = this.documents?.shift(this.cursorOptions);
437-
if (doc != null) {
438-
if (this.transform != null) return await this.transformDocument(doc);
439-
return doc;
500+
doc = this.documents?.shift(this.cursorOptions);
501+
if (doc != null) {
502+
if (this.transform != null) return await this.transformDocument(doc);
503+
return doc;
504+
}
505+
} finally {
506+
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
507+
this.timeoutContext?.clear();
508+
}
440509
}
441510

442511
return null;
@@ -465,8 +534,8 @@ export abstract class AbstractCursor<
465534
/**
466535
* Frees any client-side resources used by the cursor.
467536
*/
468-
async close(): Promise<void> {
469-
await this.cleanup();
537+
async close(options?: { timeoutMS?: number }): Promise<void> {
538+
await this.cleanup(options?.timeoutMS);
470539
}
471540

472541
/**
@@ -647,6 +716,8 @@ export abstract class AbstractCursor<
647716

648717
this.cursorId = null;
649718
this.documents?.clear();
719+
this.timeoutContext?.clear();
720+
this.timeoutContext = undefined;
650721
this.isClosed = false;
651722
this.isKilled = false;
652723
this.initialized = false;
@@ -697,7 +768,7 @@ export abstract class AbstractCursor<
697768
}
698769
);
699770

700-
return await executeOperation(this.cursorClient, getMoreOperation);
771+
return await executeOperation(this.cursorClient, getMoreOperation, this.timeoutContext);
701772
}
702773

703774
/**
@@ -708,6 +779,12 @@ export abstract class AbstractCursor<
708779
* a significant refactor.
709780
*/
710781
private async cursorInit(): Promise<void> {
782+
if (this.cursorOptions.timeoutMS != null) {
783+
this.timeoutContext = TimeoutContext.create({
784+
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
785+
timeoutMS: this.cursorOptions.timeoutMS
786+
});
787+
}
711788
try {
712789
const state = await this._initialize(this.cursorSession);
713790
const response = state.response;
@@ -719,7 +796,7 @@ export abstract class AbstractCursor<
719796
} catch (error) {
720797
// the cursor is now initialized, even if an error occurred
721798
this.initialized = true;
722-
await this.cleanup(error);
799+
await this.cleanup(undefined, error);
723800
throw error;
724801
}
725802

@@ -753,14 +830,15 @@ export abstract class AbstractCursor<
753830

754831
// otherwise need to call getMore
755832
const batchSize = this.cursorOptions.batchSize || 1000;
833+
this.cursorOptions.omitMaxTimeMS = this.cursorOptions.timeoutMS != null;
756834

757835
try {
758836
const response = await this.getMore(batchSize);
759837
this.cursorId = response.id;
760838
this.documents = response;
761839
} catch (error) {
762840
try {
763-
await this.cleanup(error);
841+
await this.cleanup(undefined, error);
764842
} catch (error) {
765843
// `cleanupCursor` should never throw, squash and throw the original error
766844
squashError(error);
@@ -781,7 +859,7 @@ export abstract class AbstractCursor<
781859
}
782860

783861
/** @internal */
784-
private async cleanup(error?: Error) {
862+
private async cleanup(timeoutMS?: number, error?: Error) {
785863
this.isClosed = true;
786864
const session = this.cursorSession;
787865
try {
@@ -796,11 +874,23 @@ export abstract class AbstractCursor<
796874
this.isKilled = true;
797875
const cursorId = this.cursorId;
798876
this.cursorId = Long.ZERO;
877+
let timeoutContext: TimeoutContext | undefined;
878+
if (timeoutMS != null) {
879+
this.timeoutContext?.clear();
880+
timeoutContext = TimeoutContext.create({
881+
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
882+
timeoutMS
883+
});
884+
} else {
885+
this.timeoutContext?.refresh();
886+
timeoutContext = this.timeoutContext;
887+
}
799888
await executeOperation(
800889
this.cursorClient,
801890
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
802891
session
803-
})
892+
}),
893+
timeoutContext
804894
);
805895
}
806896
} catch (error) {

src/cursor/aggregation_cursor.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { Document } from '../bson';
2+
import { MongoAPIError } from '../error';
23
import type { ExplainVerbosityLike } from '../explain';
34
import type { MongoClient } from '../mongo_client';
45
import { AggregateOperation, type AggregateOptions } from '../operations/aggregate';
@@ -8,7 +9,7 @@ import type { Sort } from '../sort';
89
import type { MongoDBNamespace } from '../utils';
910
import { mergeOptions } from '../utils';
1011
import type { AbstractCursorOptions, InitialCursorResponse } from './abstract_cursor';
11-
import { AbstractCursor } from './abstract_cursor';
12+
import { AbstractCursor, CursorTimeoutMode } from './abstract_cursor';
1213

1314
/** @public */
1415
export interface AggregationCursorOptions extends AbstractCursorOptions, AggregateOptions {}
@@ -36,6 +37,15 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
3637

3738
this.pipeline = pipeline;
3839
this.aggregateOptions = options;
40+
41+
const lastStage: Document | undefined = this.pipeline[this.pipeline.length - 1];
42+
43+
if (
44+
this.cursorOptions.timeoutMS != null &&
45+
this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION &&
46+
(lastStage?.$merge != null || lastStage?.$out != null)
47+
)
48+
throw new MongoAPIError('Cannot use $out or $merge stage with ITERATION timeoutMode');
3949
}
4050

4151
clone(): AggregationCursor<TSchema> {
@@ -58,7 +68,7 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
5868
session
5969
});
6070

61-
const response = await executeOperation(this.client, aggregateOperation);
71+
const response = await executeOperation(this.client, aggregateOperation, this.timeoutContext);
6272

6373
return { server: aggregateOperation.server, session, response };
6474
}
@@ -93,6 +103,13 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
93103
addStage<T = Document>(stage: Document): AggregationCursor<T>;
94104
addStage<T = Document>(stage: Document): AggregationCursor<T> {
95105
this.throwIfInitialized();
106+
if (
107+
this.cursorOptions.timeoutMS != null &&
108+
this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION &&
109+
(stage.$out != null || stage.$merge != null)
110+
) {
111+
throw new MongoAPIError('Cannot use $out or $merge stage with ITERATION timeoutMode');
112+
}
96113
this.pipeline.push(stage);
97114
return this as unknown as AggregationCursor<T>;
98115
}

src/cursor/change_stream_cursor.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,11 @@ export class ChangeStreamCursor<
133133
session
134134
});
135135

136-
const response = await executeOperation(session.client, aggregateOperation);
136+
const response = await executeOperation(
137+
session.client,
138+
aggregateOperation,
139+
this.timeoutContext
140+
);
137141

138142
const server = aggregateOperation.server;
139143
this.maxWireVersion = maxWireVersion(server);

0 commit comments

Comments
 (0)