Skip to content

fix(NODE-5588): recursive calls to next cause memory leak #3841

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 65 additions & 74 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ export abstract class AbstractCursor<
return this[kId] ?? undefined;
}

/** @internal */
get isDead() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes much more sense to me to be a property of the cursor itself instead of passing a cursor to a function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we intentionally change the semantics of isDead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind, I see that now in the description. But I'm still unsure why the logic was changed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to exit the loop and enter clean-up logic at the correct times. The prior logic didn't account for a cursor already being closed, or killed, so the loop would attempt a getMore on a killed cursor id. This same "exit" condition applies to all the places the cursorIsDead helper was being used.

return (this[kId]?.isZero() ?? false) || this[kClosed] || this[kKilled];
}

/** @internal */
get client(): MongoClient {
return this[kClient];
Expand Down Expand Up @@ -671,7 +676,7 @@ export abstract class AbstractCursor<
return cleanupCursor(this, { error }, () => callback(error, undefined));
}

if (cursorIsDead(this)) {
if (this.isDead) {
return cleanupCursor(this, undefined, () => callback());
}

Expand Down Expand Up @@ -701,96 +706,82 @@ async function next<T>(
transform: boolean;
}
): Promise<T | null> {
const cursorId = cursor[kId];
if (cursor.closed) {
return null;
}

if (cursor[kDocuments].length !== 0) {
const doc = cursor[kDocuments].shift();
do {
if (cursor[kId] == null) {
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
await promisify(cursor[kInit].bind(cursor))();
}

if (cursor[kDocuments].length !== 0) {
const doc = cursor[kDocuments].shift();

if (doc != null && transform && cursor[kTransform]) {
try {
return cursor[kTransform](doc);
} catch (error) {
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => {
if (doc != null && transform && cursor[kTransform]) {
try {
return cursor[kTransform](doc);
} catch (error) {
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
// error instead.
});
throw error;
await cleanupCursorAsync(cursor, { error, needsToEmitClosed: true }).catch(() => null);
throw error;
}
}
}

return doc;
}
return doc;
}

if (cursorId == null) {
// All cursors must operate within a session, one must be made implicitly if not explicitly provided
const init = promisify(cb => cursor[kInit](cb));
await init();
return next(cursor, { blocking, transform });
}
if (cursor.isDead) {
// if the cursor is dead, we clean it up
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
return null;
}

if (cursorIsDead(cursor)) {
// if the cursor is dead, we clean it up
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
return null;
}
// otherwise need to call getMore
const batchSize = cursor[kOptions].batchSize || 1000;

// otherwise need to call getMore
const batchSize = cursor[kOptions].batchSize || 1000;
const getMore = promisify((batchSize: number, cb: Callback<Document | undefined>) =>
cursor._getMore(batchSize, cb)
);

let response: Document | undefined;
try {
response = await getMore(batchSize);
} catch (error) {
if (error) {
await cleanupCursorAsync(cursor, { error }).catch(() => {
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
// error instead.
});
try {
const response = await promisify(cursor._getMore.bind(cursor))(batchSize);

if (response) {
const cursorId =
typeof response.cursor.id === 'number'
? Long.fromNumber(response.cursor.id)
: typeof response.cursor.id === 'bigint'
? Long.fromBigInt(response.cursor.id)
: response.cursor.id;

cursor[kDocuments].pushMany(response.cursor.nextBatch);
cursor[kId] = cursorId;
}
} catch (error) {
// `cleanupCursorAsync` should never throw, but if it does we want to throw the original
// error instead.
await cleanupCursorAsync(cursor, { error }).catch(() => null);
throw error;
}
}

if (response) {
const cursorId =
typeof response.cursor.id === 'number'
? Long.fromNumber(response.cursor.id)
: typeof response.cursor.id === 'bigint'
? Long.fromBigInt(response.cursor.id)
: response.cursor.id;

cursor[kDocuments].pushMany(response.cursor.nextBatch);
cursor[kId] = cursorId;
}

if (cursorIsDead(cursor)) {
// If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
// we intentionally clean up the cursor to release its session back into the pool before the cursor
// is iterated. This prevents a cursor that is exhausted on the server from holding
// onto a session indefinitely until the AbstractCursor is iterated.
//
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
}

if (cursor[kDocuments].length === 0 && blocking === false) {
return null;
}
if (cursor.isDead) {
// If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
// we intentionally clean up the cursor to release its session back into the pool before the cursor
// is iterated. This prevents a cursor that is exhausted on the server from holding
// onto a session indefinitely until the AbstractCursor is iterated.
//
// cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
// and we should surface the error
await cleanupCursorAsync(cursor, {});
}

return next(cursor, { blocking, transform });
}
if (cursor[kDocuments].length === 0 && blocking === false) {
return null;
}
} while (!cursor.isDead || cursor[kDocuments].length !== 0);

function cursorIsDead(cursor: AbstractCursor): boolean {
const cursorId = cursor[kId];
return !!cursorId && cursorId.isZero();
return null;
}

const cleanupCursorAsync = promisify(cleanupCursor);
Expand Down
6 changes: 4 additions & 2 deletions src/cursor/find_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Document } from '../bson';
import { type Document, Long } from '../bson';
import { MongoInvalidArgumentError, MongoTailableCursorError } from '../error';
import type { ExplainVerbosityLike } from '../explain';
import type { MongoClient } from '../mongo_client';
Expand Down Expand Up @@ -101,7 +101,9 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
limit && limit > 0 && numReturned + batchSize > limit ? limit - numReturned : batchSize;

if (batchSize <= 0) {
this.close().finally(() => callback());
this.close().finally(() =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change just for consistency in return from _getMore or is there a reason it's needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, it is for result consistency, I may be able to revert it if I make sure the logic is nullish safe in next but the contract should have been either throw or result and when we make _getMore actually async that will be the case for the promise's resolution type.

This code path also specifically claims it is for legacy servers which we don't support, I'm fairly certain this is only triggered by mock server tests.

callback(undefined, { cursor: { id: Long.ZERO, nextBatch: [] } })
);
return;
}
}
Expand Down