Skip to content

Commit 785ff2e

Browse files
refactor cursor internals to async-await
1 parent 177a4fc commit 785ff2e

File tree

10 files changed

+175
-230
lines changed

10 files changed

+175
-230
lines changed

src/cursor/abstract_cursor.ts

Lines changed: 64 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { Readable, Transform } from 'stream';
2-
import { promisify } from 'util';
32

43
import { type BSONSerializeOptions, type Document, Long, pluckBSONSerializeOptions } from '../bson';
54
import {
@@ -21,7 +20,7 @@ import { ReadConcern, type ReadConcernLike } from '../read_concern';
2120
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
2221
import type { Server } from '../sdam/server';
2322
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
24-
import { type Callback, List, type MongoDBNamespace, ns } from '../utils';
23+
import { List, type MongoDBNamespace, ns } from '../utils';
2524

2625
/** @internal */
2726
const kId = Symbol('id');
@@ -310,7 +309,7 @@ export abstract class AbstractCursor<
310309
const message =
311310
'Cursor returned a `null` document, but the cursor is not exhausted. Mapping documents to `null` is not supported in the cursor transform.';
312311

313-
await cleanupCursorAsync(this, { needsToEmitClosed: true }).catch(() => null);
312+
await cleanupCursor(this, { needsToEmitClosed: true }).catch(() => null);
314313

315314
throw new MongoAPIError(message);
316315
}
@@ -419,7 +418,7 @@ export abstract class AbstractCursor<
419418
async close(): Promise<void> {
420419
const needsToEmitClosed = !this[kClosed];
421420
this[kClosed] = true;
422-
await cleanupCursorAsync(this, { needsToEmitClosed });
421+
await cleanupCursor(this, { needsToEmitClosed });
423422
}
424423

425424
/**
@@ -613,21 +612,18 @@ export abstract class AbstractCursor<
613612
abstract clone(): AbstractCursor<TSchema>;
614613

615614
/** @internal */
616-
protected abstract _initialize(
617-
session: ClientSession | undefined,
618-
callback: Callback<ExecutionResult>
619-
): void;
615+
protected abstract _initialize(session: ClientSession | undefined): Promise<ExecutionResult>;
620616

621617
/** @internal */
622-
_getMore(batchSize: number, callback: Callback<Document>): void {
618+
async getMore(batchSize: number): Promise<Document | null> {
623619
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
624620
const getMoreOperation = new GetMoreOperation(this[kNamespace], this[kId]!, this[kServer]!, {
625621
...this[kOptions],
626622
session: this[kSession],
627623
batchSize
628624
});
629625

630-
executeOperation(this[kClient], getMoreOperation, callback);
626+
return executeOperation(this[kClient], getMoreOperation);
631627
}
632628

633629
/**
@@ -637,51 +633,48 @@ export abstract class AbstractCursor<
637633
* operation. We cannot refactor to use the abstract _initialize method without
638634
* a significant refactor.
639635
*/
640-
[kInit](callback: Callback<TSchema | null>): void {
641-
this._initialize(this[kSession], (error, state) => {
642-
if (state) {
643-
const response = state.response;
644-
this[kServer] = state.server;
645-
646-
if (response.cursor) {
647-
// TODO(NODE-2674): Preserve int64 sent from MongoDB
648-
this[kId] =
649-
typeof response.cursor.id === 'number'
650-
? Long.fromNumber(response.cursor.id)
651-
: typeof response.cursor.id === 'bigint'
652-
? Long.fromBigInt(response.cursor.id)
653-
: response.cursor.id;
654-
655-
if (response.cursor.ns) {
656-
this[kNamespace] = ns(response.cursor.ns);
657-
}
636+
async [kInit](): Promise<void> {
637+
try {
638+
const state = await this._initialize(this[kSession]);
639+
const response = state.response;
640+
this[kServer] = state.server;
641+
if (response.cursor) {
642+
// TODO(NODE-2674): Preserve int64 sent from MongoDB
643+
this[kId] =
644+
typeof response.cursor.id === 'number'
645+
? Long.fromNumber(response.cursor.id)
646+
: typeof response.cursor.id === 'bigint'
647+
? Long.fromBigInt(response.cursor.id)
648+
: response.cursor.id;
658649

659-
this[kDocuments].pushMany(response.cursor.firstBatch);
650+
if (response.cursor.ns) {
651+
this[kNamespace] = ns(response.cursor.ns);
660652
}
661653

662-
// When server responses return without a cursor document, we close this cursor
663-
// and return the raw server response. This is often the case for explain commands
664-
// for example
665-
if (this[kId] == null) {
666-
this[kId] = Long.ZERO;
667-
// TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
668-
this[kDocuments].push(state.response as TODO_NODE_3286);
669-
}
654+
this[kDocuments].pushMany(response.cursor.firstBatch);
655+
}
656+
657+
// When server responses return without a cursor document, we close this cursor
658+
// and return the raw server response. This is often the case for explain commands
659+
// for example
660+
if (this[kId] == null) {
661+
this[kId] = Long.ZERO;
662+
// TODO(NODE-3286): ExecutionResult needs to accept a generic parameter
663+
this[kDocuments].push(state.response as TODO_NODE_3286);
670664
}
671665

672666
// the cursor is now initialized, even if an error occurred or it is dead
673667
this[kInitialized] = true;
668+
} catch (error) {
669+
await cleanupCursor(this, { error });
670+
throw error;
671+
}
674672

675-
if (error) {
676-
return cleanupCursor(this, { error }, () => callback(error, undefined));
677-
}
678-
679-
if (this.isDead) {
680-
return cleanupCursor(this, undefined, () => callback());
681-
}
673+
if (this.isDead) {
674+
await cleanupCursor(this, undefined);
675+
}
682676

683-
callback();
684-
});
677+
return;
685678
}
686679
}
687680

@@ -713,7 +706,7 @@ async function next<T>(
713706
do {
714707
if (cursor[kId] == null) {
715708
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
716-
await promisify(cursor[kInit].bind(cursor))();
709+
await cursor[kInit]();
717710
}
718711

719712
if (cursor[kDocuments].length !== 0) {
@@ -725,7 +718,7 @@ async function next<T>(
725718
} catch (error) {
726719
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
727720
// error instead.
728-
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => null);
721+
await cleanupCursor(cursor, { error, needsToEmitClosed: true }).catch(() => null);
729722
throw error;
730723
}
731724
}
@@ -737,15 +730,15 @@ async function next<T>(
737730
// if the cursor is dead, we clean it up
738731
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
739732
// and we should surface the error
740-
await cleanupCursorAsync(cursor, {});
733+
await cleanupCursor(cursor, {});
741734
return null;
742735
}
743736

744737
// otherwise need to call getMore
745738
const batchSize = cursor[kOptions].batchSize || 1000;
746739

747740
try {
748-
const response = await promisify(cursor._getMore.bind(cursor))(batchSize);
741+
const response = await cursor.getMore(batchSize);
749742

750743
if (response) {
751744
const cursorId =
@@ -761,7 +754,7 @@ async function next<T>(
761754
} catch (error) {
762755
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
763756
// error instead.
764-
await cleanupCursorAsync(cursor, { error }).catch(() => null);
757+
await cleanupCursor(cursor, { error }).catch(() => null);
765758
throw error;
766759
}
767760

@@ -773,7 +766,7 @@ async function next<T>(
773766
//
774767
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
775768
// and we should surface the error
776-
await cleanupCursorAsync(cursor, {});
769+
await cleanupCursor(cursor, {});
777770
}
778771

779772
if (cursor[kDocuments].length === 0 && blocking === false) {
@@ -784,13 +777,10 @@ async function next<T>(
784777
return null;
785778
}
786779

787-
const cleanupCursorAsync = promisify(cleanupCursor);
788-
789-
function cleanupCursor(
780+
async function cleanupCursor(
790781
cursor: AbstractCursor,
791-
options: { error?: AnyError | undefined; needsToEmitClosed?: boolean } | undefined,
792-
callback: Callback
793-
): void {
782+
options: { error?: AnyError | undefined; needsToEmitClosed?: boolean } | undefined
783+
): Promise<void> {
794784
const cursorId = cursor[kId];
795785
const cursorNs = cursor[kNamespace];
796786
const server = cursor[kServer];
@@ -817,9 +807,7 @@ function cleanupCursor(
817807

818808
if (session) {
819809
if (session.owner === cursor) {
820-
session.endSession({ error }).finally(() => {
821-
callback();
822-
});
810+
await session.endSession({ error });
823811
return;
824812
}
825813

@@ -828,16 +816,17 @@ function cleanupCursor(
828816
}
829817
}
830818

831-
return callback();
819+
return;
832820
}
833821

834-
function completeCleanup() {
822+
async function completeCleanup() {
835823
if (session) {
836824
if (session.owner === cursor) {
837-
session.endSession({ error }).finally(() => {
825+
try {
826+
await session.endSession({ error });
827+
} finally {
838828
cursor.emit(AbstractCursor.CLOSE);
839-
callback();
840-
});
829+
}
841830
return;
842831
}
843832

@@ -847,7 +836,7 @@ function cleanupCursor(
847836
}
848837

849838
cursor.emit(AbstractCursor.CLOSE);
850-
return callback();
839+
return;
851840
}
852841

853842
cursor[kKilled] = true;
@@ -856,12 +845,14 @@ function cleanupCursor(
856845
return completeCleanup();
857846
}
858847

859-
executeOperation(
860-
cursor[kClient],
861-
new KillCursorsOperation(cursorId, cursorNs, server, { session })
862-
)
863-
.catch(() => null)
864-
.finally(completeCleanup);
848+
try {
849+
await executeOperation(
850+
cursor[kClient],
851+
new KillCursorsOperation(cursorId, cursorNs, server, { session })
852+
).catch(() => null);
853+
} finally {
854+
await completeCleanup();
855+
}
865856
}
866857

867858
/** @internal */

src/cursor/aggregation_cursor.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { AggregateOperation, type AggregateOptions } from '../operations/aggrega
55
import { executeOperation, type ExecutionResult } from '../operations/execute_operation';
66
import type { ClientSession } from '../sessions';
77
import type { Sort } from '../sort';
8-
import type { Callback, MongoDBNamespace } from '../utils';
8+
import type { MongoDBNamespace } from '../utils';
99
import { mergeOptions } from '../utils';
1010
import type { AbstractCursorOptions } from './abstract_cursor';
1111
import { AbstractCursor, assertUninitialized } from './abstract_cursor';
@@ -61,19 +61,17 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
6161
}
6262

6363
/** @internal */
64-
_initialize(session: ClientSession, callback: Callback<ExecutionResult>): void {
64+
async _initialize(session: ClientSession): Promise<ExecutionResult> {
6565
const aggregateOperation = new AggregateOperation(this.namespace, this[kPipeline], {
6666
...this[kOptions],
6767
...this.cursorOptions,
6868
session
6969
});
7070

71-
executeOperation(this.client, aggregateOperation, (err, response) => {
72-
if (err || response == null) return callback(err);
71+
const response = await executeOperation(this.client, aggregateOperation);
7372

74-
// TODO: NODE-2882
75-
callback(undefined, { server: aggregateOperation.server, session, response });
76-
});
73+
// TODO: NODE-2882
74+
return { server: aggregateOperation.server, session, response };
7775
}
7876

7977
/** Execute the explain for the cursor */

0 commit comments

Comments
 (0)