Skip to content

Commit a2eccf7

Browse files
committed
fix(NODE-5502): recursive calls to next cause memory leak
1 parent 2323ca8 commit a2eccf7

File tree

1 file changed

+80
-89
lines changed

1 file changed

+80
-89
lines changed

src/cursor/abstract_cursor.ts

Lines changed: 80 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,13 @@ export type AbstractCursorEvents = {
129129
[AbstractCursor.CLOSE](): void;
130130
};
131131

132+
export interface AbstractCursor {
133+
/** @internal */
134+
_getMoreAsync: (batchSize: number) => Promise<Document>;
135+
/** @internal */
136+
kInit: () => Promise<void>;
137+
}
138+
132139
/** @public */
133140
export abstract class AbstractCursor<
134141
TSchema = any,
@@ -220,6 +227,11 @@ export abstract class AbstractCursor<
220227
return this[kId] ?? undefined;
221228
}
222229

230+
/** @internal */
231+
get isDead() {
232+
return this[kId]?.isZero() ?? true;
233+
}
234+
223235
/** @internal */
224236
get client(): MongoClient {
225237
return this[kClient];
@@ -632,25 +644,17 @@ export abstract class AbstractCursor<
632644
* operation. We cannot refactor to use the abstract _initialize method without
633645
* a significant refactor.
634646
*/
635-
[kInit](callback: Callback<TSchema | null>): void {
647+
[kInit](callback: Callback<void>): void {
636648
this._initialize(this[kSession], (error, state) => {
637649
if (state) {
638650
const response = state.response;
639651
this[kServer] = state.server;
640652

641653
if (response.cursor) {
642-
// TODO(NODE-2674): Preserve int64 sent from MongoDB
643-
this[kId] =
644-
typeof response.cursor.id === 'number'
645-
? Long.fromNumber(response.cursor.id)
646-
: typeof response.cursor.id === 'bigint'
647-
? Long.fromBigInt(response.cursor.id)
648-
: response.cursor.id;
649-
654+
this[kId] = getCursorId(response);
650655
if (response.cursor.ns) {
651656
this[kNamespace] = ns(response.cursor.ns);
652657
}
653-
654658
this[kDocuments].pushMany(response.cursor.firstBatch);
655659
}
656660

@@ -671,7 +675,7 @@ export abstract class AbstractCursor<
671675
return cleanupCursor(this, { error }, () => callback(error, undefined));
672676
}
673677

674-
if (cursorIsDead(this)) {
678+
if (this.isDead) {
675679
return cleanupCursor(this, undefined, () => callback());
676680
}
677681

@@ -680,6 +684,22 @@ export abstract class AbstractCursor<
680684
}
681685
}
682686

687+
// @ts-expect-error: Callback mutual exclusion type issue
688+
AbstractCursor.prototype._getMoreAsync = promisify(function (
689+
this: AbstractCursor,
690+
batchSize: number,
691+
callback: Callback<Document>
692+
) {
693+
return this._getMore(batchSize, callback);
694+
});
695+
696+
AbstractCursor.prototype.kInit = promisify(function (
697+
this: AbstractCursor,
698+
callback: Callback<void>
699+
) {
700+
return this[kInit](callback);
701+
});
702+
683703
/**
684704
* @param cursor - the cursor on which to call `next`
685705
* @param blocking - a boolean indicating whether or not the cursor should `block` until data
@@ -701,96 +721,58 @@ async function next<T>(
701721
transform: boolean;
702722
}
703723
): Promise<T | null> {
704-
const cursorId = cursor[kId];
705724
if (cursor.closed) {
706725
return null;
707726
}
708727

709-
if (cursor[kDocuments].length !== 0) {
710-
const doc = cursor[kDocuments].shift();
711-
712-
if (doc != null && transform && cursor[kTransform]) {
713-
try {
714-
return cursor[kTransform](doc);
715-
} catch (error) {
716-
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => {
717-
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
718-
// error instead.
719-
});
720-
throw error;
721-
}
722-
}
723-
724-
return doc;
728+
if (cursor[kId] == null) {
729+
await cursor.kInit();
725730
}
726731

727-
if (cursorId == null) {
728-
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
729-
const init = promisify(cb => cursor[kInit](cb));
730-
await init();
731-
return next(cursor, { blocking, transform });
732-
}
732+
do {
733+
if (cursor[kDocuments].length !== 0) {
734+
const doc = cursor[kDocuments].shift();
733735

734-
if (cursorIsDead(cursor)) {
735-
// if the cursor is dead, we clean it up
736-
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
737-
// and we should surface the error
738-
await cleanupCursorAsync(cursor, {});
739-
return null;
740-
}
736+
if (doc != null && transform && cursor[kTransform]) {
737+
try {
738+
return cursor[kTransform](doc);
739+
} catch (error) {
740+
// `cleanupCursorAsync` should never throw
741+
// but if it does we want to throw the original error instead.
742+
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => null);
743+
throw error;
744+
}
745+
}
741746

742-
// otherwise need to call getMore
743-
const batchSize = cursor[kOptions].batchSize || 1000;
744-
const getMore = promisify((batchSize: number, cb: Callback<Document | undefined>) =>
745-
cursor._getMore(batchSize, cb)
746-
);
747-
748-
let response: Document | undefined;
749-
try {
750-
response = await getMore(batchSize);
751-
} catch (error) {
752-
if (error) {
753-
await cleanupCursorAsync(cursor, { error }).catch(() => {
754-
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
755-
// error instead.
756-
});
757-
throw error;
747+
return doc;
758748
}
759-
}
760-
761-
if (response) {
762-
const cursorId =
763-
typeof response.cursor.id === 'number'
764-
? Long.fromNumber(response.cursor.id)
765-
: typeof response.cursor.id === 'bigint'
766-
? Long.fromBigInt(response.cursor.id)
767-
: response.cursor.id;
768-
769-
cursor[kDocuments].pushMany(response.cursor.nextBatch);
770-
cursor[kId] = cursorId;
771-
}
772-
773-
if (cursorIsDead(cursor)) {
774-
// If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
775-
// we intentionally clean up the cursor to release its session back into the pool before the cursor
776-
// is iterated. This prevents a cursor that is exhausted on the server from holding
777-
// onto a session indefinitely until the AbstractCursor is iterated.
778-
//
779-
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
780-
// and we should surface the error
781-
await cleanupCursorAsync(cursor, {});
782-
}
783749

784-
if (cursor[kDocuments].length === 0 && blocking === false) {
785-
return null;
786-
}
750+
if (cursor.isDead) {
751+
// if the cursor is dead, we clean it up
752+
await cleanupCursorAsync(cursor, {});
753+
return null;
754+
}
787755

788-
return next(cursor, { blocking, transform });
789-
}
756+
try {
757+
const { batchSize = 1000 } = cursor[kOptions];
758+
const response = await cursor._getMoreAsync(batchSize);
759+
cursor[kId] = getCursorId(response);
760+
cursor[kDocuments].pushMany(response.cursor.nextBatch);
761+
if (cursor[kDocuments].length === 0 && blocking === false) {
762+
return null;
763+
}
764+
if (cursor.isDead) {
765+
// Received more documents to return but the cursor is done server side.
766+
// So release the session as early as possible.
767+
await cleanupCursorAsync(cursor, {});
768+
}
769+
} catch (error) {
770+
await cleanupCursorAsync(cursor, { error }).catch(() => null);
771+
throw error;
772+
}
773+
} while (!cursor.isDead || cursor[kDocuments].length !== 0);
790774

791-
function cursorIsDead(cursor: AbstractCursor): boolean {
792-
const cursorId = cursor[kId];
793-
return !!cursorId && cursorId.isZero();
775+
return null;
794776
}
795777

796778
const cleanupCursorAsync = promisify(cleanupCursor);
@@ -955,3 +937,12 @@ class ReadableCursorStream extends Readable {
955937
);
956938
}
957939
}
940+
941+
function getCursorId(response: { cursor: { id: number | bigint | Long } } | Document) {
942+
// TODO(NODE-2674): Preserve int64 sent from MongoDB
943+
return typeof response.cursor.id === 'number'
944+
? Long.fromNumber(response.cursor.id)
945+
: typeof response.cursor.id === 'bigint'
946+
? Long.fromBigInt(response.cursor.id)
947+
: response.cursor.id;
948+
}

0 commit comments

Comments
 (0)