@@ -778,15 +778,6 @@ export class ChangeStream<
778
778
this . _processResumeQueue ( ) ;
779
779
} ;
780
780
781
- // otherwise, raise an error and close the change stream
782
- const unresumableError = ( err : AnyError ) => {
783
- if ( ! callback ) {
784
- this . emit ( ChangeStream . ERROR , err ) ;
785
- }
786
-
787
- this . close ( ( ) => this . _processResumeQueue ( err ) ) ;
788
- } ;
789
-
790
781
if ( cursor && isResumableError ( error , maxWireVersion ( cursor . server ) ) ) {
791
782
this . cursor = undefined ;
792
783
@@ -799,7 +790,7 @@ export class ChangeStream<
799
790
const topology = getTopology ( this . parent ) ;
800
791
this . _waitForTopologyConnected ( topology , { readPreference : cursor . readPreference } , err => {
801
792
// if the topology can't reconnect, close the stream
802
- if ( err ) return unresumableError ( err ) ;
793
+ if ( err ) return this . _closeWithError ( err ) ;
803
794
804
795
// create a new cursor, preserving the old cursor's options
805
796
const newCursor = this . _createChangeStreamCursor ( cursor . resumeOptions ) ;
@@ -810,7 +801,7 @@ export class ChangeStream<
810
801
// attempt to continue in iterator mode
811
802
newCursor . hasNext ( err => {
812
803
// if there's an error immediately after resuming, close the stream
813
- if ( err ) return unresumableError ( err ) ;
804
+ if ( err ) return this . _closeWithError ( err ) ;
814
805
resumeWithCursor ( newCursor ) ;
815
806
} ) ;
816
807
} ) ;
0 commit comments