Skip to content

Commit 2a65d43

Browse files
authored
refactor(NODE-6201): cursor to use fetchBatch when current batch is empty (#4093)
1 parent aa429f8 commit 2a65d43

File tree

9 files changed

+230
-276
lines changed

9 files changed

+230
-276
lines changed

src/cursor/abstract_cursor.ts

Lines changed: 170 additions & 246 deletions
Large diffs are not rendered by default.

src/cursor/aggregation_cursor.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import type { Sort } from '../sort';
88
import type { MongoDBNamespace } from '../utils';
99
import { mergeOptions } from '../utils';
1010
import type { AbstractCursorOptions } from './abstract_cursor';
11-
import { AbstractCursor, assertUninitialized } from './abstract_cursor';
11+
import { AbstractCursor } from './abstract_cursor';
1212

1313
/** @public */
1414
export interface AggregationCursorOptions extends AbstractCursorOptions, AggregateOptions {}
@@ -101,7 +101,7 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
101101
addStage(stage: Document): this;
102102
addStage<T = Document>(stage: Document): AggregationCursor<T>;
103103
addStage<T = Document>(stage: Document): AggregationCursor<T> {
104-
assertUninitialized(this);
104+
this.throwIfInitialized();
105105
this[kPipeline].push(stage);
106106
return this as unknown as AggregationCursor<T>;
107107
}

src/cursor/find_cursor.ts

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import type { Hint } from '../operations/operation';
1111
import type { ClientSession } from '../sessions';
1212
import { formatSort, type Sort, type SortDirection } from '../sort';
1313
import { emitWarningOnce, mergeOptions, type MongoDBNamespace, squashError } from '../utils';
14-
import { AbstractCursor, assertUninitialized } from './abstract_cursor';
14+
import { AbstractCursor } from './abstract_cursor';
1515

1616
/** @internal */
1717
const kFilter = Symbol('filter');
@@ -163,7 +163,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
163163

164164
/** Set the cursor query */
165165
filter(filter: Document): this {
166-
assertUninitialized(this);
166+
this.throwIfInitialized();
167167
this[kFilter] = filter;
168168
return this;
169169
}
@@ -174,7 +174,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
174174
* @param hint - If specified, then the query system will only consider plans using the hinted index.
175175
*/
176176
hint(hint: Hint): this {
177-
assertUninitialized(this);
177+
this.throwIfInitialized();
178178
this[kBuiltOptions].hint = hint;
179179
return this;
180180
}
@@ -185,7 +185,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
185185
* @param min - Specify a $min value to specify the inclusive lower bound for a specific index in order to constrain the results of find(). The $min specifies the lower bound for all keys of a specific index in order.
186186
*/
187187
min(min: Document): this {
188-
assertUninitialized(this);
188+
this.throwIfInitialized();
189189
this[kBuiltOptions].min = min;
190190
return this;
191191
}
@@ -196,7 +196,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
196196
* @param max - Specify a $max value to specify the exclusive upper bound for a specific index in order to constrain the results of find(). The $max specifies the upper bound for all keys of a specific index in order.
197197
*/
198198
max(max: Document): this {
199-
assertUninitialized(this);
199+
this.throwIfInitialized();
200200
this[kBuiltOptions].max = max;
201201
return this;
202202
}
@@ -209,7 +209,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
209209
* @param value - the returnKey value.
210210
*/
211211
returnKey(value: boolean): this {
212-
assertUninitialized(this);
212+
this.throwIfInitialized();
213213
this[kBuiltOptions].returnKey = value;
214214
return this;
215215
}
@@ -220,7 +220,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
220220
* @param value - The $showDiskLoc option has now been deprecated and replaced with the showRecordId field. $showDiskLoc will still be accepted for OP_QUERY stye find.
221221
*/
222222
showRecordId(value: boolean): this {
223-
assertUninitialized(this);
223+
this.throwIfInitialized();
224224
this[kBuiltOptions].showRecordId = value;
225225
return this;
226226
}
@@ -232,7 +232,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
232232
* @param value - The modifier value.
233233
*/
234234
addQueryModifier(name: string, value: string | boolean | number | Document): this {
235-
assertUninitialized(this);
235+
this.throwIfInitialized();
236236
if (name[0] !== '$') {
237237
throw new MongoInvalidArgumentError(`${name} is not a valid query modifier`);
238238
}
@@ -295,7 +295,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
295295
* @param value - The comment attached to this query.
296296
*/
297297
comment(value: string): this {
298-
assertUninitialized(this);
298+
this.throwIfInitialized();
299299
this[kBuiltOptions].comment = value;
300300
return this;
301301
}
@@ -306,7 +306,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
306306
* @param value - Number of milliseconds to wait before aborting the tailed query.
307307
*/
308308
maxAwaitTimeMS(value: number): this {
309-
assertUninitialized(this);
309+
this.throwIfInitialized();
310310
if (typeof value !== 'number') {
311311
throw new MongoInvalidArgumentError('Argument for maxAwaitTimeMS must be a number');
312312
}
@@ -321,7 +321,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
321321
* @param value - Number of milliseconds to wait before aborting the query.
322322
*/
323323
override maxTimeMS(value: number): this {
324-
assertUninitialized(this);
324+
this.throwIfInitialized();
325325
if (typeof value !== 'number') {
326326
throw new MongoInvalidArgumentError('Argument for maxTimeMS must be a number');
327327
}
@@ -371,7 +371,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
371371
* ```
372372
*/
373373
project<T extends Document = Document>(value: Document): FindCursor<T> {
374-
assertUninitialized(this);
374+
this.throwIfInitialized();
375375
this[kBuiltOptions].projection = value;
376376
return this as unknown as FindCursor<T>;
377377
}
@@ -383,7 +383,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
383383
* @param direction - The direction of the sorting (1 or -1).
384384
*/
385385
sort(sort: Sort | string, direction?: SortDirection): this {
386-
assertUninitialized(this);
386+
this.throwIfInitialized();
387387
if (this[kBuiltOptions].tailable) {
388388
throw new MongoTailableCursorError('Tailable cursor does not support sorting');
389389
}
@@ -399,7 +399,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
399399
* {@link https://www.mongodb.com/docs/manual/reference/command/find/#find-cmd-allowdiskuse | find command allowDiskUse documentation}
400400
*/
401401
allowDiskUse(allow = true): this {
402-
assertUninitialized(this);
402+
this.throwIfInitialized();
403403

404404
if (!this[kBuiltOptions].sort) {
405405
throw new MongoInvalidArgumentError('Option "allowDiskUse" requires a sort specification');
@@ -421,7 +421,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
421421
* @param value - The cursor collation options (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
422422
*/
423423
collation(value: CollationOptions): this {
424-
assertUninitialized(this);
424+
this.throwIfInitialized();
425425
this[kBuiltOptions].collation = value;
426426
return this;
427427
}
@@ -432,7 +432,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
432432
* @param value - The limit for the cursor query.
433433
*/
434434
limit(value: number): this {
435-
assertUninitialized(this);
435+
this.throwIfInitialized();
436436
if (this[kBuiltOptions].tailable) {
437437
throw new MongoTailableCursorError('Tailable cursor does not support limit');
438438
}
@@ -451,7 +451,7 @@ export class FindCursor<TSchema = any> extends AbstractCursor<TSchema> {
451451
* @param value - The skip for the cursor query.
452452
*/
453453
skip(value: number): this {
454-
assertUninitialized(this);
454+
this.throwIfInitialized();
455455
if (this[kBuiltOptions].tailable) {
456456
throw new MongoTailableCursorError('Tailable cursor does not support skip');
457457
}

test/integration/change-streams/change_stream.test.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import { PassThrough } from 'stream';
77
import { setTimeout } from 'timers';
88

99
import {
10-
AbstractCursor,
1110
type ChangeStream,
1211
type ChangeStreamOptions,
1312
type Collection,
@@ -33,9 +32,9 @@ import {
3332
import { delay, filterForCommands } from '../shared';
3433

3534
const initIteratorMode = async (cs: ChangeStream) => {
36-
const kInit = getSymbolFrom(AbstractCursor.prototype, 'kInit');
3735
const initEvent = once(cs.cursor, 'init');
38-
await cs.cursor[kInit]();
36+
//@ts-expect-error: private method
37+
await cs.cursor.cursorInit();
3938
await initEvent;
4039
return;
4140
};

test/integration/change-streams/change_streams.prose.test.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import * as sinon from 'sinon';
44
import { setTimeout } from 'timers';
55

66
import {
7-
AbstractCursor,
87
type ChangeStream,
98
type CommandFailedEvent,
109
type CommandStartedEvent,
@@ -18,7 +17,6 @@ import {
1817
Timestamp
1918
} from '../../mongodb';
2019
import * as mock from '../../tools/mongodb-mock/index';
21-
import { getSymbolFrom } from '../../tools/utils';
2220
import { setupDatabase } from '../shared';
2321

2422
/**
@@ -72,9 +70,9 @@ function triggerResumableError(
7270
}
7371

7472
const initIteratorMode = async (cs: ChangeStream) => {
75-
const kInit = getSymbolFrom(AbstractCursor.prototype, 'kInit');
7673
const initEvent = once(cs.cursor, 'init');
77-
await cs.cursor[kInit]();
74+
//@ts-expect-error: private method
75+
await cs.cursor.cursorInit();
7876
await initEvent;
7977
return;
8078
};

test/integration/crud/misc_cursors.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1870,7 +1870,7 @@ describe('Cursor', function () {
18701870
const rejectedEarlyBecauseClientClosed = cursor.next().catch(error => error);
18711871

18721872
await client.close();
1873-
expect(cursor).to.have.property('killed', true);
1873+
expect(cursor).to.have.property('closed', true);
18741874

18751875
const error = await rejectedEarlyBecauseClientClosed;
18761876
expect(error).to.be.instanceOf(MongoExpiredSessionError);

test/integration/node-specific/abstract_cursor.test.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,26 @@ describe('class AbstractCursor', function () {
153153
await client.close();
154154
});
155155

156+
it('wraps transform in result checking for each map call', async () => {
157+
const control = { functionThatShouldReturnNull: 0 };
158+
const makeCursor = () => {
159+
const cursor = collection.find();
160+
cursor
161+
.map(doc => (control.functionThatShouldReturnNull === 0 ? null : doc))
162+
.map(doc => (control.functionThatShouldReturnNull === 1 ? null : doc))
163+
.map(doc => (control.functionThatShouldReturnNull === 2 ? null : doc));
164+
return cursor;
165+
};
166+
167+
for (const testFn of [0, 1, 2]) {
168+
control.functionThatShouldReturnNull = testFn;
169+
const error = await makeCursor()
170+
.toArray()
171+
.catch(error => error);
172+
expect(error).to.be.instanceOf(MongoAPIError);
173+
}
174+
});
175+
156176
context('toArray() with custom transforms', function () {
157177
for (const value of falseyValues) {
158178
it(`supports mapping to falsey value '${inspect(value)}'`, async function () {

test/integration/node-specific/cursor_async_iterator.test.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,18 @@ describe('Cursor Async Iterator Tests', function () {
7272
}
7373
});
7474

75+
it('should not iterate if closed immediately', async function () {
76+
const cursor = collection.find();
77+
await cursor.close();
78+
79+
let count = 0;
80+
// eslint-disable-next-line no-unused-vars
81+
for await (const _ of cursor) count++;
82+
83+
expect(count).to.equal(0);
84+
expect(cursor.closed).to.be.true;
85+
});
86+
7587
it('should properly stop when cursor is closed', async function () {
7688
const cursor = collection.find();
7789

test/tools/unified-spec-runner/operations.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { AssertionError, expect } from 'chai';
66

77
import {
88
AbstractCursor,
9+
type ChangeStream,
910
Collection,
1011
CommandStartedEvent,
1112
Db,
@@ -240,9 +241,9 @@ operations.set('createChangeStream', async ({ entities, operation }) => {
240241
}
241242

242243
const { pipeline, ...args } = operation.arguments!;
243-
const changeStream = watchable.watch(pipeline, args);
244-
const kInit = getSymbolFrom(AbstractCursor.prototype, 'kInit');
245-
await changeStream.cursor[kInit]();
244+
const changeStream: ChangeStream = watchable.watch(pipeline, args);
245+
//@ts-expect-error: private method
246+
await changeStream.cursor.cursorInit();
246247
return changeStream;
247248
});
248249

0 commit comments

Comments
 (0)