Skip to content

Commit 6ee5c9c

Browse files
POC
1 parent 7a12914 commit 6ee5c9c

File tree

5 files changed

+80
-10
lines changed

5 files changed

+80
-10
lines changed

src/cursor/abstract_cursor.ts

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
2121
import { type AsyncDisposable, configureResourceManagement } from '../resource_management';
2222
import type { Server } from '../sdam/server';
2323
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
24-
import { TimeoutContext } from '../timeout';
24+
import { type CSOTTimeoutContext, type Timeout, TimeoutContext } from '../timeout';
2525
import { type MongoDBNamespace, squashError } from '../utils';
2626

2727
/**
@@ -119,6 +119,9 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
119119
timeoutMS?: number;
120120
/** @internal TODO(NODE-5688): make this public */
121121
timeoutMode?: CursorTimeoutMode;
122+
123+
/** @internal */
124+
timeoutContext?: CursorTimeoutContext;
122125
}
123126

124127
/** @internal */
@@ -140,6 +143,46 @@ export type AbstractCursorEvents = {
140143
[AbstractCursor.CLOSE](): void;
141144
};
142145

146+
export class CursorTimeoutContext extends TimeoutContext {
147+
constructor(
148+
public timeoutContext: TimeoutContext,
149+
public owner: AbstractCursor | null = null
150+
) {
151+
super();
152+
}
153+
154+
override get serverSelectionTimeout(): Timeout | null {
155+
return this.timeoutContext.serverSelectionTimeout;
156+
}
157+
override get connectionCheckoutTimeout(): Timeout | null {
158+
return this.timeoutContext.connectionCheckoutTimeout;
159+
}
160+
override get clearServerSelectionTimeout(): boolean {
161+
return this.timeoutContext.clearServerSelectionTimeout;
162+
}
163+
override get clearConnectionCheckoutTimeout(): boolean {
164+
return this.timeoutContext.clearConnectionCheckoutTimeout;
165+
}
166+
override get timeoutForSocketWrite(): Timeout | null {
167+
return this.timeoutContext.timeoutForSocketWrite;
168+
}
169+
override get timeoutForSocketRead(): Timeout | null {
170+
return this.timeoutContext.timeoutForSocketRead;
171+
}
172+
override csotEnabled(): this is CSOTTimeoutContext {
173+
return this.timeoutContext.csotEnabled();
174+
}
175+
override refresh(): void {
176+
return this.timeoutContext.refresh();
177+
}
178+
override clear(): void {
179+
return this.timeoutContext.clear();
180+
}
181+
override refreshed(): TimeoutContext {
182+
return new CursorTimeoutContext(this.timeoutContext.refreshed(), this.owner);
183+
}
184+
}
185+
143186
/** @public */
144187
export abstract class AbstractCursor<
145188
TSchema = any,
@@ -171,7 +214,7 @@ export abstract class AbstractCursor<
171214
/** @internal */
172215
protected readonly cursorOptions: InternalAbstractCursorOptions;
173216
/** @internal */
174-
protected timeoutContext?: TimeoutContext;
217+
protected timeoutContext?: CursorTimeoutContext;
175218

176219
/** @event */
177220
static readonly CLOSE = 'close' as const;
@@ -264,6 +307,8 @@ export abstract class AbstractCursor<
264307
utf8: options?.enableUtf8Validation === false ? false : true
265308
}
266309
};
310+
311+
this.timeoutContext = options.timeoutContext;
267312
}
268313

269314
/**
@@ -790,10 +835,13 @@ export abstract class AbstractCursor<
790835
*/
791836
private async cursorInit(): Promise<void> {
792837
if (this.cursorOptions.timeoutMS != null) {
793-
this.timeoutContext = TimeoutContext.create({
794-
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
795-
timeoutMS: this.cursorOptions.timeoutMS
796-
});
838+
this.timeoutContext = new CursorTimeoutContext(
839+
TimeoutContext.create({
840+
serverSelectionTimeoutMS: this.client.options.serverSelectionTimeoutMS,
841+
timeoutMS: this.cursorOptions.timeoutMS
842+
}),
843+
this
844+
);
797845
}
798846
try {
799847
const state = await this._initialize(this.cursorSession);

src/cursor/client_bulk_write_cursor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ export class ClientBulkWriteCursor extends AbstractCursor {
3636
constructor(
3737
client: MongoClient,
3838
commandBuilder: ClientBulkWriteCommandBuilder,
39-
options: ClientBulkWriteOptions = {}
39+
options: ClientBulkWriteOptions & AbstractCursorOptions = {}
4040
) {
4141
super(client, new MongoDBNamespace('admin', '$cmd'), options);
4242

src/mongo_client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
493493
models: AnyClientBulkWriteModel[],
494494
options?: ClientBulkWriteOptions
495495
): Promise<ClientBulkWriteResult | { ok: 1 }> {
496-
return await new ClientBulkWriteExecutor(this, models, options).execute();
496+
return await new ClientBulkWriteExecutor(this, models, resolveOptions(this, options)).execute();
497497
}
498498

499499
/**

src/operations/client_bulk_write/executor.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import { CursorTimeoutContext } from '../../cursor/abstract_cursor';
12
import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
23
import { type MongoClient } from '../../mongo_client';
4+
import { TimeoutContext } from '../../timeout';
35
import { WriteConcern } from '../../write_concern';
46
import { executeOperation } from '../execute_operation';
57
import { ClientBulkWriteOperation } from './client_bulk_write';
@@ -55,19 +57,29 @@ export class ClientBulkWriteExecutor {
5557
this.options,
5658
pkFactory
5759
);
60+
61+
const timeoutContext = TimeoutContext.create({
62+
...this.options,
63+
serverSelectionTimeoutMS: this.client.s.options.serverSelectionTimeoutMS,
64+
waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS,
65+
socketTimeoutMS: this.client.s.options.socketTimeoutMS
66+
});
5867
// Unacknowledged writes need to execute all batches and return { ok: 1}
5968
if (this.options.writeConcern?.w === 0) {
6069
while (commandBuilder.hasNextBatch()) {
6170
const operation = new ClientBulkWriteOperation(commandBuilder, this.options);
62-
await executeOperation(this.client, operation);
71+
await executeOperation(this.client, operation, timeoutContext);
6372
}
6473
return { ok: 1 };
6574
} else {
6675
const resultsMerger = new ClientBulkWriteResultsMerger(this.options);
6776
// For each command will will create and exhaust a cursor for the results.
6877
let currentBatchOffset = 0;
6978
while (commandBuilder.hasNextBatch()) {
70-
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, this.options);
79+
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, {
80+
...this.options,
81+
timeoutContext: new CursorTimeoutContext(timeoutContext)
82+
});
7183
const docs = await cursor.toArray();
7284
const operations = cursor.operations;
7385
resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs);

src/timeout.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ export abstract class TimeoutContext {
195195
abstract refresh(): void;
196196

197197
abstract clear(): void;
198+
199+
abstract refreshed(): TimeoutContext;
198200
}
199201

200202
/** @internal */
@@ -305,6 +307,10 @@ export class CSOTTimeoutContext extends TimeoutContext {
305307
this._serverSelectionTimeout?.clear();
306308
this._connectionCheckoutTimeout?.clear();
307309
}
310+
311+
override refreshed(): CSOTTimeoutContext {
312+
return new CSOTTimeoutContext(this);
313+
}
308314
}
309315

310316
/** @internal */
@@ -351,4 +357,8 @@ export class LegacyTimeoutContext extends TimeoutContext {
351357
clear(): void {
352358
return;
353359
}
360+
361+
override refreshed(): LegacyTimeoutContext {
362+
return new LegacyTimeoutContext(this.options);
363+
}
354364
}

0 commit comments

Comments
 (0)