Skip to content

Commit 7df1a70

Browse files
feat(NODE-6403): add CSOT support to client bulk write (#4261)
Co-authored-by: Warren James <[email protected]>
1 parent c0d6ec9 commit 7df1a70

File tree

13 files changed

+535
-37
lines changed

13 files changed

+535
-37
lines changed

src/cmap/connection.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,6 +716,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
716716
throw new MongoOperationTimeoutError('Timed out at socket write');
717717
}
718718
throw error;
719+
} finally {
720+
timeout.clear();
719721
}
720722
}
721723
return await drainEvent;

src/cmap/wire_protocol/on_data.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ export function onData(
116116
emitter.off('data', eventHandler);
117117
emitter.off('error', errorHandler);
118118
finished = true;
119+
timeoutForSocketRead?.clear();
119120
const doneResult = { value: undefined, done: finished } as const;
120121

121122
for (const promise of unconsumedPromises) {

src/cursor/abstract_cursor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ export abstract class AbstractCursor<
243243
options.timeoutMode ??
244244
(options.tailable ? CursorTimeoutMode.ITERATION : CursorTimeoutMode.LIFETIME);
245245
} else {
246-
if (options.timeoutMode != null)
246+
if (options.timeoutMode != null && options.timeoutContext == null)
247247
throw new MongoInvalidArgumentError('Cannot set timeoutMode without setting timeoutMS');
248248
}
249249

src/cursor/client_bulk_write_cursor.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ export class ClientBulkWriteCursor extends AbstractCursor {
3535
constructor(
3636
client: MongoClient,
3737
commandBuilder: ClientBulkWriteCommandBuilder,
38-
options: ClientBulkWriteOptions = {}
38+
options: ClientBulkWriteCursorOptions = {}
3939
) {
4040
super(client, new MongoDBNamespace('admin', '$cmd'), options);
4141

@@ -72,7 +72,11 @@ export class ClientBulkWriteCursor extends AbstractCursor {
7272
session
7373
});
7474

75-
const response = await executeOperation(this.client, clientBulkWriteOperation);
75+
const response = await executeOperation(
76+
this.client,
77+
clientBulkWriteOperation,
78+
this.timeoutContext
79+
);
7680
this.cursorResponse = response;
7781

7882
return { server: clientBulkWriteOperation.server, session, response };

src/operations/client_bulk_write/executor.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
import { CursorTimeoutContext, CursorTimeoutMode } from '../../cursor/abstract_cursor';
12
import { ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
23
import {
34
MongoClientBulkWriteError,
45
MongoClientBulkWriteExecutionError,
56
MongoServerError
67
} from '../../error';
78
import { type MongoClient } from '../../mongo_client';
9+
import { TimeoutContext } from '../../timeout';
10+
import { resolveTimeoutOptions } from '../../utils';
811
import { WriteConcern } from '../../write_concern';
912
import { executeOperation } from '../execute_operation';
1013
import { ClientBulkWriteOperation } from './client_bulk_write';
@@ -70,17 +73,26 @@ export class ClientBulkWriteExecutor {
7073
pkFactory
7174
);
7275
// Unacknowledged writes need to execute all batches and return { ok: 1}
76+
const resolvedOptions = resolveTimeoutOptions(this.client, this.options);
77+
const context = TimeoutContext.create(resolvedOptions);
78+
7379
if (this.options.writeConcern?.w === 0) {
7480
while (commandBuilder.hasNextBatch()) {
7581
const operation = new ClientBulkWriteOperation(commandBuilder, this.options);
76-
await executeOperation(this.client, operation);
82+
await executeOperation(this.client, operation, context);
7783
}
7884
return { ok: 1 };
7985
} else {
8086
const resultsMerger = new ClientBulkWriteResultsMerger(this.options);
8187
// For each command will will create and exhaust a cursor for the results.
8288
while (commandBuilder.hasNextBatch()) {
83-
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, this.options);
89+
const cursorContext = new CursorTimeoutContext(context, Symbol());
90+
const options = {
91+
...this.options,
92+
timeoutContext: cursorContext,
93+
...(resolvedOptions.timeoutMS != null && { timeoutMode: CursorTimeoutMode.LIFETIME })
94+
};
95+
const cursor = new ClientBulkWriteCursor(this.client, commandBuilder, options);
8496
try {
8597
await resultsMerger.merge(cursor);
8698
} catch (error) {

src/sdam/server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ export type ServerEvents = {
106106
EventEmitterWithState;
107107

108108
/** @internal */
109-
export type ServerCommandOptions = Omit<CommandOptions, 'timeoutContext'> & {
109+
export type ServerCommandOptions = Omit<CommandOptions, 'timeoutContext' | 'socketTimeoutMS'> & {
110110
timeoutContext: TimeoutContext;
111111
};
112112

src/utils.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import { ServerType } from './sdam/common';
3535
import type { Server } from './sdam/server';
3636
import type { Topology } from './sdam/topology';
3737
import type { ClientSession } from './sessions';
38+
import { type TimeoutContextOptions } from './timeout';
3839
import { WriteConcern } from './write_concern';
3940

4041
/**
@@ -514,6 +515,18 @@ export function hasAtomicOperators(doc: Document | Document[]): boolean {
514515
return keys.length > 0 && keys[0][0] === '$';
515516
}
516517

518+
export function resolveTimeoutOptions<T extends Partial<TimeoutContextOptions>>(
519+
client: MongoClient,
520+
options: T
521+
): T &
522+
Pick<
523+
MongoClient['s']['options'],
524+
'timeoutMS' | 'serverSelectionTimeoutMS' | 'waitQueueTimeoutMS' | 'socketTimeoutMS'
525+
> {
526+
const { socketTimeoutMS, serverSelectionTimeoutMS, waitQueueTimeoutMS, timeoutMS } =
527+
client.s.options;
528+
return { socketTimeoutMS, serverSelectionTimeoutMS, waitQueueTimeoutMS, timeoutMS, ...options };
529+
}
517530
/**
518531
* Merge inherited properties from parent into options, prioritizing values from options,
519532
* then values from parent.

test/integration/client-side-operations-timeout/client_side_operations_timeout.prose.test.ts

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ import {
2121
promiseWithResolvers,
2222
squashError
2323
} from '../../mongodb';
24-
import { type FailPoint } from '../../tools/utils';
24+
import { type FailPoint, makeMultiBatchWrite } from '../../tools/utils';
25+
import { filterForCommands } from '../shared';
2526

2627
// TODO(NODE-5824): Implement CSOT prose tests
2728
describe('CSOT spec prose tests', function () {
@@ -1183,9 +1184,9 @@ describe('CSOT spec prose tests', function () {
11831184
});
11841185
});
11851186

1186-
describe.skip(
1187+
describe(
11871188
'11. Multi-batch bulkWrites',
1188-
{ requires: { mongodb: '>=8.0', serverless: 'forbid' } },
1189+
{ requires: { mongodb: '>=8.0', serverless: 'forbid', topology: 'single' } },
11891190
function () {
11901191
/**
11911192
* ### 11. Multi-batch bulkWrites
@@ -1245,9 +1246,6 @@ describe('CSOT spec prose tests', function () {
12451246
}
12461247
};
12471248

1248-
let maxBsonObjectSize: number;
1249-
let maxMessageSizeBytes: number;
1250-
12511249
beforeEach(async function () {
12521250
await internalClient
12531251
.db('db')
@@ -1256,29 +1254,20 @@ describe('CSOT spec prose tests', function () {
12561254
.catch(() => null);
12571255
await internalClient.db('admin').command(failpoint);
12581256

1259-
const hello = await internalClient.db('admin').command({ hello: 1 });
1260-
maxBsonObjectSize = hello.maxBsonObjectSize;
1261-
maxMessageSizeBytes = hello.maxMessageSizeBytes;
1262-
12631257
client = this.configuration.newClient({ timeoutMS: 2000, monitorCommands: true });
12641258
});
12651259

1266-
it.skip('performs two bulkWrites which fail to complete before 2000 ms', async function () {
1260+
it('performs two bulkWrites which fail to complete before 2000 ms', async function () {
12671261
const writes = [];
1268-
client.on('commandStarted', ev => writes.push(ev));
1262+
client.on('commandStarted', filterForCommands('bulkWrite', writes));
12691263

1270-
const length = maxMessageSizeBytes / maxBsonObjectSize + 1;
1271-
const models = Array.from({ length }, () => ({
1272-
namespace: 'db.coll',
1273-
name: 'insertOne' as const,
1274-
document: { a: 'b'.repeat(maxBsonObjectSize - 500) }
1275-
}));
1264+
const models = await makeMultiBatchWrite(this.configuration);
12761265

12771266
const error = await client.bulkWrite(models).catch(error => error);
12781267

12791268
expect(error, error.stack).to.be.instanceOf(MongoOperationTimeoutError);
1280-
expect(writes.map(ev => ev.commandName)).to.deep.equal(['bulkWrite', 'bulkWrite']);
1281-
}).skipReason = 'TODO(NODE-6403): client.bulkWrite is implemented in a follow up';
1269+
expect(writes).to.have.lengthOf(2);
1270+
});
12821271
}
12831272
);
12841273
});

test/integration/client-side-operations-timeout/node_csot.test.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -279,12 +279,16 @@ describe('CSOT driver tests', metadata, () => {
279279
.stub(Connection.prototype, 'readMany')
280280
.callsFake(async function* (...args) {
281281
const realIterator = readManyStub.wrappedMethod.call(this, ...args);
282-
const cmd = commandSpy.lastCall.args.at(1);
283-
if ('giveMeWriteErrors' in cmd) {
284-
await realIterator.next().catch(() => null); // dismiss response
285-
yield { parse: () => writeErrorsReply };
286-
} else {
287-
yield (await realIterator.next()).value;
282+
try {
283+
const cmd = commandSpy.lastCall.args.at(1);
284+
if ('giveMeWriteErrors' in cmd) {
285+
await realIterator.next().catch(() => null); // dismiss response
286+
yield { parse: () => writeErrorsReply };
287+
} else {
288+
yield (await realIterator.next()).value;
289+
}
290+
} finally {
291+
realIterator.return();
288292
}
289293
});
290294
});

test/integration/collection-management/collection_db_management.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { expect } from 'chai';
22

3-
import { Collection, type Db, type MongoClient } from '../../mongodb';
3+
import { Collection, type Db, type MongoClient, ObjectId } from '../../mongodb';
44

55
describe('Collection Management and Db Management', function () {
66
let client: MongoClient;
@@ -16,7 +16,7 @@ describe('Collection Management and Db Management', function () {
1616
});
1717

1818
it('returns a collection object after calling createCollection', async function () {
19-
const collection = await db.createCollection('collection');
19+
const collection = await db.createCollection(new ObjectId().toHexString());
2020
expect(collection).to.be.instanceOf(Collection);
2121
});
2222

0 commit comments

Comments
 (0)