Skip to content

Commit 595bf9f

Browse files
committed
refactor(NODE-6398): bulkWrite internals to use async/await
1 parent 91f3035 commit 595bf9f

File tree

3 files changed

+101
-138
lines changed

3 files changed

+101
-138
lines changed

src/bulk/common.ts

Lines changed: 84 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
import { promisify } from 'util';
2-
31
import { type BSONSerializeOptions, type Document, EJSON, resolveBSONOptions } from '../bson';
42
import type { Collection } from '../collection';
53
import {
64
type AnyError,
75
MongoBatchReExecutionError,
86
MONGODB_ERROR_CODES,
97
MongoInvalidArgumentError,
8+
MongoRuntimeError,
109
MongoServerError,
1110
MongoWriteConcernError
1211
} from '../error';
@@ -22,7 +21,6 @@ import type { Topology } from '../sdam/topology';
2221
import type { ClientSession } from '../sessions';
2322
import {
2423
applyRetryableWrites,
25-
type Callback,
2624
getTopology,
2725
hasAtomicOperators,
2826
maybeAddIdToDocuments,
@@ -500,86 +498,46 @@ export function mergeBatchResults(
500498
}
501499
}
502500

503-
function executeCommands(
501+
async function executeCommands(
504502
bulkOperation: BulkOperationBase,
505-
options: BulkWriteOptions,
506-
callback: Callback<BulkWriteResult>
507-
) {
503+
options: BulkWriteOptions
504+
): Promise<BulkWriteResult> {
508505
if (bulkOperation.s.batches.length === 0) {
509-
return callback(
510-
undefined,
511-
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
512-
);
506+
return new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
513507
}
514508

515-
const batch = bulkOperation.s.batches.shift() as Batch;
509+
for (const batch of bulkOperation.s.batches) {
510+
const finalOptions = resolveOptions(bulkOperation, {
511+
...options,
512+
ordered: bulkOperation.isOrdered
513+
});
516514

517-
function resultHandler(err?: AnyError, result?: Document) {
518-
// Error is a driver related error not a bulk op error, return early
519-
if (err && 'message' in err && !(err instanceof MongoWriteConcernError)) {
520-
return callback(
521-
new MongoBulkWriteError(
522-
err,
523-
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
524-
)
525-
);
515+
if (finalOptions.bypassDocumentValidation !== true) {
516+
delete finalOptions.bypassDocumentValidation;
526517
}
527518

528-
if (err instanceof MongoWriteConcernError) {
529-
return handleMongoWriteConcernError(
530-
batch,
531-
bulkOperation.s.bulkResult,
532-
bulkOperation.isOrdered,
533-
err,
534-
callback
535-
);
519+
// Is the bypassDocumentValidation options specific
520+
if (bulkOperation.s.bypassDocumentValidation === true) {
521+
finalOptions.bypassDocumentValidation = true;
536522
}
537523

538-
// Merge the results together
539-
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, result);
540-
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
541-
if (bulkOperation.handleWriteError(callback, writeResult)) return;
542-
543-
// Execute the next command in line
544-
executeCommands(bulkOperation, options, callback);
545-
}
546-
547-
const finalOptions = resolveOptions(bulkOperation, {
548-
...options,
549-
ordered: bulkOperation.isOrdered
550-
});
551-
552-
if (finalOptions.bypassDocumentValidation !== true) {
553-
delete finalOptions.bypassDocumentValidation;
554-
}
555-
556-
// Set an operationIf if provided
557-
if (bulkOperation.operationId) {
558-
resultHandler.operationId = bulkOperation.operationId;
559-
}
560-
561-
// Is the bypassDocumentValidation options specific
562-
if (bulkOperation.s.bypassDocumentValidation === true) {
563-
finalOptions.bypassDocumentValidation = true;
564-
}
565-
566-
// Is the checkKeys option disabled
567-
if (bulkOperation.s.checkKeys === false) {
568-
finalOptions.checkKeys = false;
569-
}
570-
571-
if (finalOptions.retryWrites) {
572-
if (isUpdateBatch(batch)) {
573-
finalOptions.retryWrites = finalOptions.retryWrites && !batch.operations.some(op => op.multi);
524+
// Is the checkKeys option disabled
525+
if (bulkOperation.s.checkKeys === false) {
526+
finalOptions.checkKeys = false;
574527
}
575528

576-
if (isDeleteBatch(batch)) {
577-
finalOptions.retryWrites =
578-
finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0);
529+
if (finalOptions.retryWrites) {
530+
if (isUpdateBatch(batch)) {
531+
finalOptions.retryWrites =
532+
finalOptions.retryWrites && !batch.operations.some(op => op.multi);
533+
}
534+
535+
if (isDeleteBatch(batch)) {
536+
finalOptions.retryWrites =
537+
finalOptions.retryWrites && !batch.operations.some(op => op.limit === 0);
538+
}
579539
}
580-
}
581540

582-
try {
583541
const operation = isInsertBatch(batch)
584542
? new InsertOperation(bulkOperation.s.namespace, batch.operations, finalOptions)
585543
: isUpdateBatch(batch)
@@ -588,38 +546,61 @@ function executeCommands(
588546
? new DeleteOperation(bulkOperation.s.namespace, batch.operations, finalOptions)
589547
: null;
590548

591-
if (operation != null) {
592-
executeOperation(bulkOperation.s.collection.client, operation).then(
593-
result => resultHandler(undefined, result),
594-
error => resultHandler(error)
595-
);
549+
if (operation == null) throw new MongoRuntimeError(`Unknown batchType: ${batch.batchType}`);
550+
551+
let thrownError = null;
552+
let result;
553+
try {
554+
result = await executeOperation(bulkOperation.s.collection.client, operation);
555+
} catch (error) {
556+
thrownError = error;
557+
}
558+
559+
if (thrownError != null) {
560+
if (!(thrownError instanceof MongoWriteConcernError)) {
561+
// Error is a driver related error not a bulk op error, return early
562+
throw new MongoBulkWriteError(
563+
thrownError,
564+
new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered)
565+
);
566+
}
567+
568+
if (thrownError instanceof MongoWriteConcernError) {
569+
handleMongoWriteConcernError(
570+
batch,
571+
bulkOperation.s.bulkResult,
572+
bulkOperation.isOrdered,
573+
thrownError
574+
);
575+
}
596576
}
597-
} catch (err) {
598-
// Force top level error
599-
err.ok = 0;
600-
// Merge top level error and return
601-
mergeBatchResults(batch, bulkOperation.s.bulkResult, err, undefined);
602-
callback();
577+
578+
mergeBatchResults(batch, bulkOperation.s.bulkResult, thrownError, result);
579+
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
580+
bulkOperation.handleWriteError(writeResult);
603581
}
582+
583+
bulkOperation.s.batches.length = 0;
584+
585+
const writeResult = new BulkWriteResult(bulkOperation.s.bulkResult, bulkOperation.isOrdered);
586+
bulkOperation.handleWriteError(writeResult);
587+
return writeResult;
604588
}
605589

606590
function handleMongoWriteConcernError(
607591
batch: Batch,
608592
bulkResult: BulkResult,
609593
isOrdered: boolean,
610-
err: MongoWriteConcernError,
611-
callback: Callback<BulkWriteResult>
612-
) {
594+
err: MongoWriteConcernError
595+
): never {
613596
mergeBatchResults(batch, bulkResult, undefined, err.result);
614597

615-
callback(
616-
new MongoBulkWriteError(
617-
{
618-
message: err.result.writeConcernError.errmsg,
619-
code: err.result.writeConcernError.code
620-
},
621-
new BulkWriteResult(bulkResult, isOrdered)
622-
)
598+
throw new MongoBulkWriteError(
599+
{
600+
message: err.result.writeConcernError.errmsg,
601+
code: err.result.writeConcernError.code
602+
},
603+
new BulkWriteResult(bulkResult, isOrdered)
623604
);
624605
}
625606

@@ -875,8 +856,6 @@ export interface BulkWriteOptions extends CommandOperationOptions {
875856
let?: Document;
876857
}
877858

878-
const executeCommandsAsync = promisify(executeCommands);
879-
880859
/**
881860
* TODO(NODE-4063)
882861
* BulkWrites merge complexity is implemented in executeCommands
@@ -895,15 +874,15 @@ export class BulkWriteShimOperation extends AbstractOperation {
895874
return 'bulkWrite' as const;
896875
}
897876

898-
execute(_server: Server, session: ClientSession | undefined): Promise<any> {
877+
async execute(_server: Server, session: ClientSession | undefined): Promise<any> {
899878
if (this.options.session == null) {
900879
// An implicit session could have been created by 'executeOperation'
901880
// So if we stick it on finalOptions here, each bulk operation
902881
// will use this same session, it'll be passed in the same way
903882
// an explicit session would be
904883
this.options.session = session;
905884
}
906-
return executeCommandsAsync(this.bulkOperation, this.options);
885+
return await executeCommands(this.bulkOperation, this.options);
907886
}
908887
}
909888

@@ -1239,33 +1218,26 @@ export abstract class BulkOperationBase {
12391218
* Handles the write error before executing commands
12401219
* @internal
12411220
*/
1242-
handleWriteError(callback: Callback<BulkWriteResult>, writeResult: BulkWriteResult): boolean {
1221+
handleWriteError(writeResult: BulkWriteResult): void {
12431222
if (this.s.bulkResult.writeErrors.length > 0) {
12441223
const msg = this.s.bulkResult.writeErrors[0].errmsg
12451224
? this.s.bulkResult.writeErrors[0].errmsg
12461225
: 'write operation failed';
12471226

1248-
callback(
1249-
new MongoBulkWriteError(
1250-
{
1251-
message: msg,
1252-
code: this.s.bulkResult.writeErrors[0].code,
1253-
writeErrors: this.s.bulkResult.writeErrors
1254-
},
1255-
writeResult
1256-
)
1227+
throw new MongoBulkWriteError(
1228+
{
1229+
message: msg,
1230+
code: this.s.bulkResult.writeErrors[0].code,
1231+
writeErrors: this.s.bulkResult.writeErrors
1232+
},
1233+
writeResult
12571234
);
1258-
1259-
return true;
12601235
}
12611236

12621237
const writeConcernError = writeResult.getWriteConcernError();
12631238
if (writeConcernError) {
1264-
callback(new MongoBulkWriteError(writeConcernError, writeResult));
1265-
return true;
1239+
throw new MongoBulkWriteError(writeConcernError, writeResult);
12661240
}
1267-
1268-
return false;
12691241
}
12701242

12711243
abstract addToOperationsList(

src/bulk/unordered.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import type { Collection } from '../collection';
44
import { MongoInvalidArgumentError } from '../error';
55
import type { DeleteStatement } from '../operations/delete';
66
import type { UpdateStatement } from '../operations/update';
7-
import { type Callback } from '../utils';
87
import {
98
Batch,
109
BatchType,
@@ -20,12 +19,12 @@ export class UnorderedBulkOperation extends BulkOperationBase {
2019
super(collection, options, false);
2120
}
2221

23-
override handleWriteError(callback: Callback, writeResult: BulkWriteResult): boolean {
22+
override handleWriteError(writeResult: BulkWriteResult): void {
2423
if (this.s.batches.length) {
25-
return false;
24+
return;
2625
}
2726

28-
return super.handleWriteError(callback, writeResult);
27+
return super.handleWriteError(writeResult);
2928
}
3029

3130
addToOperationsList(

test/integration/crud/crud_api.test.ts

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,32 +1093,24 @@ describe('CRUD API', function () {
10931093
}
10941094
});
10951095

1096-
it('should correctly throw error on illegal callback when unordered bulkWrite encounters error', {
1097-
// Add a tag that our runner can trigger on
1098-
// in this case we are setting that node needs to be higher than 0.10.X to run
1099-
metadata: {
1100-
requires: { topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger'] }
1101-
},
1102-
1103-
test: async function () {
1104-
const ops = [];
1105-
// Create a set of operations that go over the 1000 limit causing two messages
1106-
let i = 0;
1107-
for (; i < 1005; i++) {
1108-
ops.push({ insertOne: { _id: i, a: i } });
1109-
}
1096+
it('should correctly throw error on illegal callback when unordered bulkWrite encounters error', async function () {
1097+
const ops = [];
1098+
// Create a set of operations that go over the 1000 limit causing two messages
1099+
let i = 0;
1100+
for (; i < 1005; i++) {
1101+
ops.push({ insertOne: { _id: i, a: i } });
1102+
}
11101103

1111-
ops.push({ insertOne: { _id: 0, a: i } });
1104+
ops.push({ insertOne: { _id: 0, a: i } });
11121105

1113-
const db = client.db();
1106+
const db = client.db();
11141107

1115-
const error = await db
1116-
.collection('t20_1')
1117-
.bulkWrite(ops, { ordered: false, writeConcern: { w: 1 } })
1118-
.catch(error => error);
1108+
const error = await db
1109+
.collection('t20_1')
1110+
.bulkWrite(ops, { ordered: false, writeConcern: { w: 1 } })
1111+
.catch(error => error);
11191112

1120-
expect(error).to.be.instanceOf(MongoError);
1121-
}
1113+
expect(error).to.be.instanceOf(MongoError);
11221114
});
11231115

11241116
it('should correctly throw error on illegal callback when ordered bulkWrite encounters error', {

0 commit comments

Comments
 (0)