Skip to content

refactor(NODE-5379): cursor internals to use async-await #3804

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 67 additions & 74 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Readable, Transform } from 'stream';
import { promisify } from 'util';

import { type BSONSerializeOptions, type Document, Long, pluckBSONSerializeOptions } from '../bson';
import {
Expand All @@ -21,7 +20,7 @@ import { ReadConcern, type ReadConcernLike } from '../read_concern';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { Server } from '../sdam/server';
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
import { type Callback, List, type MongoDBNamespace, ns } from '../utils';
import { List, type MongoDBNamespace, ns } from '../utils';

/** @internal */
const kId = Symbol('id');
Expand Down Expand Up @@ -310,7 +309,7 @@ export abstract class AbstractCursor<
const message =
'Cursor returned a `null` document, but the cursor is not exhausted. Mapping documents to `null` is not supported in the cursor transform.';

await cleanupCursorAsync(this, { needsToEmitClosed: true }).catch(() => null);
await cleanupCursor(this, { needsToEmitClosed: true }).catch(() => null);

throw new MongoAPIError(message);
}
Expand Down Expand Up @@ -419,7 +418,7 @@ export abstract class AbstractCursor<
async close(): Promise<void> {
const needsToEmitClosed = !this[kClosed];
this[kClosed] = true;
await cleanupCursorAsync(this, { needsToEmitClosed });
await cleanupCursor(this, { needsToEmitClosed });
}

/**
Expand Down Expand Up @@ -613,21 +612,18 @@ export abstract class AbstractCursor<
abstract clone(): AbstractCursor<TSchema>;

/** @internal */
protected abstract _initialize(
session: ClientSession | undefined,
callback: Callback<ExecutionResult>
): void;
protected abstract _initialize(session: ClientSession | undefined): Promise<ExecutionResult>;

/** @internal */
_getMore(batchSize: number, callback: Callback<Document>): void {
async getMore(batchSize: number): Promise<Document | null> {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const getMoreOperation = new GetMoreOperation(this[kNamespace], this[kId]!, this[kServer]!, {
...this[kOptions],
session: this[kSession],
batchSize
});

executeOperation(this[kClient], getMoreOperation, callback);
return executeOperation(this[kClient], getMoreOperation);
}

/**
Expand All @@ -637,51 +633,50 @@ export abstract class AbstractCursor<
* operation. We cannot refactor to use the abstract _initialize method without
* a significant refactor.
*/
[kInit](callback: Callback<TSchema | null>): void {
this._initialize(this[kSession], (error, state) => {
if (state) {
const response = state.response;
this[kServer] = state.server;

if (response.cursor) {
// TODO(NODE-2674): Preserve int64 sent from MongoDB
this[kId] =
typeof response.cursor.id === 'number'
? Long.fromNumber(response.cursor.id)
: typeof response.cursor.id === 'bigint'
? Long.fromBigInt(response.cursor.id)
: response.cursor.id;

if (response.cursor.ns) {
this[kNamespace] = ns(response.cursor.ns);
}
async [kInit](): Promise<void> {
try {
const state = await this._initialize(this[kSession]);
const response = state.response;
this[kServer] = state.server;
if (response.cursor) {
// TODO(NODE-2674): Preserve int64 sent from MongoDB
this[kId] =
typeof response.cursor.id === 'number'
? Long.fromNumber(response.cursor.id)
: typeof response.cursor.id === 'bigint'
? Long.fromBigInt(response.cursor.id)
: response.cursor.id;

this[kDocuments].pushMany(response.cursor.firstBatch);
if (response.cursor.ns) {
this[kNamespace] = ns(response.cursor.ns);
}

// When server responses return without a cursor document, we close this cursor
// and return the raw server response. This is often the case for explain commands
// for example
if (this[kId] == null) {
this[kId] = Long.ZERO;
// TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
this[kDocuments].push(state.response as TODO_NODE_3286);
}
this[kDocuments].pushMany(response.cursor.firstBatch);
}

// the cursor is now initialized, even if an error occurred or it is dead
this[kInitialized] = true;

if (error) {
return cleanupCursor(this, { error }, () => callback(error, undefined));
// When server responses return without a cursor document, we close this cursor
// and return the raw server response. This is often the case for explain commands
// for example
if (this[kId] == null) {
this[kId] = Long.ZERO;
// TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
this[kDocuments].push(state.response as TODO_NODE_3286);
}

if (this.isDead) {
return cleanupCursor(this, undefined, () => callback());
}
// the cursor is now initialized, even if it is dead
this[kInitialized] = true;
} catch (error) {
// the cursor is now initialized, even if an error occurred
this[kInitialized] = true;
await cleanupCursor(this, { error });
throw error;
}

callback();
});
if (this.isDead) {
await cleanupCursor(this, undefined);
}

return;
}
}

Expand Down Expand Up @@ -713,7 +708,7 @@ async function next<T>(
do {
if (cursor[kId] == null) {
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
await promisify(cursor[kInit].bind(cursor))();
await cursor[kInit]();
}

if (cursor[kDocuments].length !== 0) {
Expand All @@ -725,7 +720,7 @@ async function next<T>(
} catch (error) {
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
// error instead.
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => null);
await cleanupCursor(cursor, { error, needsToEmitClosed: true }).catch(() => null);
throw error;
}
}
Expand All @@ -737,15 +732,15 @@ async function next<T>(
// if the cursor is dead, we clean it up
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
await cleanupCursor(cursor, {});
return null;
}

// otherwise need to call getMore
const batchSize = cursor[kOptions].batchSize || 1000;

try {
const response = await promisify(cursor._getMore.bind(cursor))(batchSize);
const response = await cursor.getMore(batchSize);

if (response) {
const cursorId =
Expand All @@ -761,7 +756,7 @@ async function next<T>(
} catch (error) {
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
// error instead.
await cleanupCursorAsync(cursor, { error }).catch(() => null);
await cleanupCursor(cursor, { error }).catch(() => null);
throw error;
}

Expand All @@ -773,7 +768,7 @@ async function next<T>(
//
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
await cleanupCursor(cursor, {});
}

if (cursor[kDocuments].length === 0 && blocking === false) {
Expand All @@ -784,13 +779,10 @@ async function next<T>(
return null;
}

const cleanupCursorAsync = promisify(cleanupCursor);

function cleanupCursor(
async function cleanupCursor(
cursor: AbstractCursor,
options: { error?: AnyError | undefined; needsToEmitClosed?: boolean } | undefined,
callback: Callback
): void {
options: { error?: AnyError | undefined; needsToEmitClosed?: boolean } | undefined
): Promise<void> {
const cursorId = cursor[kId];
const cursorNs = cursor[kNamespace];
const server = cursor[kServer];
Expand All @@ -817,9 +809,7 @@ function cleanupCursor(

if (session) {
if (session.owner === cursor) {
session.endSession({ error }).finally(() => {
callback();
});
await session.endSession({ error });
return;
}

Expand All @@ -828,16 +818,17 @@ function cleanupCursor(
}
}

return callback();
return;
}

function completeCleanup() {
async function completeCleanup() {
if (session) {
if (session.owner === cursor) {
session.endSession({ error }).finally(() => {
try {
await session.endSession({ error });
} finally {
cursor.emit(AbstractCursor.CLOSE);
callback();
});
}
return;
}

Expand All @@ -847,7 +838,7 @@ function cleanupCursor(
}

cursor.emit(AbstractCursor.CLOSE);
return callback();
return;
}

cursor[kKilled] = true;
Expand All @@ -856,12 +847,14 @@ function cleanupCursor(
return completeCleanup();
}

executeOperation(
cursor[kClient],
new KillCursorsOperation(cursorId, cursorNs, server, { session })
)
.catch(() => null)
.finally(completeCleanup);
try {
await executeOperation(
cursor[kClient],
new KillCursorsOperation(cursorId, cursorNs, server, { session })
).catch(() => null);
} finally {
await completeCleanup();
}
}

/** @internal */
Expand Down
12 changes: 5 additions & 7 deletions src/cursor/aggregation_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { AggregateOperation, type AggregateOptions } from '../operations/aggrega
import { executeOperation, type ExecutionResult } from '../operations/execute_operation';
import type { ClientSession } from '../sessions';
import type { Sort } from '../sort';
import type { Callback, MongoDBNamespace } from '../utils';
import type { MongoDBNamespace } from '../utils';
import { mergeOptions } from '../utils';
import type { AbstractCursorOptions } from './abstract_cursor';
import { AbstractCursor, assertUninitialized } from './abstract_cursor';
Expand Down Expand Up @@ -61,19 +61,17 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
}

/** @internal */
_initialize(session: ClientSession, callback: Callback<ExecutionResult>): void {
async _initialize(session: ClientSession): Promise<ExecutionResult> {
const aggregateOperation = new AggregateOperation(this.namespace, this[kPipeline], {
...this[kOptions],
...this.cursorOptions,
session
});

executeOperation(this.client, aggregateOperation, (err, response) => {
if (err || response == null) return callback(err);
const response = await executeOperation(this.client, aggregateOperation);

// TODO: NODE-2882
callback(undefined, { server: aggregateOperation.server, session, response });
});
// TODO: NODE-2882
return { server: aggregateOperation.server, session, response };
}

/** Execute the explain for the cursor */
Expand Down
Loading