@@ -3,14 +3,15 @@ import type { Readable } from 'stream';
3
3
import type { Binary , Document , Timestamp } from './bson' ;
4
4
import { Collection } from './collection' ;
5
5
import { CHANGE , CLOSE , END , ERROR , INIT , MORE , RESPONSE , RESUME_TOKEN_CHANGED } from './constants' ;
6
- import type { AbstractCursorEvents , CursorStreamOptions } from './cursor/abstract_cursor' ;
6
+ import { type CursorStreamOptions , CursorTimeoutContext } from './cursor/abstract_cursor' ;
7
7
import { ChangeStreamCursor , type ChangeStreamCursorOptions } from './cursor/change_stream_cursor' ;
8
8
import { Db } from './db' ;
9
9
import {
10
10
type AnyError ,
11
11
isResumableError ,
12
12
MongoAPIError ,
13
13
MongoChangeStreamError ,
14
+ MongoOperationTimeoutError ,
14
15
MongoRuntimeError
15
16
} from './error' ;
16
17
import { MongoClient } from './mongo_client' ;
@@ -20,6 +21,7 @@ import type { CollationOptions, OperationParent } from './operations/command';
20
21
import type { ReadPreference } from './read_preference' ;
21
22
import { type AsyncDisposable , configureResourceManagement } from './resource_management' ;
22
23
import type { ServerSessionId } from './sessions' ;
24
+ import { CSOTTimeoutContext , type TimeoutContext } from './timeout' ;
23
25
import { filterOptions , getTopology , type MongoDBNamespace , squashError } from './utils' ;
24
26
25
27
/** @internal */
@@ -538,7 +540,13 @@ export type ChangeStreamEvents<
538
540
end ( ) : void ;
539
541
error ( error : Error ) : void ;
540
542
change ( change : TChange ) : void ;
541
- } & AbstractCursorEvents ;
543
+ /**
544
+ * @remarks Note that the `close` event is currently emitted whenever the internal `ChangeStreamCursor`
545
+ * instance is closed, which can occur multiple times for a given `ChangeStream` instance.
546
+ * When this event is emitted is subject to change outside of major versions.
547
+ */
548
+ close ( ) : void ;
549
+ } ;
542
550
543
551
/**
544
552
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
@@ -609,6 +617,13 @@ export class ChangeStream<
609
617
*/
610
618
static readonly RESUME_TOKEN_CHANGED = RESUME_TOKEN_CHANGED ;
611
619
620
+ private timeoutContext ?: TimeoutContext ;
621
+ /**
622
+ * Note that this property is here to uniquely identify a ChangeStream instance as the owner of
623
+ * the {@link CursorTimeoutContext} instance (see {@link ChangeStream._createChangeStreamCursor}) to ensure
624
+ * that {@link AbstractCursor.close} does not mutate the timeoutContext.
625
+ */
626
+ private contextOwner : symbol ;
612
627
/**
613
628
* @internal
614
629
*
@@ -624,20 +639,25 @@ export class ChangeStream<
624
639
625
640
this . pipeline = pipeline ;
626
641
this . options = { ...options } ;
642
+ let serverSelectionTimeoutMS : number ;
627
643
delete this . options . writeConcern ;
628
644
629
645
if ( parent instanceof Collection ) {
630
646
this . type = CHANGE_DOMAIN_TYPES . COLLECTION ;
647
+ serverSelectionTimeoutMS = parent . s . db . client . options . serverSelectionTimeoutMS ;
631
648
} else if ( parent instanceof Db ) {
632
649
this . type = CHANGE_DOMAIN_TYPES . DATABASE ;
650
+ serverSelectionTimeoutMS = parent . client . options . serverSelectionTimeoutMS ;
633
651
} else if ( parent instanceof MongoClient ) {
634
652
this . type = CHANGE_DOMAIN_TYPES . CLUSTER ;
653
+ serverSelectionTimeoutMS = parent . options . serverSelectionTimeoutMS ;
635
654
} else {
636
655
throw new MongoChangeStreamError (
637
656
'Parent provided to ChangeStream constructor must be an instance of Collection, Db, or MongoClient'
638
657
) ;
639
658
}
640
659
660
+ this . contextOwner = Symbol ( ) ;
641
661
this . parent = parent ;
642
662
this . namespace = parent . s . namespace ;
643
663
if ( ! this . options . readPreference && parent . readPreference ) {
@@ -662,6 +682,13 @@ export class ChangeStream<
662
682
this [ kCursorStream ] ?. removeAllListeners ( 'data' ) ;
663
683
}
664
684
} ) ;
685
+
686
+ if ( this . options . timeoutMS != null ) {
687
+ this . timeoutContext = new CSOTTimeoutContext ( {
688
+ timeoutMS : this . options . timeoutMS ,
689
+ serverSelectionTimeoutMS
690
+ } ) ;
691
+ }
665
692
}
666
693
667
694
/** @internal */
@@ -681,22 +708,31 @@ export class ChangeStream<
681
708
// This loop continues until either a change event is received or until a resume attempt
682
709
// fails.
683
710
684
- while ( true ) {
685
- try {
686
- const hasNext = await this . cursor . hasNext ( ) ;
687
- return hasNext ;
688
- } catch ( error ) {
711
+ this . timeoutContext ?. refresh ( ) ;
712
+ try {
713
+ while ( true ) {
714
+ const cursorInitialized = this . cursor . id != null ;
689
715
try {
690
- await this . _processErrorIteratorMode ( error ) ;
716
+ const hasNext = await this . cursor . hasNext ( ) ;
717
+ return hasNext ;
691
718
} catch ( error ) {
692
719
try {
693
- await this . close ( ) ;
720
+ await this . _processErrorIteratorMode ( error , cursorInitialized ) ;
694
721
} catch ( error ) {
695
- squashError ( error ) ;
722
+ if ( error instanceof MongoOperationTimeoutError && cursorInitialized ) {
723
+ throw error ;
724
+ }
725
+ try {
726
+ await this . close ( ) ;
727
+ } catch ( error ) {
728
+ squashError ( error ) ;
729
+ }
730
+ throw error ;
696
731
}
697
- throw error ;
698
732
}
699
733
}
734
+ } finally {
735
+ this . timeoutContext ?. clear ( ) ;
700
736
}
701
737
}
702
738
@@ -706,24 +742,33 @@ export class ChangeStream<
706
742
// Change streams must resume indefinitely while each resume event succeeds.
707
743
// This loop continues until either a change event is received or until a resume attempt
708
744
// fails.
745
+ this . timeoutContext ?. refresh ( ) ;
709
746
710
- while ( true ) {
711
- try {
712
- const change = await this . cursor . next ( ) ;
713
- const processedChange = this . _processChange ( change ?? null ) ;
714
- return processedChange ;
715
- } catch ( error ) {
747
+ try {
748
+ while ( true ) {
749
+ const cursorInitialized = this . cursor . id != null ;
716
750
try {
717
- await this . _processErrorIteratorMode ( error ) ;
751
+ const change = await this . cursor . next ( ) ;
752
+ const processedChange = this . _processChange ( change ?? null ) ;
753
+ return processedChange ;
718
754
} catch ( error ) {
719
755
try {
720
- await this . close ( ) ;
756
+ await this . _processErrorIteratorMode ( error , cursorInitialized ) ;
721
757
} catch ( error ) {
722
- squashError ( error ) ;
758
+ if ( error instanceof MongoOperationTimeoutError && cursorInitialized ) {
759
+ throw error ;
760
+ }
761
+ try {
762
+ await this . close ( ) ;
763
+ } catch ( error ) {
764
+ squashError ( error ) ;
765
+ }
766
+ throw error ;
723
767
}
724
- throw error ;
725
768
}
726
769
}
770
+ } finally {
771
+ this . timeoutContext ?. clear ( ) ;
727
772
}
728
773
}
729
774
@@ -735,23 +780,30 @@ export class ChangeStream<
735
780
// Change streams must resume indefinitely while each resume event succeeds.
736
781
// This loop continues until either a change event is received or until a resume attempt
737
782
// fails.
783
+ this . timeoutContext ?. refresh ( ) ;
738
784
739
- while ( true ) {
740
- try {
741
- const change = await this . cursor . tryNext ( ) ;
742
- return change ?? null ;
743
- } catch ( error ) {
785
+ try {
786
+ while ( true ) {
787
+ const cursorInitialized = this . cursor . id != null ;
744
788
try {
745
- await this . _processErrorIteratorMode ( error ) ;
789
+ const change = await this . cursor . tryNext ( ) ;
790
+ return change ?? null ;
746
791
} catch ( error ) {
747
792
try {
748
- await this . close ( ) ;
793
+ await this . _processErrorIteratorMode ( error , cursorInitialized ) ;
749
794
} catch ( error ) {
750
- squashError ( error ) ;
795
+ if ( error instanceof MongoOperationTimeoutError && cursorInitialized ) throw error ;
796
+ try {
797
+ await this . close ( ) ;
798
+ } catch ( error ) {
799
+ squashError ( error ) ;
800
+ }
801
+ throw error ;
751
802
}
752
- throw error ;
753
803
}
754
804
}
805
+ } finally {
806
+ this . timeoutContext ?. clear ( ) ;
755
807
}
756
808
}
757
809
@@ -784,6 +836,8 @@ export class ChangeStream<
784
836
* Frees the internal resources used by the change stream.
785
837
*/
786
838
async close ( ) : Promise < void > {
839
+ this . timeoutContext ?. clear ( ) ;
840
+ this . timeoutContext = undefined ;
787
841
this [ kClosed ] = true ;
788
842
789
843
const cursor = this . cursor ;
@@ -866,7 +920,12 @@ export class ChangeStream<
866
920
client ,
867
921
this . namespace ,
868
922
pipeline ,
869
- options
923
+ {
924
+ ...options ,
925
+ timeoutContext : this . timeoutContext
926
+ ? new CursorTimeoutContext ( this . timeoutContext , this . contextOwner )
927
+ : undefined
928
+ }
870
929
) ;
871
930
872
931
for ( const event of CHANGE_STREAM_EVENTS ) {
@@ -900,7 +959,7 @@ export class ChangeStream<
900
959
this . emit ( ChangeStream . ERROR , error ) ;
901
960
}
902
961
} ) ;
903
- stream . on ( 'error' , error => this . _processErrorStreamMode ( error ) ) ;
962
+ stream . on ( 'error' , error => this . _processErrorStreamMode ( error , this . cursor . id != null ) ) ;
904
963
}
905
964
906
965
/** @internal */
@@ -942,24 +1001,30 @@ export class ChangeStream<
942
1001
}
943
1002
944
1003
/** @internal */
945
- private _processErrorStreamMode ( changeStreamError : AnyError ) {
1004
+ private _processErrorStreamMode ( changeStreamError : AnyError , cursorInitialized : boolean ) {
946
1005
// If the change stream has been closed explicitly, do not process error.
947
1006
if ( this [ kClosed ] ) return ;
948
1007
949
- if ( this . cursor . id != null && isResumableError ( changeStreamError , this . cursor . maxWireVersion ) ) {
1008
+ if (
1009
+ cursorInitialized &&
1010
+ ( isResumableError ( changeStreamError , this . cursor . maxWireVersion ) ||
1011
+ changeStreamError instanceof MongoOperationTimeoutError )
1012
+ ) {
950
1013
this . _endStream ( ) ;
951
1014
952
- this . cursor . close ( ) . then ( undefined , squashError ) ;
953
-
954
- const topology = getTopology ( this . parent ) ;
955
- topology
956
- . selectServer ( this . cursor . readPreference , {
957
- operationName : 'reconnect topology in change stream'
958
- } )
959
-
1015
+ this . cursor
1016
+ . close ( )
1017
+ . then (
1018
+ ( ) => this . _resume ( changeStreamError ) ,
1019
+ e => {
1020
+ squashError ( e ) ;
1021
+ return this . _resume ( changeStreamError ) ;
1022
+ }
1023
+ )
960
1024
. then (
961
1025
( ) => {
962
- this . cursor = this . _createChangeStreamCursor ( this . cursor . resumeOptions ) ;
1026
+ if ( changeStreamError instanceof MongoOperationTimeoutError )
1027
+ this . emit ( ChangeStream . ERROR , changeStreamError ) ;
963
1028
} ,
964
1029
( ) => this . _closeEmitterModeWithError ( changeStreamError )
965
1030
) ;
@@ -969,33 +1034,44 @@ export class ChangeStream<
969
1034
}
970
1035
971
1036
/** @internal */
972
- private async _processErrorIteratorMode ( changeStreamError : AnyError ) {
1037
+ private async _processErrorIteratorMode ( changeStreamError : AnyError , cursorInitialized : boolean ) {
973
1038
if ( this [ kClosed ] ) {
974
1039
// TODO(NODE-3485): Replace with MongoChangeStreamClosedError
975
1040
throw new MongoAPIError ( CHANGESTREAM_CLOSED_ERROR ) ;
976
1041
}
977
1042
978
1043
if (
979
- this . cursor . id == null ||
980
- ! isResumableError ( changeStreamError , this . cursor . maxWireVersion )
1044
+ cursorInitialized &&
1045
+ ( isResumableError ( changeStreamError , this . cursor . maxWireVersion ) ||
1046
+ changeStreamError instanceof MongoOperationTimeoutError )
981
1047
) {
1048
+ try {
1049
+ await this . cursor . close ( ) ;
1050
+ } catch ( error ) {
1051
+ squashError ( error ) ;
1052
+ }
1053
+
1054
+ await this . _resume ( changeStreamError ) ;
1055
+
1056
+ if ( changeStreamError instanceof MongoOperationTimeoutError ) throw changeStreamError ;
1057
+ } else {
982
1058
try {
983
1059
await this . close ( ) ;
984
1060
} catch ( error ) {
985
1061
squashError ( error ) ;
986
1062
}
1063
+
987
1064
throw changeStreamError ;
988
1065
}
1066
+ }
989
1067
990
- try {
991
- await this . cursor . close ( ) ;
992
- } catch ( error ) {
993
- squashError ( error ) ;
994
- }
1068
+ private async _resume ( changeStreamError : AnyError ) {
1069
+ this . timeoutContext ?. refresh ( ) ;
995
1070
const topology = getTopology ( this . parent ) ;
996
1071
try {
997
1072
await topology . selectServer ( this . cursor . readPreference , {
998
- operationName : 'reconnect topology in change stream'
1073
+ operationName : 'reconnect topology in change stream' ,
1074
+ timeoutContext : this . timeoutContext
999
1075
} ) ;
1000
1076
this . cursor = this . _createChangeStreamCursor ( this . cursor . resumeOptions ) ;
1001
1077
} catch {
0 commit comments