Skip to content

refactor(NODE-5471): refactor crud operations to use async/await syntax #3777

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { promisify } from 'util';

import { type BSONSerializeOptions, type Document, ObjectId, resolveBSONOptions } from '../bson';
import type { Collection } from '../collection';
import {
Expand All @@ -13,7 +15,7 @@ import type { CollationOptions, CommandOperationOptions } from '../operations/co
import { DeleteOperation, type DeleteStatement, makeDeleteStatement } from '../operations/delete';
import { executeOperation } from '../operations/execute_operation';
import { InsertOperation } from '../operations/insert';
import { AbstractCallbackOperation, type Hint } from '../operations/operation';
import { AbstractOperation, type Hint } from '../operations/operation';
import { makeUpdateStatement, UpdateOperation, type UpdateStatement } from '../operations/update';
import type { Server } from '../sdam/server';
import type { Topology } from '../sdam/topology';
Expand Down Expand Up @@ -828,33 +830,31 @@ export interface BulkWriteOptions extends CommandOperationOptions {
let?: Document;
}

const executeCommandsAsync = promisify(executeCommands);

/**
* TODO(NODE-4063)
* BulkWrites merge complexity is implemented in executeCommands
* This provides a vehicle to treat bulkOperations like any other operation (hence "shim")
* We would like this logic to simply live inside the BulkWriteOperation class
* @internal
*/
class BulkWriteShimOperation extends AbstractCallbackOperation {
class BulkWriteShimOperation extends AbstractOperation {
bulkOperation: BulkOperationBase;
constructor(bulkOperation: BulkOperationBase, options: BulkWriteOptions) {
super(options);
this.bulkOperation = bulkOperation;
}

executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<any>
): void {
execute(_server: Server, session: ClientSession | undefined): Promise<any> {
if (this.options.session == null) {
// An implicit session could have been created by 'executeOperation'
// So if we stick it on finalOptions here, each bulk operation
// will use this same session, it'll be passed in the same way
// an explicit session would be
this.options.session = session;
}
return executeCommands(this.bulkOperation, this.options, callback);
return executeCommandsAsync(this.bulkOperation, this.options);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/bulk/unordered.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { Collection } from '../collection';
import { MongoInvalidArgumentError } from '../error';
import type { DeleteStatement } from '../operations/delete';
import type { UpdateStatement } from '../operations/update';
import type { Callback } from '../utils';
import { type Callback } from '../utils';
import {
Batch,
BatchType,
Expand Down
25 changes: 13 additions & 12 deletions src/operations/aggregate.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import type { Document } from '../bson';
import { MongoInvalidArgumentError } from '../error';
import { type TODO_NODE_3286 } from '../mongo_types';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type Callback, maxWireVersion, type MongoDBNamespace } from '../utils';
import { WriteConcern } from '../write_concern';
import {
type CollationOptions,
CommandCallbackOperation,
type CommandOperationOptions
} from './command';
import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command';
import { Aspect, defineAspects, type Hint } from './operation';

/** @internal */
Expand Down Expand Up @@ -40,7 +37,7 @@ export interface AggregateOptions extends CommandOperationOptions {
}

/** @internal */
export class AggregateOperation<T = Document> extends CommandCallbackOperation<T> {
export class AggregateOperation<T = Document> extends CommandOperation<T> {
override options: AggregateOptions;
target: string | typeof DB_AGGREGATE_COLLECTION;
pipeline: Document[];
Expand Down Expand Up @@ -93,11 +90,7 @@ export class AggregateOperation<T = Document> extends CommandCallbackOperation<T
this.pipeline.push(stage);
}

override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<T>
): void {
override async execute(server: Server, session: ClientSession | undefined): Promise<T> {
const options: AggregateOptions = this.options;
const serverWireVersion = maxWireVersion(server);
const command: Document = { aggregate: this.target, pipeline: this.pipeline };
Expand Down Expand Up @@ -137,7 +130,15 @@ export class AggregateOperation<T = Document> extends CommandCallbackOperation<T
command.cursor.batchSize = options.batchSize;
}

super.executeCommandCallback(server, session, command, callback);
return super.executeCommand(server, session, command) as TODO_NODE_3286;
}

protected override executeCallback(
_server: Server,
_session: ClientSession | undefined,
_callback: Callback<T>
): void {
throw new Error('Method not implemented.');
}
}

Expand Down
26 changes: 9 additions & 17 deletions src/operations/bulk_write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import type {
import type { Collection } from '../collection';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import type { Callback } from '../utils';
import { AbstractCallbackOperation, Aspect, defineAspects } from './operation';
import { AbstractOperation, Aspect, defineAspects } from './operation';

/** @internal */
export class BulkWriteOperation extends AbstractCallbackOperation<BulkWriteResult> {
export class BulkWriteOperation extends AbstractOperation<BulkWriteResult> {
override options: BulkWriteOptions;
collection: Collection;
operations: AnyBulkWriteOperation[];
Expand All @@ -27,11 +26,10 @@ export class BulkWriteOperation extends AbstractCallbackOperation<BulkWriteResul
this.operations = operations;
}

override executeCallback(
override async execute(
server: Server,
session: ClientSession | undefined,
callback: Callback<BulkWriteResult>
): void {
session: ClientSession | undefined
): Promise<BulkWriteResult> {
const coll = this.collection;
const operations = this.operations;
const options = { ...this.options, ...this.bsonOptions, readPreference: this.readPreference };
Expand All @@ -43,19 +41,13 @@ export class BulkWriteOperation extends AbstractCallbackOperation<BulkWriteResul
: coll.initializeOrderedBulkOp(options);

// for each op go through and add to the bulk
try {
for (let i = 0; i < operations.length; i++) {
bulk.raw(operations[i]);
}
} catch (err) {
return callback(err);
for (let i = 0; i < operations.length; i++) {
bulk.raw(operations[i]);
}

// Execute the bulk
bulk.execute({ ...options, session }).then(
result => callback(undefined, result),
error => callback(error)
);
const result = await bulk.execute({ ...options, session });
return result;
}
}

Expand Down
23 changes: 13 additions & 10 deletions src/operations/count.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { Collection } from '../collection';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import type { Callback, MongoDBNamespace } from '../utils';
import { CommandCallbackOperation, type CommandOperationOptions } from './command';
import { CommandOperation, type CommandOperationOptions } from './command';
import { Aspect, defineAspects } from './operation';

/** @public */
Expand All @@ -19,7 +19,7 @@ export interface CountOptions extends CommandOperationOptions {
}

/** @internal */
export class CountOperation extends CommandCallbackOperation<number> {
export class CountOperation extends CommandOperation<number> {
override options: CountOptions;
collectionName?: string;
query: Document;
Expand All @@ -32,11 +32,7 @@ export class CountOperation extends CommandCallbackOperation<number> {
this.query = filter;
}

override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<number>
): void {
override async execute(server: Server, session: ClientSession | undefined): Promise<number> {
const options = this.options;
const cmd: Document = {
count: this.collectionName,
Expand All @@ -59,9 +55,16 @@ export class CountOperation extends CommandCallbackOperation<number> {
cmd.maxTimeMS = options.maxTimeMS;
}

super.executeCommandCallback(server, session, cmd, (err, result) => {
callback(err, result ? result.n : 0);
});
const result = await super.executeCommand(server, session, cmd);
return result ? result.n : 0;
}

protected override executeCallback(
_server: Server,
_session: ClientSession | undefined,
_callback: Callback<number>
): void {
throw new Error('Method not implemented.');
}
}

Expand Down
33 changes: 11 additions & 22 deletions src/operations/count_documents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import type { Document } from '../bson';
import type { Collection } from '../collection';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import type { Callback } from '../utils';
import { AggregateOperation, type AggregateOptions } from './aggregate';

/** @public */
Expand Down Expand Up @@ -32,26 +31,16 @@ export class CountDocumentsOperation extends AggregateOperation<number> {
super(collection.s.namespace, pipeline, options);
}

override executeCallback(
server: Server,
session: ClientSession | undefined,
callback: Callback<number>
): void {
super.executeCallback(server, session, (err, result) => {
if (err || !result) {
callback(err);
return;
}

// NOTE: We're avoiding creating a cursor here to reduce the callstack.
const response = result as unknown as Document;
if (response.cursor == null || response.cursor.firstBatch == null) {
callback(undefined, 0);
return;
}

const docs = response.cursor.firstBatch;
callback(undefined, docs.length ? docs[0].n : 0);
});
override async execute(server: Server, session: ClientSession | undefined): Promise<number> {
const result = await super.execute(server, session);

// NOTE: We're avoiding creating a cursor here to reduce the callstack.
const response = result as unknown as Document;
if (response.cursor == null || response.cursor.firstBatch == null) {
return 0;
}

const docs = response.cursor.firstBatch;
return docs.length ? docs[0].n : 0;
}
}
Loading