Skip to content

Commit b1c0eac

Browse files
refactor(NODE-5379): cursor internals to use async-await (#3804)
Co-authored-by: Bailey Pearson <[email protected]>
1 parent 2a3de19 commit b1c0eac

File tree

11 files changed

+186
-234
lines changed

11 files changed

+186
-234
lines changed

src/cursor/abstract_cursor.ts

Lines changed: 67 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { Readable, Transform } from 'stream';
2-
import { promisify } from 'util';
32

43
import { type BSONSerializeOptions, type Document, Long, pluckBSONSerializeOptions } from '../bson';
54
import {
@@ -21,7 +20,7 @@ import { ReadConcern, type ReadConcernLike } from '../read_concern';
2120
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
2221
import type { Server } from '../sdam/server';
2322
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
24-
import { type Callback, List, type MongoDBNamespace, ns } from '../utils';
23+
import { List, type MongoDBNamespace, ns } from '../utils';
2524

2625
/** @internal */
2726
const kId = Symbol('id');
@@ -310,7 +309,7 @@ export abstract class AbstractCursor<
310309
const message =
311310
'Cursor returned a `null` document, but the cursor is not exhausted. Mapping documents to `null` is not supported in the cursor transform.';
312311

313-
await cleanupCursorAsync(this, { needsToEmitClosed: true }).catch(() => null);
312+
await cleanupCursor(this, { needsToEmitClosed: true }).catch(() => null);
314313

315314
throw new MongoAPIError(message);
316315
}
@@ -419,7 +418,7 @@ export abstract class AbstractCursor<
419418
async close(): Promise<void> {
420419
const needsToEmitClosed = !this[kClosed];
421420
this[kClosed] = true;
422-
await cleanupCursorAsync(this, { needsToEmitClosed });
421+
await cleanupCursor(this, { needsToEmitClosed });
423422
}
424423

425424
/**
@@ -613,21 +612,18 @@ export abstract class AbstractCursor<
613612
abstract clone(): AbstractCursor<TSchema>;
614613

615614
/** @internal */
616-
protected abstract _initialize(
617-
session: ClientSession | undefined,
618-
callback: Callback<ExecutionResult>
619-
): void;
615+
protected abstract _initialize(session: ClientSession | undefined): Promise<ExecutionResult>;
620616

621617
/** @internal */
622-
_getMore(batchSize: number, callback: Callback<Document>): void {
618+
async getMore(batchSize: number): Promise<Document | null> {
623619
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
624620
const getMoreOperation = new GetMoreOperation(this[kNamespace], this[kId]!, this[kServer]!, {
625621
...this[kOptions],
626622
session: this[kSession],
627623
batchSize
628624
});
629625

630-
executeOperation(this[kClient], getMoreOperation, callback);
626+
return executeOperation(this[kClient], getMoreOperation);
631627
}
632628

633629
/**
@@ -637,51 +633,50 @@ export abstract class AbstractCursor<
637633
* operation. We cannot refactor to use the abstract _initialize method without
638634
* a significant refactor.
639635
*/
640-
[kInit](callback: Callback<TSchema | null>): void {
641-
this._initialize(this[kSession], (error, state) => {
642-
if (state) {
643-
const response = state.response;
644-
this[kServer] = state.server;
645-
646-
if (response.cursor) {
647-
// TODO(NODE-2674): Preserve int64 sent from MongoDB
648-
this[kId] =
649-
typeof response.cursor.id === 'number'
650-
? Long.fromNumber(response.cursor.id)
651-
: typeof response.cursor.id === 'bigint'
652-
? Long.fromBigInt(response.cursor.id)
653-
: response.cursor.id;
654-
655-
if (response.cursor.ns) {
656-
this[kNamespace] = ns(response.cursor.ns);
657-
}
636+
async [kInit](): Promise<void> {
637+
try {
638+
const state = await this._initialize(this[kSession]);
639+
const response = state.response;
640+
this[kServer] = state.server;
641+
if (response.cursor) {
642+
// TODO(NODE-2674): Preserve int64 sent from MongoDB
643+
this[kId] =
644+
typeof response.cursor.id === 'number'
645+
? Long.fromNumber(response.cursor.id)
646+
: typeof response.cursor.id === 'bigint'
647+
? Long.fromBigInt(response.cursor.id)
648+
: response.cursor.id;
658649

659-
this[kDocuments].pushMany(response.cursor.firstBatch);
650+
if (response.cursor.ns) {
651+
this[kNamespace] = ns(response.cursor.ns);
660652
}
661653

662-
// When server responses return without a cursor document, we close this cursor
663-
// and return the raw server response. This is often the case for explain commands
664-
// for example
665-
if (this[kId] == null) {
666-
this[kId] = Long.ZERO;
667-
// TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
668-
this[kDocuments].push(state.response as TODO_NODE_3286);
669-
}
654+
this[kDocuments].pushMany(response.cursor.firstBatch);
670655
}
671656

672-
// the cursor is now initialized, even if an error occurred or it is dead
673-
this[kInitialized] = true;
674-
675-
if (error) {
676-
return cleanupCursor(this, { error }, () => callback(error, undefined));
657+
// When server responses return without a cursor document, we close this cursor
658+
// and return the raw server response. This is often the case for explain commands
659+
// for example
660+
if (this[kId] == null) {
661+
this[kId] = Long.ZERO;
662+
// TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
663+
this[kDocuments].push(state.response as TODO_NODE_3286);
677664
}
678665

679-
if (this.isDead) {
680-
return cleanupCursor(this, undefined, () => callback());
681-
}
666+
// the cursor is now initialized, even if it is dead
667+
this[kInitialized] = true;
668+
} catch (error) {
669+
// the cursor is now initialized, even if an error occurred
670+
this[kInitialized] = true;
671+
await cleanupCursor(this, { error });
672+
throw error;
673+
}
682674

683-
callback();
684-
});
675+
if (this.isDead) {
676+
await cleanupCursor(this, undefined);
677+
}
678+
679+
return;
685680
}
686681
}
687682

@@ -713,7 +708,7 @@ async function next<T>(
713708
do {
714709
if (cursor[kId] == null) {
715710
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
716-
await promisify(cursor[kInit].bind(cursor))();
711+
await cursor[kInit]();
717712
}
718713

719714
if (cursor[kDocuments].length !== 0) {
@@ -725,7 +720,7 @@ async function next<T>(
725720
} catch (error) {
726721
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
727722
// error instead.
728-
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => null);
723+
await cleanupCursor(cursor, { error, needsToEmitClosed: true }).catch(() => null);
729724
throw error;
730725
}
731726
}
@@ -737,15 +732,15 @@ async function next<T>(
737732
// if the cursor is dead, we clean it up
738733
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
739734
// and we should surface the error
740-
await cleanupCursorAsync(cursor, {});
735+
await cleanupCursor(cursor, {});
741736
return null;
742737
}
743738

744739
// otherwise need to call getMore
745740
const batchSize = cursor[kOptions].batchSize || 1000;
746741

747742
try {
748-
const response = await promisify(cursor._getMore.bind(cursor))(batchSize);
743+
const response = await cursor.getMore(batchSize);
749744

750745
if (response) {
751746
const cursorId =
@@ -761,7 +756,7 @@ async function next<T>(
761756
} catch (error) {
762757
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
763758
// error instead.
764-
await cleanupCursorAsync(cursor, { error }).catch(() => null);
759+
await cleanupCursor(cursor, { error }).catch(() => null);
765760
throw error;
766761
}
767762

@@ -773,7 +768,7 @@ async function next<T>(
773768
//
774769
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
775770
// and we should surface the error
776-
await cleanupCursorAsync(cursor, {});
771+
await cleanupCursor(cursor, {});
777772
}
778773

779774
if (cursor[kDocuments].length === 0 && blocking === false) {
@@ -784,13 +779,10 @@ async function next<T>(
784779
return null;
785780
}
786781

787-
const cleanupCursorAsync = promisify(cleanupCursor);
788-
789-
function cleanupCursor(
782+
async function cleanupCursor(
790783
cursor: AbstractCursor,
791-
options: { error?: AnyError | undefined; needsToEmitClosed?: boolean } | undefined,
792-
callback: Callback
793-
): void {
784+
options: { error?: AnyError | undefined; needsToEmitClosed?: boolean } | undefined
785+
): Promise<void> {
794786
const cursorId = cursor[kId];
795787
const cursorNs = cursor[kNamespace];
796788
const server = cursor[kServer];
@@ -817,9 +809,7 @@ function cleanupCursor(
817809

818810
if (session) {
819811
if (session.owner === cursor) {
820-
session.endSession({ error }).finally(() => {
821-
callback();
822-
});
812+
await session.endSession({ error });
823813
return;
824814
}
825815

@@ -828,16 +818,17 @@ function cleanupCursor(
828818
}
829819
}
830820

831-
return callback();
821+
return;
832822
}
833823

834-
function completeCleanup() {
824+
async function completeCleanup() {
835825
if (session) {
836826
if (session.owner === cursor) {
837-
session.endSession({ error }).finally(() => {
827+
try {
828+
await session.endSession({ error });
829+
} finally {
838830
cursor.emit(AbstractCursor.CLOSE);
839-
callback();
840-
});
831+
}
841832
return;
842833
}
843834

@@ -847,7 +838,7 @@ function cleanupCursor(
847838
}
848839

849840
cursor.emit(AbstractCursor.CLOSE);
850-
return callback();
841+
return;
851842
}
852843

853844
cursor[kKilled] = true;
@@ -856,12 +847,14 @@ function cleanupCursor(
856847
return completeCleanup();
857848
}
858849

859-
executeOperation(
860-
cursor[kClient],
861-
new KillCursorsOperation(cursorId, cursorNs, server, { session })
862-
)
863-
.catch(() => null)
864-
.finally(completeCleanup);
850+
try {
851+
await executeOperation(
852+
cursor[kClient],
853+
new KillCursorsOperation(cursorId, cursorNs, server, { session })
854+
).catch(() => null);
855+
} finally {
856+
await completeCleanup();
857+
}
865858
}
866859

867860
/** @internal */

src/cursor/aggregation_cursor.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { AggregateOperation, type AggregateOptions } from '../operations/aggrega
55
import { executeOperation, type ExecutionResult } from '../operations/execute_operation';
66
import type { ClientSession } from '../sessions';
77
import type { Sort } from '../sort';
8-
import type { Callback, MongoDBNamespace } from '../utils';
8+
import type { MongoDBNamespace } from '../utils';
99
import { mergeOptions } from '../utils';
1010
import type { AbstractCursorOptions } from './abstract_cursor';
1111
import { AbstractCursor, assertUninitialized } from './abstract_cursor';
@@ -61,19 +61,17 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
6161
}
6262

6363
/** @internal */
64-
_initialize(session: ClientSession, callback: Callback<ExecutionResult>): void {
64+
async _initialize(session: ClientSession): Promise<ExecutionResult> {
6565
const aggregateOperation = new AggregateOperation(this.namespace, this[kPipeline], {
6666
...this[kOptions],
6767
...this.cursorOptions,
6868
session
6969
});
7070

71-
executeOperation(this.client, aggregateOperation, (err, response) => {
72-
if (err || response == null) return callback(err);
71+
const response = await executeOperation(this.client, aggregateOperation);
7372

74-
// TODO: NODE-2882
75-
callback(undefined, { server: aggregateOperation.server, session, response });
76-
});
73+
// TODO: NODE-2882
74+
return { server: aggregateOperation.server, session, response };
7775
}
7876

7977
/** Execute the explain for the cursor */

0 commit comments

Comments
 (0)