Skip to content

Commit 6a658ec

Browse files
Add csot support for bulk write
1 parent 392599c commit 6a658ec

File tree

9 files changed

+445
-11
lines changed

9 files changed

+445
-11
lines changed

src/cursor/abstract_cursor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ export abstract class AbstractCursor<
220220
options.timeoutMode ??
221221
(options.tailable ? CursorTimeoutMode.ITERATION : CursorTimeoutMode.LIFETIME);
222222
} else {
223-
if (options.timeoutMode != null)
223+
if (options.timeoutMode != null && options.timeoutContext == null)
224224
throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS');
225225
}
226226
this.cursorOptions.omitMaxTimeMS =

src/cursor/client_bulk_write_cursor.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ export class ClientBulkWriteCursor extends AbstractCursor {
3636
constructor(
3737
client: MongoClient,
3838
commandBuilder: ClientBulkWriteCommandBuilder,
39-
options: ClientBulkWriteOptions = {}
39+
options: ClientBulkWriteCursorOptions = {}
4040
) {
4141
super(client, new MongoDBNamespace('admin', '$cmd'), options);
4242

@@ -78,7 +78,11 @@ export class ClientBulkWriteCursor extends AbstractCursor {
7878
session
7979
});
8080

81-
const response = await executeOperation(this.client, clientBulkWriteOperation);
81+
const response = await executeOperation(
82+
this.client,
83+
clientBulkWriteOperation,
84+
this.timeoutContext
85+
);
8286
this.cursorResponse = response;
8387

8488
return { server: clientBulkWriteOperation.server, session, response };

src/operations/client_bulk_write/executor.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
import { CursorTimeoutContext, CursorTimeoutMode } from '../../cursor/abstract_cursor';
12
import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
23
import { type MongoClient } from '../../mongo_client';
4+
import { TimeoutContext } from '../../timeout';
5+
import { resolveTimeoutOptions } from '../../utils';
36
import { WriteConcern } from '../../write_concern';
47
import { executeOperation } from '../execute_operation';
58
import { ClientBulkWriteOperation } from './client_bulk_write';
@@ -56,18 +59,27 @@ export class ClientBulkWriteExecutor {
5659
pkFactory
5760
);
5861
// Unacknowledged writes need to execute all batches and return { ok: 1}
62+
const resolvedOptions = resolveTimeoutOptions(this.client, this.options);
63+
const context = TimeoutContext.create(resolvedOptions);
64+
5965
if (this.options.writeConcern?.w === 0) {
6066
while (commandBuilder.hasNextBatch()) {
6167
const operation = new ClientBulkWriteOperation(commandBuilder, this.options);
62-
await executeOperation(this.client, operation);
68+
await executeOperation(this.client, operation, context);
6369
}
6470
return { ok: 1 };
6571
} else {
6672
const resultsMerger = new ClientBulkWriteResultsMerger(this.options);
6773
// For each command will will create and exhaust a cursor for the results.
6874
let currentBatchOffset = 0;
6975
while (commandBuilder.hasNextBatch()) {
70-
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, this.options);
76+
const cursorContext = new CursorTimeoutContext(context, Symbol());
77+
const options = {
78+
...this.options,
79+
timeoutContext: cursorContext,
80+
...(resolvedOptions.timeoutMS != null && { timeoutMode: CursorTimeoutMode.LIFETIME })
81+
};
82+
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, options);
7183
const docs = await cursor.toArray();
7284
const operations = cursor.operations;
7385
resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs);

src/sdam/server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ export type ServerEvents = {
106106
EventEmitterWithState;
107107

108108
/** @internal */
109-
export type ServerCommandOptions = Omit<CommandOptions, 'timeoutContext'> & {
109+
export type ServerCommandOptions = Omit<CommandOptions, 'timeoutContext' | 'socketTimeoutMS'> & {
110110
timeoutContext: TimeoutContext;
111111
};
112112

src/timeout.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ function isCSOTTimeoutContextOptions(v: unknown): v is CSOTTimeoutContextOptions
171171

172172
/** @internal */
173173
export abstract class TimeoutContext {
174-
static create(options: TimeoutContextOptions): TimeoutContext {
174+
static create(options: Partial<TimeoutContextOptions>): TimeoutContext {
175175
if (options.session?.timeoutContext != null) return options.session?.timeoutContext;
176176
if (isCSOTTimeoutContextOptions(options)) return new CSOTTimeoutContext(options);
177177
else if (isLegacyTimeoutContextOptions(options)) return new LegacyTimeoutContext(options);

src/utils.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import { ServerType } from './sdam/common';
3535
import type { Server } from './sdam/server';
3636
import type { Topology } from './sdam/topology';
3737
import type { ClientSession } from './sessions';
38+
import { type TimeoutContextOptions } from './timeout';
3839
import { WriteConcern } from './write_concern';
3940

4041
/**
@@ -514,6 +515,21 @@ export function hasAtomicOperators(doc: Document | Document[]): boolean {
514515
return keys.length > 0 && keys[0][0] === '$';
515516
}
516517

518+
export function resolveTimeoutOptions<T extends Partial<TimeoutContextOptions>>(
519+
client: MongoClient,
520+
options?: T
521+
): Pick<
522+
MongoClient['s']['options'],
523+
'serverSelectionTimeoutMS' | 'socketTimeoutMS' | 'waitQueueTimeoutMS' | 'timeoutMS'
524+
> &
525+
T {
526+
const { socketTimeoutMS, serverSelectionTimeoutMS, waitQueueTimeoutMS, timeoutMS } =
527+
client.s.options;
528+
return Object.assign(
529+
{ socketTimeoutMS, serverSelectionTimeoutMS, waitQueueTimeoutMS, timeoutMS },
530+
options
531+
);
532+
}
517533
/**
518534
* Merge inherited properties from parent into options, prioritizing values from options,
519535
* then values from parent.

test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,7 +1104,7 @@ describe('CSOT spec prose tests', function () {
11041104
});
11051105
});
11061106

1107-
describe.skip(
1107+
describe(
11081108
'11. Multi-batch bulkWrites',
11091109
{ requires: { mongodb: '>=8.0', serverless: 'forbid' } },
11101110
function () {
@@ -1184,7 +1184,7 @@ describe('CSOT spec prose tests', function () {
11841184
client = this.configuration.newClient({ timeoutMS: 2000, monitorCommands: true });
11851185
});
11861186

1187-
it.skip('performs two bulkWrites which fail to complete before 2000 ms', async function () {
1187+
it('performs two bulkWrites which fail to complete before 2000 ms', async function () {
11881188
const writes = [];
11891189
client.on('commandStarted', ev => writes.push(ev));
11901190

@@ -1199,7 +1199,7 @@ describe('CSOT spec prose tests', function () {
11991199

12001200
expect(error, error.stack).to.be.instanceOf(MongoOperationTimeoutError);
12011201
expect(writes.map(ev => ev.commandName)).to.deep.equal(['bulkWrite', 'bulkWrite']);
1202-
}).skipReason = 'TODO(NODE-6403): client.bulkWrite is implemented in a follow up';
1202+
});
12031203
}
12041204
);
12051205
});

0 commit comments

Comments
 (0)