Skip to content

Commit 18a1b65

Browse files
committed
start csot cursor implementation
1 parent 0b8a33f commit 18a1b65

13 files changed

+151
-33
lines changed

src/cmap/connection.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ export interface CommandOptions extends BSONSerializeOptions {
8585
documentsReturnedIn?: string;
8686
noResponse?: boolean;
8787
omitReadPreference?: boolean;
88+
omitMaxTimeMS?: boolean;
8889

8990
// TODO(NODE-2802): Currently the CommandOptions take a property willRetryWrite which is a hint
9091
// from executeOperation that the txnNum should be applied to this command.
@@ -418,7 +419,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
418419
...options
419420
};
420421

421-
if (options.timeoutContext?.csotEnabled()) {
422+
if (!options.omitMaxTimeMS && options.timeoutContext?.csotEnabled()) {
422423
const { maxTimeMS } = options.timeoutContext;
423424
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
424425
}

src/cursor/abstract_cursor.ts

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
2020
import { type AsyncDisposable, configureResourceManagement } from '../resource_management';
2121
import type { Server } from '../sdam/server';
2222
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
23+
import { TimeoutContext } from '../timeout';
2324
import { type MongoDBNamespace, squashError } from '../utils';
2425

2526
/**
@@ -59,11 +60,15 @@ export interface CursorStreamOptions {
5960
/** @public */
6061
export type CursorFlag = (typeof CURSOR_FLAGS)[number];
6162

63+
/** @internal */
6264
export const CursorTimeoutMode = Object.freeze({
6365
ITERATION: 'iteration',
6466
LIFETIME: 'lifetime'
6567
} as const);
6668

69+
/** @internal
70+
* TODO(NODE-5688): Document and release
71+
* */
6772
export type CursorTimeoutMode = (typeof CursorTimeoutMode)[keyof typeof CursorTimeoutMode];
6873

6974
/** @public */
@@ -111,6 +116,7 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
111116
noCursorTimeout?: boolean;
112117
/** @internal TODO(NODE-5688): make this public */
113118
timeoutMS?: number;
119+
/** @internal TODO(NODE-5688): make this public */
114120
timeoutMode?: CursorTimeoutMode;
115121
}
116122

@@ -131,6 +137,12 @@ export type AbstractCursorEvents = {
131137
[AbstractCursor.CLOSE](): void;
132138
};
133139

140+
/** @internal */
141+
export type CursorInitializeOptions = {
142+
omitMaxTimeMS?: boolean;
143+
timeoutContext?: TimeoutContext;
144+
};
145+
134146
/** @public */
135147
export abstract class AbstractCursor<
136148
TSchema = any,
@@ -161,6 +173,8 @@ export abstract class AbstractCursor<
161173
private isKilled: boolean;
162174
/** @internal */
163175
protected readonly cursorOptions: InternalAbstractCursorOptions;
176+
/** @internal */
177+
protected timeoutContext?: TimeoutContext;
164178

165179
/** @event */
166180
static readonly CLOSE = 'close' as const;
@@ -191,11 +205,7 @@ export abstract class AbstractCursor<
191205
};
192206
this.cursorOptions.timeoutMS = options.timeoutMS;
193207
if (this.cursorOptions.timeoutMS != null) {
194-
this.cursorOptions.timeoutMode = options.timeoutMode;
195-
}
196-
197-
if (this.cursorOptions.timeoutMS != null) {
198-
if (this.cursorOptions.timeoutMode == null) {
208+
if (options.timeoutMode == null) {
199209
if (this.cursorOptions.tailable) {
200210
this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION;
201211
} else {
@@ -442,6 +452,9 @@ export abstract class AbstractCursor<
442452
await this.fetchBatch();
443453
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
444454

455+
if (this.cursorOptions.timeoutMode === 'iteration') {
456+
this.timeoutContext?.refresh();
457+
}
445458
return null;
446459
}
447460

@@ -493,8 +506,8 @@ export abstract class AbstractCursor<
493506
/**
494507
* Frees any client-side resources used by the cursor.
495508
*/
496-
async close(): Promise<void> {
497-
await this.cleanup();
509+
async close(timeoutMS?: number): Promise<void> {
510+
await this.cleanup(timeoutMS);
498511
}
499512

500513
/**
@@ -699,7 +712,8 @@ export abstract class AbstractCursor<
699712

700713
/** @internal */
701714
protected abstract _initialize(
702-
session: ClientSession | undefined
715+
session: ClientSession | undefined,
716+
options?: CursorInitializeOptions
703717
): Promise<InitialCursorResponse>;
704718

705719
/** @internal */
@@ -721,11 +735,12 @@ export abstract class AbstractCursor<
721735
{
722736
...this.cursorOptions,
723737
session: this.cursorSession,
724-
batchSize
738+
batchSize,
739+
omitMaxTimeMS: this.cursorOptions.timeoutMode != null
725740
}
726741
);
727742

728-
return await executeOperation(this.cursorClient, getMoreOperation);
743+
return await executeOperation(this.cursorClient, getMoreOperation, this.timeoutContext);
729744
}
730745

731746
/**
@@ -736,8 +751,22 @@ export abstract class AbstractCursor<
736751
* a significant refactor.
737752
*/
738753
private async cursorInit(): Promise<void> {
754+
if (this.cursorOptions.timeoutMS != null) {
755+
this.timeoutContext = TimeoutContext.create({
756+
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
757+
timeoutMS: this.cursorOptions.timeoutMS,
758+
cursorTimeoutMode: this.cursorOptions.timeoutMode
759+
});
760+
}
761+
const omitMaxTimeMS =
762+
this.cursorOptions.timeoutMS != null &&
763+
((this.cursorOptions.timeoutMode === 'iteration' && !this.cursorOptions.tailable) ||
764+
(this.cursorOptions.tailable && !this.cursorOptions.awaitData));
739765
try {
740-
const state = await this._initialize(this.cursorSession);
766+
const state = await this._initialize(this.cursorSession, {
767+
timeoutContext: this.timeoutContext,
768+
omitMaxTimeMS
769+
});
741770
const response = state.response;
742771
this.selectedServer = state.server;
743772
this.cursorId = response.id;
@@ -747,7 +776,7 @@ export abstract class AbstractCursor<
747776
} catch (error) {
748777
// the cursor is now initialized, even if an error occurred
749778
this.initialized = true;
750-
await this.cleanup(error);
779+
await this.cleanup(undefined, error);
751780
throw error;
752781
}
753782

@@ -788,7 +817,7 @@ export abstract class AbstractCursor<
788817
this.documents = response;
789818
} catch (error) {
790819
try {
791-
await this.cleanup(error);
820+
await this.cleanup(undefined, error);
792821
} catch (error) {
793822
// `cleanupCursor` should never throw, squash and throw the original error
794823
squashError(error);
@@ -809,7 +838,7 @@ export abstract class AbstractCursor<
809838
}
810839

811840
/** @internal */
812-
private async cleanup(error?: Error) {
841+
private async cleanup(timeoutMS?: number, error?: Error) {
813842
this.isClosed = true;
814843
const session = this.cursorSession;
815844
try {
@@ -824,11 +853,22 @@ export abstract class AbstractCursor<
824853
this.isKilled = true;
825854
const cursorId = this.cursorId;
826855
this.cursorId = Long.ZERO;
856+
let timeoutContext: TimeoutContext | undefined;
857+
if (timeoutMS != null) {
858+
timeoutContext = TimeoutContext.create({
859+
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
860+
timeoutMS
861+
});
862+
} else {
863+
this.timeoutContext?.refresh();
864+
timeoutContext = this.timeoutContext;
865+
}
827866
await executeOperation(
828867
this.cursorClient,
829868
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
830869
session
831-
})
870+
}),
871+
timeoutContext
832872
);
833873
}
834874
} catch (error) {

src/cursor/aggregation_cursor.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@ import type { ClientSession } from '../sessions';
88
import type { Sort } from '../sort';
99
import type { MongoDBNamespace } from '../utils';
1010
import { mergeOptions } from '../utils';
11-
import type { AbstractCursorOptions, InitialCursorResponse } from './abstract_cursor';
11+
import type {
12+
AbstractCursorOptions,
13+
CursorInitializeOptions,
14+
InitialCursorResponse
15+
} from './abstract_cursor';
1216
import { AbstractCursor, CursorTimeoutMode } from './abstract_cursor';
1317

1418
/** @public */
@@ -61,14 +65,22 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
6165
}
6266

6367
/** @internal */
64-
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
68+
async _initialize(
69+
session: ClientSession,
70+
options?: CursorInitializeOptions
71+
): Promise<InitialCursorResponse> {
6572
const aggregateOperation = new AggregateOperation(this.namespace, this.pipeline, {
6673
...this.aggregateOptions,
6774
...this.cursorOptions,
75+
omitMaxTimeMS: options?.omitMaxTimeMS,
6876
session
6977
});
7078

71-
const response = await executeOperation(this.client, aggregateOperation);
79+
const response = await executeOperation(
80+
this.client,
81+
aggregateOperation,
82+
options?.timeoutContext
83+
);
7284

7385
return { server: aggregateOperation.server, session, response };
7486
}

src/cursor/change_stream_cursor.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import { maxWireVersion, type MongoDBNamespace } from '../utils';
1717
import {
1818
AbstractCursor,
1919
type AbstractCursorOptions,
20+
type CursorInitializeOptions,
2021
type InitialCursorResponse
2122
} from './abstract_cursor';
2223

@@ -126,14 +127,22 @@ export class ChangeStreamCursor<
126127
});
127128
}
128129

129-
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
130+
async _initialize(
131+
session: ClientSession,
132+
options?: CursorInitializeOptions
133+
): Promise<InitialCursorResponse> {
130134
const aggregateOperation = new AggregateOperation(this.namespace, this.pipeline, {
131135
...this.cursorOptions,
132136
...this.changeStreamCursorOptions,
137+
omitMaxTimeMS: options?.omitMaxTimeMS,
133138
session
134139
});
135140

136-
const response = await executeOperation(session.client, aggregateOperation);
141+
const response = await executeOperation(
142+
session.client,
143+
aggregateOperation,
144+
options?.timeoutContext
145+
);
137146

138147
const server = aggregateOperation.server;
139148
this.maxWireVersion = maxWireVersion(server);

src/cursor/find_cursor.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@ import type { Hint } from '../operations/operation';
1111
import type { ClientSession } from '../sessions';
1212
import { formatSort, type Sort, type SortDirection } from '../sort';
1313
import { emitWarningOnce, mergeOptions, type MongoDBNamespace, squashError } from '../utils';
14-
import { AbstractCursor, type InitialCursorResponse } from './abstract_cursor';
14+
import {
15+
AbstractCursor,
16+
type CursorInitializeOptions,
17+
type InitialCursorResponse
18+
} from './abstract_cursor';
1519

1620
/** @public Flags allowed for cursor */
1721
export const FLAGS = [
@@ -62,14 +66,18 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
6266
}
6367

6468
/** @internal */
65-
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
69+
async _initialize(
70+
session: ClientSession,
71+
options?: CursorInitializeOptions
72+
): Promise<InitialCursorResponse> {
6673
const findOperation = new FindOperation(this.namespace, this.cursorFilter, {
6774
...this.findOptions, // NOTE: order matters here, we may need to refine this
6875
...this.cursorOptions,
76+
omitMaxTimeMS: options?.omitMaxTimeMS,
6977
session
7078
});
7179

72-
const response = await executeOperation(this.client, findOperation);
80+
const response = await executeOperation(this.client, findOperation, options?.timeoutContext);
7381

7482
// the response is not a cursor when `explain` is enabled
7583
this.numReturned = response.batchSize;

src/cursor/list_collections_cursor.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@ import {
77
type ListCollectionsOptions
88
} from '../operations/list_collections';
99
import type { ClientSession } from '../sessions';
10-
import { AbstractCursor, type InitialCursorResponse } from './abstract_cursor';
10+
import {
11+
AbstractCursor,
12+
type CursorInitializeOptions,
13+
type InitialCursorResponse
14+
} from './abstract_cursor';
1115

1216
/** @public */
1317
export class ListCollectionsCursor<
@@ -34,14 +38,18 @@ export class ListCollectionsCursor<
3438
}
3539

3640
/** @internal */
37-
async _initialize(session: ClientSession | undefined): Promise<InitialCursorResponse> {
41+
async _initialize(
42+
session: ClientSession | undefined,
43+
options?: CursorInitializeOptions
44+
): Promise<InitialCursorResponse> {
3845
const operation = new ListCollectionsOperation(this.parent, this.filter, {
3946
...this.cursorOptions,
4047
...this.options,
48+
omitMaxTimeMS: options?.omitMaxTimeMS,
4149
session
4250
});
4351

44-
const response = await executeOperation(this.parent.client, operation);
52+
const response = await executeOperation(this.parent.client, operation, options?.timeoutContext);
4553

4654
return { server: operation.server, session, response };
4755
}

src/cursor/list_indexes_cursor.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ import type { Collection } from '../collection';
22
import { executeOperation } from '../operations/execute_operation';
33
import { ListIndexesOperation, type ListIndexesOptions } from '../operations/indexes';
44
import type { ClientSession } from '../sessions';
5-
import { AbstractCursor, type InitialCursorResponse } from './abstract_cursor';
5+
import {
6+
AbstractCursor,
7+
type CursorInitializeOptions,
8+
type InitialCursorResponse
9+
} from './abstract_cursor';
610

711
/** @public */
812
export class ListIndexesCursor extends AbstractCursor {
@@ -23,10 +27,14 @@ export class ListIndexesCursor extends AbstractCursor {
2327
}
2428

2529
/** @internal */
26-
async _initialize(session: ClientSession | undefined): Promise<InitialCursorResponse> {
30+
async _initialize(
31+
session: ClientSession | undefined,
32+
options?: CursorInitializeOptions
33+
): Promise<InitialCursorResponse> {
2734
const operation = new ListIndexesOperation(this.parent, {
2835
...this.cursorOptions,
2936
...this.options,
37+
omitMaxTimeMS: options?.omitMaxTimeMS,
3038
session
3139
});
3240

0 commit comments

Comments
 (0)