@@ -129,6 +129,13 @@ export type AbstractCursorEvents = {
129
129
[ AbstractCursor . CLOSE ] ( ) : void ;
130
130
} ;
131
131
132
+ export interface AbstractCursor {
133
+ /** @internal */
134
+ _getMoreAsync : ( batchSize : number ) => Promise < Document > ;
135
+ /** @internal */
136
+ kInit : ( ) => Promise < void > ;
137
+ }
138
+
132
139
/** @public */
133
140
export abstract class AbstractCursor <
134
141
TSchema = any ,
@@ -220,6 +227,14 @@ export abstract class AbstractCursor<
220
227
return this [ kId ] ?? undefined ;
221
228
}
222
229
230
+ /**
231
+ * @internal
232
+ * A cursor starts with a null id, a cursor that has not been started is not exhausted
233
+ */
234
+ get exhausted ( ) {
235
+ return this [ kId ] ?. isZero ( ) ?? false ;
236
+ }
237
+
223
238
/** @internal */
224
239
get client ( ) : MongoClient {
225
240
return this [ kClient ] ;
@@ -314,7 +329,7 @@ export abstract class AbstractCursor<
314
329
315
330
yield document ;
316
331
317
- if ( this [ kId ] === Long . ZERO ) {
332
+ if ( this . exhausted ) {
318
333
// Cursor exhausted
319
334
break ;
320
335
}
@@ -353,7 +368,7 @@ export abstract class AbstractCursor<
353
368
}
354
369
355
370
async hasNext ( ) : Promise < boolean > {
356
- if ( this [ kId ] === Long . ZERO ) {
371
+ if ( this . exhausted ) {
357
372
return false ;
358
373
}
359
374
@@ -373,7 +388,7 @@ export abstract class AbstractCursor<
373
388
374
389
/** Get the next available document from the cursor, returns null if no more documents are available. */
375
390
async next ( ) : Promise < TSchema | null > {
376
- if ( this [ kId ] === Long . ZERO ) {
391
+ if ( this . exhausted ) {
377
392
throw new MongoCursorExhaustedError ( ) ;
378
393
}
379
394
@@ -384,7 +399,7 @@ export abstract class AbstractCursor<
384
399
* Try to get the next available document from the cursor or `null` if an empty batch is returned
385
400
*/
386
401
async tryNext ( ) : Promise < TSchema | null > {
387
- if ( this [ kId ] === Long . ZERO ) {
402
+ if ( this . exhausted ) {
388
403
throw new MongoCursorExhaustedError ( ) ;
389
404
}
390
405
@@ -632,25 +647,17 @@ export abstract class AbstractCursor<
632
647
* operation. We cannot refactor to use the abstract _initialize method without
633
648
* a significant refactor.
634
649
*/
635
- [ kInit ] ( callback : Callback < TSchema | null > ) : void {
650
+ [ kInit ] ( callback : Callback < void > ) : void {
636
651
this . _initialize ( this [ kSession ] , ( error , state ) => {
637
652
if ( state ) {
638
653
const response = state . response ;
639
654
this [ kServer ] = state . server ;
640
655
641
656
if ( response . cursor ) {
642
- // TODO(NODE-2674): Preserve int64 sent from MongoDB
643
- this [ kId ] =
644
- typeof response . cursor . id === 'number'
645
- ? Long . fromNumber ( response . cursor . id )
646
- : typeof response . cursor . id === 'bigint'
647
- ? Long . fromBigInt ( response . cursor . id )
648
- : response . cursor . id ;
649
-
657
+ this [ kId ] = getCursorId ( response ) ;
650
658
if ( response . cursor . ns ) {
651
659
this [ kNamespace ] = ns ( response . cursor . ns ) ;
652
660
}
653
-
654
661
this [ kDocuments ] . pushMany ( response . cursor . firstBatch ) ;
655
662
}
656
663
@@ -671,7 +678,7 @@ export abstract class AbstractCursor<
671
678
return cleanupCursor ( this , { error } , ( ) => callback ( error , undefined ) ) ;
672
679
}
673
680
674
- if ( cursorIsDead ( this ) ) {
681
+ if ( this . exhausted ) {
675
682
return cleanupCursor ( this , undefined , ( ) => callback ( ) ) ;
676
683
}
677
684
@@ -680,6 +687,22 @@ export abstract class AbstractCursor<
680
687
}
681
688
}
682
689
690
+ // @ts -expect-error: Callback mutual exclusion type issue
691
+ AbstractCursor . prototype . _getMoreAsync = promisify ( function (
692
+ this : AbstractCursor ,
693
+ batchSize : number ,
694
+ callback : Callback < Document >
695
+ ) {
696
+ return this . _getMore ( batchSize , callback ) ;
697
+ } ) ;
698
+
699
+ AbstractCursor . prototype . kInit = promisify ( function (
700
+ this : AbstractCursor ,
701
+ callback : Callback < void >
702
+ ) {
703
+ return this [ kInit ] ( callback ) ;
704
+ } ) ;
705
+
683
706
/**
684
707
* @param cursor - the cursor on which to call `next`
685
708
* @param blocking - a boolean indicating whether or not the cursor should `block` until data
@@ -701,96 +724,57 @@ async function next<T>(
701
724
transform : boolean ;
702
725
}
703
726
) : Promise < T | null > {
704
- const cursorId = cursor [ kId ] ;
705
727
if ( cursor . closed ) {
706
728
return null ;
707
729
}
708
730
709
- if ( cursor [ kDocuments ] . length !== 0 ) {
710
- const doc = cursor [ kDocuments ] . shift ( ) ;
711
-
712
- if ( doc != null && transform && cursor [ kTransform ] ) {
713
- try {
714
- return cursor [ kTransform ] ( doc ) ;
715
- } catch ( error ) {
716
- await cleanupCursorAsync ( cursor , { error, needsToEmitClosed : true } ) . catch ( ( ) => {
717
- // `cleanupCursorAsync` should never throw, but if it does we want to throw the original
718
- // error instead.
719
- } ) ;
720
- throw error ;
721
- }
722
- }
723
-
724
- return doc ;
731
+ if ( cursor [ kId ] == null ) {
732
+ await cursor . kInit ( ) ;
725
733
}
726
734
727
- if ( cursorId == null ) {
728
- // All cursors must operate within a session, one must be made implicitly if not explicitly provided
729
- const init = promisify ( cb => cursor [ kInit ] ( cb ) ) ;
730
- await init ( ) ;
731
- return next ( cursor , { blocking, transform } ) ;
732
- }
735
+ do {
736
+ if ( cursor [ kDocuments ] . length !== 0 ) {
737
+ const doc = cursor [ kDocuments ] . shift ( ) ;
733
738
734
- if ( cursorIsDead ( cursor ) ) {
735
- // if the cursor is dead, we clean it up
736
- // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
737
- // and we should surface the error
738
- await cleanupCursorAsync ( cursor , { } ) ;
739
- return null ;
740
- }
739
+ if ( doc != null && transform && cursor [ kTransform ] ) {
740
+ try {
741
+ return cursor [ kTransform ] ( doc ) ;
742
+ } catch ( error ) {
743
+ // `cleanupCursorAsync` should never throw
744
+ // but if it does we want to throw the original error instead.
745
+ await cleanupCursorAsync ( cursor , { error, needsToEmitClosed : true } ) . catch ( ( ) => null ) ;
746
+ throw error ;
747
+ }
748
+ }
741
749
742
- // otherwise need to call getMore
743
- const batchSize = cursor [ kOptions ] . batchSize || 1000 ;
744
- const getMore = promisify ( ( batchSize : number , cb : Callback < Document | undefined > ) =>
745
- cursor . _getMore ( batchSize , cb )
746
- ) ;
747
-
748
- let response : Document | undefined ;
749
- try {
750
- response = await getMore ( batchSize ) ;
751
- } catch ( error ) {
752
- if ( error ) {
753
- await cleanupCursorAsync ( cursor , { error } ) . catch ( ( ) => {
754
- // `cleanupCursorAsync` should never throw, but if it does we want to throw the original
755
- // error instead.
756
- } ) ;
757
- throw error ;
750
+ return doc ;
758
751
}
759
- }
760
-
761
- if ( response ) {
762
- const cursorId =
763
- typeof response . cursor . id === 'number'
764
- ? Long . fromNumber ( response . cursor . id )
765
- : typeof response . cursor . id === 'bigint'
766
- ? Long . fromBigInt ( response . cursor . id )
767
- : response . cursor . id ;
768
752
769
- cursor [ kDocuments ] . pushMany ( response . cursor . nextBatch ) ;
770
- cursor [ kId ] = cursorId ;
771
- }
772
-
773
- if ( cursorIsDead ( cursor ) ) {
774
- // If we successfully received a response from a cursor BUT the cursor indicates that it is exhausted,
775
- // we intentionally clean up the cursor to release its session back into the pool before the cursor
776
- // is iterated. This prevents a cursor that is exhausted on the server from holding
777
- // onto a session indefinitely until the AbstractCursor is iterated.
778
- //
779
- // cleanupCursorAsync should never throw, but if it does it indicates a bug in the driver
780
- // and we should surface the error
781
- await cleanupCursorAsync ( cursor , { } ) ;
782
- }
783
-
784
- if ( cursor [ kDocuments ] . length === 0 && blocking === false ) {
785
- return null ;
786
- }
753
+ if ( cursor . exhausted || cursor [ kId ] == null ) {
754
+ await cleanupCursorAsync ( cursor , { } ) ;
755
+ return null ;
756
+ }
787
757
788
- return next ( cursor , { blocking, transform } ) ;
789
- }
758
+ try {
759
+ const { batchSize = 1000 } = cursor [ kOptions ] ;
760
+ const response = await cursor . _getMoreAsync ( batchSize ) ;
761
+ cursor [ kId ] = getCursorId ( response ) ;
762
+ cursor [ kDocuments ] . pushMany ( response . cursor . nextBatch ) ;
763
+ if ( cursor [ kDocuments ] . length === 0 && blocking === false ) {
764
+ return null ;
765
+ }
766
+ if ( cursor . exhausted ) {
767
+ // Received more documents to return but the cursor is done server side.
768
+ // So release the session as early as possible.
769
+ await cleanupCursorAsync ( cursor , { } ) ;
770
+ }
771
+ } catch ( error ) {
772
+ await cleanupCursorAsync ( cursor , { error } ) . catch ( ( ) => null ) ;
773
+ throw error ;
774
+ }
775
+ } while ( ! cursor . exhausted || cursor [ kDocuments ] . length !== 0 ) ;
790
776
791
- function cursorIsDead ( cursor : AbstractCursor ) : boolean {
792
- const cursorId = cursor [ kId ] ;
793
- return ! ! cursorId && cursorId . isZero ( ) ;
777
+ return null ;
794
778
}
795
779
796
780
const cleanupCursorAsync = promisify ( cleanupCursor ) ;
@@ -955,3 +939,12 @@ class ReadableCursorStream extends Readable {
955
939
) ;
956
940
}
957
941
}
942
+
943
+ function getCursorId ( response : { cursor : { id : number | bigint | Long } } | Document ) : Long {
944
+ // TODO(NODE-2674): Preserve int64 sent from MongoDB
945
+ return typeof response . cursor . id === 'number'
946
+ ? Long . fromNumber ( response . cursor . id )
947
+ : typeof response . cursor . id === 'bigint'
948
+ ? Long . fromBigInt ( response . cursor . id )
949
+ : response . cursor . id ;
950
+ }
0 commit comments