Skip to content

Commit 3cec15c

Browse files
nbbeekenbaileympearson
authored andcommitted
chore(NODE-6397): change node-latest task to use latest node version (#4213)
1 parent 98ecbd5 commit 3cec15c

File tree

16 files changed

+1042
-287
lines changed

16 files changed

+1042
-287
lines changed

src/cmap/commands.ts

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -429,10 +429,60 @@ export interface OpMsgOptions {
429429

430430
/** @internal */
431431
export class DocumentSequence {
432+
field: string;
432433
documents: Document[];
434+
serializedDocumentsLength: number;
435+
private chunks: Uint8Array[];
436+
private header: Buffer;
433437

434-
constructor(documents: Document[]) {
435-
this.documents = documents;
438+
/**
439+
* Create a new document sequence for the provided field.
440+
* @param field - The field it will replace.
441+
*/
442+
constructor(field: string, documents?: Document[]) {
443+
this.field = field;
444+
this.documents = [];
445+
this.chunks = [];
446+
this.serializedDocumentsLength = 0;
447+
// Document sequences starts with type 1 at the first byte.
448+
// Field strings must always be UTF-8.
449+
const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
450+
buffer[0] = 1;
451+
// Third part is the field name at offset 5 with trailing null byte.
452+
encodeUTF8Into(buffer, `${this.field}\0`, 5);
453+
this.chunks.push(buffer);
454+
this.header = buffer;
455+
if (documents) {
456+
for (const doc of documents) {
457+
this.push(doc, BSON.serialize(doc));
458+
}
459+
}
460+
}
461+
462+
/**
463+
* Push a document to the document sequence. Will serialize the document
464+
* as well and return the current serialized length of all documents.
465+
* @param document - The document to add.
466+
* @param buffer - The serialized document in raw BSON.
467+
* @returns The new total document sequence length.
468+
*/
469+
push(document: Document, buffer: Uint8Array): number {
470+
this.serializedDocumentsLength += buffer.length;
471+
// Push the document.
472+
this.documents.push(document);
473+
// Push the document raw bson.
474+
this.chunks.push(buffer);
475+
// Write the new length.
476+
this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1);
477+
return this.serializedDocumentsLength + this.header.length;
478+
}
479+
480+
/**
481+
* Get the fully serialized bytes for the document sequence section.
482+
* @returns The section bytes.
483+
*/
484+
toBin(): Uint8Array {
485+
return Buffer.concat(this.chunks);
436486
}
437487
}
438488

@@ -543,21 +593,7 @@ export class OpMsgRequest {
543593
const chunks = [];
544594
for (const [key, value] of Object.entries(document)) {
545595
if (value instanceof DocumentSequence) {
546-
// Document sequences starts with type 1 at the first byte.
547-
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
548-
buffer[0] = 1;
549-
// Third part is the field name at offset 5 with trailing null byte.
550-
encodeUTF8Into(buffer, `${key}\0`, 5);
551-
chunks.push(buffer);
552-
// Fourth part are the documents' bytes.
553-
let docsLength = 0;
554-
for (const doc of value.documents) {
555-
const docBson = this.serializeBson(doc);
556-
docsLength += docBson.length;
557-
chunks.push(docBson);
558-
}
559-
// Second part of the sequence is the length at offset 1;
560-
buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
596+
chunks.push(value.toBin());
561597
// Why are we removing the field from the command? This is because it needs to be
562598
// removed in the OP_MSG request first section, and DocumentSequence is not a
563599
// BSON type and is specific to the MongoDB wire protocol so there's nothing

src/cursor/client_bulk_write_cursor.ts

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
import type { Document } from '../bson';
1+
import { type Document } from 'bson';
2+
23
import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
3-
import { MongoBulkWriteCursorError } from '../error';
4+
import { MongoClientBulkWriteCursorError } from '../error';
45
import type { MongoClient } from '../mongo_client';
56
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
7+
import { type ClientBulkWriteCommandBuilder } from '../operations/client_bulk_write/command_builder';
68
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
79
import { executeOperation } from '../operations/execute_operation';
810
import type { ClientSession } from '../sessions';
@@ -24,17 +26,21 @@ export interface ClientBulkWriteCursorOptions
2426
* @internal
2527
*/
2628
export class ClientBulkWriteCursor extends AbstractCursor {
27-
public readonly command: Document;
29+
commandBuilder: ClientBulkWriteCommandBuilder;
2830
/** @internal */
2931
private cursorResponse?: ClientBulkWriteCursorResponse;
3032
/** @internal */
3133
private clientBulkWriteOptions: ClientBulkWriteOptions;
3234

3335
/** @internal */
34-
constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) {
36+
constructor(
37+
client: MongoClient,
38+
commandBuilder: ClientBulkWriteCommandBuilder,
39+
options: ClientBulkWriteOptions = {}
40+
) {
3541
super(client, new MongoDBNamespace('admin', '$cmd'), options);
3642

37-
this.command = command;
43+
this.commandBuilder = commandBuilder;
3844
this.clientBulkWriteOptions = options;
3945
}
4046

@@ -44,22 +50,29 @@ export class ClientBulkWriteCursor extends AbstractCursor {
4450
*/
4551
get response(): ClientBulkWriteCursorResponse {
4652
if (this.cursorResponse) return this.cursorResponse;
47-
throw new MongoBulkWriteCursorError(
53+
throw new MongoClientBulkWriteCursorError(
4854
'No client bulk write cursor response returned from the server.'
4955
);
5056
}
5157

58+
/**
59+
* Get the last set of operations the cursor executed.
60+
*/
61+
get operations(): Document[] {
62+
return this.commandBuilder.lastOperations;
63+
}
64+
5265
clone(): ClientBulkWriteCursor {
5366
const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions);
5467
delete clonedOptions.session;
55-
return new ClientBulkWriteCursor(this.client, this.command, {
68+
return new ClientBulkWriteCursor(this.client, this.commandBuilder, {
5669
...clonedOptions
5770
});
5871
}
5972

6073
/** @internal */
6174
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
62-
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, {
75+
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.commandBuilder, {
6376
...this.clientBulkWriteOptions,
6477
...this.cursorOptions,
6578
session

src/error.ts

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -625,7 +625,7 @@ export class MongoGCPError extends MongoOIDCError {
625625
* @public
626626
* @category Error
627627
*/
628-
export class MongoBulkWriteCursorError extends MongoRuntimeError {
628+
export class MongoClientBulkWriteCursorError extends MongoRuntimeError {
629629
/**
630630
* **Do not use this constructor!**
631631
*
@@ -642,7 +642,34 @@ export class MongoBulkWriteCursorError extends MongoRuntimeError {
642642
}
643643

644644
override get name(): string {
645-
return 'MongoBulkWriteCursorError';
645+
return 'MongoClientBulkWriteCursorError';
646+
}
647+
}
648+
649+
/**
650+
* An error indicating that an error occurred on the client when executing a client bulk write.
651+
*
652+
* @public
653+
* @category Error
654+
*/
655+
export class MongoClientBulkWriteExecutionError extends MongoRuntimeError {
656+
/**
657+
* **Do not use this constructor!**
658+
*
659+
* Meant for internal use only.
660+
*
661+
* @remarks
662+
* This class is only meant to be constructed within the driver. This constructor is
663+
* not subject to semantic versioning compatibility guarantees and may change at any time.
664+
*
665+
* @public
666+
**/
667+
constructor(message: string) {
668+
super(message);
669+
}
670+
671+
override get name(): string {
672+
return 'MongoClientBulkWriteExecutionError';
646673
}
647674
}
648675

src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ export {
4444
MongoAWSError,
4545
MongoAzureError,
4646
MongoBatchReExecutionError,
47-
MongoBulkWriteCursorError,
4847
MongoChangeStreamError,
48+
MongoClientBulkWriteCursorError,
49+
MongoClientBulkWriteExecutionError,
4950
MongoCompatibilityError,
5051
MongoCursorExhaustedError,
5152
MongoCursorInUseError,

src/operations/client_bulk_write/client_bulk_write.ts

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,29 @@
1-
import { type Document } from 'bson';
2-
1+
import { MongoClientBulkWriteExecutionError, ServerType } from '../../beta';
32
import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
43
import type { Server } from '../../sdam/server';
54
import type { ClientSession } from '../../sessions';
65
import { type TimeoutContext } from '../../timeout';
76
import { MongoDBNamespace } from '../../utils';
87
import { CommandOperation } from '../command';
98
import { Aspect, defineAspects } from '../operation';
9+
import { type ClientBulkWriteCommandBuilder } from './command_builder';
1010
import { type ClientBulkWriteOptions } from './common';
1111

1212
/**
1313
* Executes a single client bulk write operation within a potential batch.
1414
* @internal
1515
*/
1616
export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCursorResponse> {
17-
command: Document;
17+
commandBuilder: ClientBulkWriteCommandBuilder;
1818
override options: ClientBulkWriteOptions;
1919

2020
override get commandName() {
2121
return 'bulkWrite' as const;
2222
}
2323

24-
constructor(command: Document, options: ClientBulkWriteOptions) {
24+
constructor(commandBuilder: ClientBulkWriteCommandBuilder, options: ClientBulkWriteOptions) {
2525
super(undefined, options);
26-
this.command = command;
26+
this.commandBuilder = commandBuilder;
2727
this.options = options;
2828
this.ns = new MongoDBNamespace('admin', '$cmd');
2929
}
@@ -39,15 +39,46 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
3939
session: ClientSession | undefined,
4040
timeoutContext: TimeoutContext
4141
): Promise<ClientBulkWriteCursorResponse> {
42+
let command;
43+
44+
if (server.description.type === ServerType.LoadBalancer) {
45+
if (session) {
46+
// Checkout a connection to build the command.
47+
const connection = await server.pool.checkOut({ timeoutContext });
48+
// Pin the connection to the session so it get used to execute the command and we do not
49+
// perform a double check-in/check-out.
50+
session.pin(connection);
51+
command = this.commandBuilder.buildBatch(
52+
connection.hello?.maxMessageSizeBytes,
53+
connection.hello?.maxWriteBatchSize
54+
);
55+
} else {
56+
throw new MongoClientBulkWriteExecutionError(
57+
'Session provided to the client bulk write operation must be present.'
58+
);
59+
}
60+
} else {
61+
// At this point we have a server and the auto connect code has already
62+
// run in executeOperation, so the server description will be populated.
63+
// We can use that to build the command.
64+
command = this.commandBuilder.buildBatch(
65+
server.description.maxMessageSizeBytes,
66+
server.description.maxWriteBatchSize
67+
);
68+
}
4269
return await super.executeCommand(
4370
server,
4471
session,
45-
this.command,
72+
command,
4673
timeoutContext,
4774
ClientBulkWriteCursorResponse
4875
);
4976
}
5077
}
5178

5279
// Skipping the collation as it goes on the individual ops.
53-
defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION, Aspect.SKIP_COLLATION]);
80+
defineAspects(ClientBulkWriteOperation, [
81+
Aspect.WRITE_OPERATION,
82+
Aspect.SKIP_COLLATION,
83+
Aspect.CURSOR_CREATING
84+
]);

0 commit comments

Comments
 (0)