Skip to content

Commit b5bf67e

Browse files
client bulk write CSOT
1 parent 1a06868 commit b5bf67e

File tree

19 files changed

+834
-113
lines changed

19 files changed

+834
-113
lines changed

src/cmap/connection.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -422,9 +422,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
422422
...options
423423
};
424424

425-
if (!options.omitMaxTimeMS && options.timeoutContext?.csotEnabled()) {
426-
const { maxTimeMS } = options.timeoutContext;
427-
if (maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
425+
if (!options.omitMaxTimeMS) {
426+
const maxTimeMS = options.timeoutContext?.maxTimeMS;
427+
if (maxTimeMS && maxTimeMS > 0 && Number.isFinite(maxTimeMS)) cmd.maxTimeMS = maxTimeMS;
428428
}
429429

430430
const message = this.supportsOpMsg
@@ -716,6 +716,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
716716
throw new MongoOperationTimeoutError('Timed out at socket write');
717717
}
718718
throw error;
719+
} finally {
720+
timeout.clear();
719721
}
720722
}
721723
return await drainEvent;

src/cmap/wire_protocol/on_data.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ export function onData(
116116
emitter.off('data', eventHandler);
117117
emitter.off('error', errorHandler);
118118
finished = true;
119+
timeoutForSocketRead?.clear();
119120
const doneResult = { value: undefined, done: finished } as const;
120121

121122
for (const promise of unconsumedPromises) {

src/cursor/abstract_cursor.ts

Lines changed: 104 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
2121
import { type AsyncDisposable, configureResourceManagement } from '../resource_management';
2222
import type { Server } from '../sdam/server';
2323
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
24-
import { TimeoutContext } from '../timeout';
24+
import { type CSOTTimeoutContext, type Timeout, TimeoutContext } from '../timeout';
2525
import { type MongoDBNamespace, squashError } from '../utils';
2626

2727
/**
@@ -119,6 +119,14 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
119119
timeoutMS?: number;
120120
/** @internal TODO(NODE-5688): make this public */
121121
timeoutMode?: CursorTimeoutMode;
122+
123+
/**
124+
* @internal
125+
*
126+
* A timeout context to govern the total time the cursor can live. If provided, the cursor
127+
* cannot be used in ITERATION mode.
128+
*/
129+
timeoutContext?: CursorTimeoutContext;
122130
}
123131

124132
/** @internal */
@@ -171,7 +179,7 @@ export abstract class AbstractCursor<
171179
/** @internal */
172180
protected readonly cursorOptions: InternalAbstractCursorOptions;
173181
/** @internal */
174-
protected timeoutContext?: TimeoutContext;
182+
protected timeoutContext?: CursorTimeoutContext;
175183

176184
/** @event */
177185
static readonly CLOSE = 'close' as const;
@@ -205,22 +213,14 @@ export abstract class AbstractCursor<
205213
};
206214
this.cursorOptions.timeoutMS = options.timeoutMS;
207215
if (this.cursorOptions.timeoutMS != null) {
208-
if (options.timeoutMode == null) {
209-
if (options.tailable) {
210-
this.cursorOptions.timeoutMode = CursorTimeoutMode.ITERATION;
211-
} else {
212-
this.cursorOptions.timeoutMode = CursorTimeoutMode.LIFETIME;
213-
}
214-
} else {
215-
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
216-
throw new MongoInvalidArgumentError(
217-
"Cannot set tailable cursor's timeoutMode to LIFETIME"
218-
);
219-
}
220-
this.cursorOptions.timeoutMode = options.timeoutMode;
216+
if (options.tailable && this.cursorOptions.timeoutMode === CursorTimeoutMode.LIFETIME) {
217+
throw new MongoInvalidArgumentError("Cannot set tailable cursor's timeoutMode to LIFETIME");
221218
}
219+
this.cursorOptions.timeoutMode =
220+
options.timeoutMode ??
221+
(options.tailable ? CursorTimeoutMode.ITERATION : CursorTimeoutMode.LIFETIME);
222222
} else {
223-
if (options.timeoutMode != null)
223+
if (options.timeoutMode != null && options.timeoutContext == null)
224224
throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS');
225225
}
226226
this.cursorOptions.omitMaxTimeMS =
@@ -264,6 +264,17 @@ export abstract class AbstractCursor<
264264
utf8: options?.enableUtf8Validation === false ? false : true
265265
}
266266
};
267+
268+
if (
269+
options.timeoutContext != null &&
270+
options.timeoutMS != null &&
271+
this.cursorOptions.timeoutMode !== CursorTimeoutMode.LIFETIME
272+
) {
273+
throw new MongoAPIError(
274+
`cannot create a cursor with an externally provided timeout context that doesn't use timeoutMode=CURSOR_LIFETIME.`
275+
);
276+
}
277+
this.timeoutContext = options.timeoutContext;
267278
}
268279

269280
/**
@@ -721,6 +732,9 @@ export abstract class AbstractCursor<
721732
* if the resultant data has already been retrieved by this cursor.
722733
*/
723734
rewind(): void {
735+
if (this.timeoutContext && this.timeoutContext.owner !== this) {
736+
throw new MongoAPIError(`Cannot rewind cursor that does not own its timeout context.`);
737+
}
724738
if (!this.initialized) {
725739
return;
726740
}
@@ -790,10 +804,13 @@ export abstract class AbstractCursor<
790804
*/
791805
private async cursorInit(): Promise<void> {
792806
if (this.cursorOptions.timeoutMS != null) {
793-
this.timeoutContext = TimeoutContext.create({
794-
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
795-
timeoutMS: this.cursorOptions.timeoutMS
796-
});
807+
this.timeoutContext ??= new CursorTimeoutContext(
808+
TimeoutContext.create({
809+
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
810+
timeoutMS: this.cursorOptions.timeoutMS
811+
}),
812+
this
813+
);
797814
}
798815
try {
799816
const state = await this._initialize(this.cursorSession);
@@ -872,6 +889,20 @@ export abstract class AbstractCursor<
872889
private async cleanup(timeoutMS?: number, error?: Error) {
873890
this.isClosed = true;
874891
const session = this.cursorSession;
892+
const timeoutContextForKillCursors = (): CursorTimeoutContext | undefined => {
893+
if (timeoutMS != null) {
894+
this.timeoutContext?.clear();
895+
return new CursorTimeoutContext(
896+
TimeoutContext.create({
897+
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
898+
timeoutMS
899+
}),
900+
this
901+
);
902+
} else {
903+
return this.timeoutContext?.refreshed();
904+
}
905+
};
875906
try {
876907
if (
877908
!this.isKilled &&
@@ -884,23 +915,13 @@ export abstract class AbstractCursor<
884915
this.isKilled = true;
885916
const cursorId = this.cursorId;
886917
this.cursorId = Long.ZERO;
887-
let timeoutContext: TimeoutContext | undefined;
888-
if (timeoutMS != null) {
889-
this.timeoutContext?.clear();
890-
timeoutContext = TimeoutContext.create({
891-
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
892-
timeoutMS
893-
});
894-
} else {
895-
this.timeoutContext?.refresh();
896-
timeoutContext = this.timeoutContext;
897-
}
918+
898919
await executeOperation(
899920
this.cursorClient,
900921
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
901922
session
902923
}),
903-
timeoutContext
924+
timeoutContextForKillCursors()
904925
);
905926
}
906927
} catch (error) {
@@ -1042,3 +1063,54 @@ class ReadableCursorStream extends Readable {
10421063
}
10431064

10441065
configureResourceManagement(AbstractCursor.prototype);
1066+
1067+
/**
1068+
* @internal
1069+
* The cursor timeout context is a wrapper around a timeout context
1070+
* that keeps track of the "owner" of the cursor. For timeout contexts
1071+
* instantiated inside a cursor, the owner will be the cursor.
1072+
*
1073+
* All timeout behavior is exactly the same as the wrapped timeout context's.
1074+
*/
1075+
export class CursorTimeoutContext extends TimeoutContext {
1076+
constructor(
1077+
public timeoutContext: TimeoutContext,
1078+
public owner: symbol | AbstractCursor
1079+
) {
1080+
super();
1081+
}
1082+
override get serverSelectionTimeout(): Timeout | null {
1083+
return this.timeoutContext.serverSelectionTimeout;
1084+
}
1085+
override get connectionCheckoutTimeout(): Timeout | null {
1086+
return this.timeoutContext.connectionCheckoutTimeout;
1087+
}
1088+
override get clearServerSelectionTimeout(): boolean {
1089+
return this.timeoutContext.clearServerSelectionTimeout;
1090+
}
1091+
override get clearConnectionCheckoutTimeout(): boolean {
1092+
return this.timeoutContext.clearConnectionCheckoutTimeout;
1093+
}
1094+
override get timeoutForSocketWrite(): Timeout | null {
1095+
return this.timeoutContext.timeoutForSocketWrite;
1096+
}
1097+
override get timeoutForSocketRead(): Timeout | null {
1098+
return this.timeoutContext.timeoutForSocketRead;
1099+
}
1100+
override csotEnabled(): this is CSOTTimeoutContext {
1101+
return this.timeoutContext.csotEnabled();
1102+
}
1103+
override refresh(): void {
1104+
return this.timeoutContext.refresh();
1105+
}
1106+
override clear(): void {
1107+
return this.timeoutContext.clear();
1108+
}
1109+
override get maxTimeMS(): number | null {
1110+
return this.timeoutContext.maxTimeMS;
1111+
}
1112+
1113+
override refreshed(): CursorTimeoutContext {
1114+
return new CursorTimeoutContext(this.timeoutContext.refreshed(), this.owner);
1115+
}
1116+
}

src/cursor/client_bulk_write_cursor.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ export class ClientBulkWriteCursor extends AbstractCursor {
3636
constructor(
3737
client: MongoClient,
3838
commandBuilder: ClientBulkWriteCommandBuilder,
39-
options: ClientBulkWriteOptions = {}
39+
options: ClientBulkWriteCursorOptions = {}
4040
) {
4141
super(client, new MongoDBNamespace('admin', '$cmd'), options);
4242

@@ -78,7 +78,11 @@ export class ClientBulkWriteCursor extends AbstractCursor {
7878
session
7979
});
8080

81-
const response = await executeOperation(this.client, clientBulkWriteOperation);
81+
const response = await executeOperation(
82+
this.client,
83+
clientBulkWriteOperation,
84+
this.timeoutContext
85+
);
8286
this.cursorResponse = response;
8387

8488
return { server: clientBulkWriteOperation.server, session, response };

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ export type {
358358
CursorStreamOptions
359359
} from './cursor/abstract_cursor';
360360
export type {
361+
CursorTimeoutContext,
361362
InitialCursorResponse,
362363
InternalAbstractCursorOptions
363364
} from './cursor/abstract_cursor';

src/operations/client_bulk_write/executor.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
import { CursorTimeoutContext, CursorTimeoutMode } from '../../cursor/abstract_cursor';
12
import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
23
import { type MongoClient } from '../../mongo_client';
4+
import { TimeoutContext } from '../../timeout';
5+
import { resolveTimeoutOptions } from '../../utils';
36
import { WriteConcern } from '../../write_concern';
47
import { executeOperation } from '../execute_operation';
58
import { ClientBulkWriteOperation } from './client_bulk_write';
@@ -56,18 +59,27 @@ export class ClientBulkWriteExecutor {
5659
pkFactory
5760
);
5861
// Unacknowledged writes need to execute all batches and return { ok: 1}
62+
const resolvedOptions = resolveTimeoutOptions(this.client, this.options);
63+
const context = TimeoutContext.create(resolvedOptions);
64+
5965
if (this.options.writeConcern?.w === 0) {
6066
while (commandBuilder.hasNextBatch()) {
6167
const operation = new ClientBulkWriteOperation(commandBuilder, this.options);
62-
await executeOperation(this.client, operation);
68+
await executeOperation(this.client, operation, context);
6369
}
6470
return { ok: 1 };
6571
} else {
6672
const resultsMerger = new ClientBulkWriteResultsMerger(this.options);
6773
// For each command will will create and exhaust a cursor for the results.
6874
let currentBatchOffset = 0;
6975
while (commandBuilder.hasNextBatch()) {
70-
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, this.options);
76+
const cursorContext = new CursorTimeoutContext(context, Symbol());
77+
const options = {
78+
...this.options,
79+
timeoutContext: cursorContext,
80+
...(resolvedOptions.timeoutMS != null && { timeoutMode: CursorTimeoutMode.LIFETIME })
81+
};
82+
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, options);
7183
const docs = await cursor.toArray();
7284
const operations = cursor.operations;
7385
resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs);

src/operations/find.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { Document } from '../bson';
22
import { CursorResponse, ExplainedCursorResponse } from '../cmap/wire_protocol/responses';
3-
import { type CursorTimeoutMode } from '../cursor/abstract_cursor';
3+
import { type AbstractCursorOptions, type CursorTimeoutMode } from '../cursor/abstract_cursor';
44
import { MongoInvalidArgumentError } from '../error';
55
import { ReadConcern } from '../read_concern';
66
import type { Server } from '../sdam/server';
@@ -17,7 +17,8 @@ import { Aspect, defineAspects, type Hint } from './operation';
1717
*/
1818
// eslint-disable-next-line @typescript-eslint/no-unused-vars
1919
export interface FindOptions<TSchema extends Document = Document>
20-
extends Omit<CommandOperationOptions, 'writeConcern'> {
20+
extends Omit<CommandOperationOptions, 'writeConcern'>,
21+
AbstractCursorOptions {
2122
/** Sets the limit of documents returned in the query. */
2223
limit?: number;
2324
/** Set to sort the documents coming back from the query. Array of indexes, `[['a', 1]]` etc. */

src/sdam/server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ export type ServerEvents = {
106106
EventEmitterWithState;
107107

108108
/** @internal */
109-
export type ServerCommandOptions = Omit<CommandOptions, 'timeoutContext'> & {
109+
export type ServerCommandOptions = Omit<CommandOptions, 'timeoutContext' | 'socketTimeoutMS'> & {
110110
timeoutContext: TimeoutContext;
111111
};
112112

src/timeout.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,13 +171,15 @@ function isCSOTTimeoutContextOptions(v: unknown): v is CSOTTimeoutContextOptions
171171

172172
/** @internal */
173173
export abstract class TimeoutContext {
174-
static create(options: TimeoutContextOptions): TimeoutContext {
174+
static create(options: Partial<TimeoutContextOptions>): TimeoutContext {
175175
if (options.session?.timeoutContext != null) return options.session?.timeoutContext;
176176
if (isCSOTTimeoutContextOptions(options)) return new CSOTTimeoutContext(options);
177177
else if (isLegacyTimeoutContextOptions(options)) return new LegacyTimeoutContext(options);
178178
else throw new MongoRuntimeError('Unrecognized options');
179179
}
180180

181+
abstract get maxTimeMS(): number | null;
182+
181183
abstract get serverSelectionTimeout(): Timeout | null;
182184

183185
abstract get connectionCheckoutTimeout(): Timeout | null;
@@ -195,6 +197,9 @@ export abstract class TimeoutContext {
195197
abstract refresh(): void;
196198

197199
abstract clear(): void;
200+
201+
/** Returns a new instance of the TimeoutContext, with all timeouts refreshed and restarted. */
202+
abstract refreshed(): TimeoutContext;
198203
}
199204

200205
/** @internal */
@@ -317,6 +322,10 @@ export class CSOTTimeoutContext extends TimeoutContext {
317322
throw new MongoOperationTimeoutError(message ?? `Expired after ${this.timeoutMS}ms`);
318323
return remainingTimeMS;
319324
}
325+
326+
override refreshed(): CSOTTimeoutContext {
327+
return new CSOTTimeoutContext(this);
328+
}
320329
}
321330

322331
/** @internal */
@@ -363,4 +372,12 @@ export class LegacyTimeoutContext extends TimeoutContext {
363372
clear(): void {
364373
return;
365374
}
375+
376+
get maxTimeMS() {
377+
return null;
378+
}
379+
380+
override refreshed(): LegacyTimeoutContext {
381+
return new LegacyTimeoutContext(this.options);
382+
}
366383
}

0 commit comments

Comments
 (0)