@@ -3,14 +3,19 @@ import type { Readable } from 'stream';
3
3
import type { Binary , Document , Timestamp } from './bson' ;
4
4
import { Collection } from './collection' ;
5
5
import { CHANGE , CLOSE , END , ERROR , INIT , MORE , RESPONSE , RESUME_TOKEN_CHANGED } from './constants' ;
6
- import type { AbstractCursorEvents , CursorStreamOptions } from './cursor/abstract_cursor' ;
6
+ import {
7
+ type AbstractCursorEvents ,
8
+ type CursorStreamOptions ,
9
+ CursorTimeoutContext
10
+ } from './cursor/abstract_cursor' ;
7
11
import { ChangeStreamCursor , type ChangeStreamCursorOptions } from './cursor/change_stream_cursor' ;
8
12
import { Db } from './db' ;
9
13
import {
10
14
type AnyError ,
11
15
isResumableError ,
12
16
MongoAPIError ,
13
17
MongoChangeStreamError ,
18
+ MongoOperationTimeoutError ,
14
19
MongoRuntimeError
15
20
} from './error' ;
16
21
import { MongoClient } from './mongo_client' ;
@@ -20,6 +25,7 @@ import type { CollationOptions, OperationParent } from './operations/command';
20
25
import type { ReadPreference } from './read_preference' ;
21
26
import { type AsyncDisposable , configureResourceManagement } from './resource_management' ;
22
27
import type { ServerSessionId } from './sessions' ;
28
+ import { type TimeoutContext } from './timeout' ;
23
29
import { filterOptions , getTopology , type MongoDBNamespace , squashError } from './utils' ;
24
30
25
31
/** @internal */
@@ -609,6 +615,8 @@ export class ChangeStream<
609
615
*/
610
616
static readonly RESUME_TOKEN_CHANGED = RESUME_TOKEN_CHANGED ;
611
617
618
+ private timeoutContext ?: TimeoutContext ;
619
+ private symbol : symbol ;
612
620
/**
613
621
* @internal
614
622
*
@@ -638,6 +646,7 @@ export class ChangeStream<
638
646
) ;
639
647
}
640
648
649
+ this . symbol = Symbol ( ) ;
641
650
this . parent = parent ;
642
651
this . namespace = parent . s . namespace ;
643
652
if ( ! this . options . readPreference && parent . readPreference ) {
@@ -681,22 +690,31 @@ export class ChangeStream<
681
690
// This loop continues until either a change event is received or until a resume attempt
682
691
// fails.
683
692
684
- while ( true ) {
685
- try {
686
- const hasNext = await this . cursor . hasNext ( ) ;
687
- return hasNext ;
688
- } catch ( error ) {
693
+ this . timeoutContext ?. refresh ( ) ;
694
+ try {
695
+ while ( true ) {
696
+ const cursorInitialized = this . cursor . id != null ;
689
697
try {
690
- await this . _processErrorIteratorMode ( error ) ;
698
+ const hasNext = await this . cursor . hasNext ( ) ;
699
+ return hasNext ;
691
700
} catch ( error ) {
692
701
try {
693
- await this . close ( ) ;
702
+ await this . _processErrorIteratorMode ( error , cursorInitialized ) ;
694
703
} catch ( error ) {
695
- squashError ( error ) ;
704
+ if ( error instanceof MongoOperationTimeoutError && cursorInitialized ) {
705
+ throw error ;
706
+ }
707
+ try {
708
+ await this . close ( ) ;
709
+ } catch ( error ) {
710
+ squashError ( error ) ;
711
+ }
712
+ throw error ;
696
713
}
697
- throw error ;
698
714
}
699
715
}
716
+ } finally {
717
+ this . timeoutContext ?. clear ( ) ;
700
718
}
701
719
}
702
720
@@ -706,24 +724,33 @@ export class ChangeStream<
706
724
// Change streams must resume indefinitely while each resume event succeeds.
707
725
// This loop continues until either a change event is received or until a resume attempt
708
726
// fails.
727
+ this . timeoutContext ?. refresh ( ) ;
709
728
710
- while ( true ) {
711
- try {
712
- const change = await this . cursor . next ( ) ;
713
- const processedChange = this . _processChange ( change ?? null ) ;
714
- return processedChange ;
715
- } catch ( error ) {
729
+ try {
730
+ while ( true ) {
731
+ const cursorInitialized = this . cursor . id != null ;
716
732
try {
717
- await this . _processErrorIteratorMode ( error ) ;
733
+ const change = await this . cursor . next ( ) ;
734
+ const processedChange = this . _processChange ( change ?? null ) ;
735
+ return processedChange ;
718
736
} catch ( error ) {
719
737
try {
720
- await this . close ( ) ;
738
+ await this . _processErrorIteratorMode ( error , cursorInitialized ) ;
721
739
} catch ( error ) {
722
- squashError ( error ) ;
740
+ if ( error instanceof MongoOperationTimeoutError && cursorInitialized ) {
741
+ throw error ;
742
+ }
743
+ try {
744
+ await this . close ( ) ;
745
+ } catch ( error ) {
746
+ squashError ( error ) ;
747
+ }
748
+ throw error ;
723
749
}
724
- throw error ;
725
750
}
726
751
}
752
+ } finally {
753
+ this . timeoutContext ?. clear ( ) ;
727
754
}
728
755
}
729
756
@@ -735,23 +762,30 @@ export class ChangeStream<
735
762
// Change streams must resume indefinitely while each resume event succeeds.
736
763
// This loop continues until either a change event is received or until a resume attempt
737
764
// fails.
765
+ this . timeoutContext ?. refresh ( ) ;
738
766
739
- while ( true ) {
740
- try {
741
- const change = await this . cursor . tryNext ( ) ;
742
- return change ?? null ;
743
- } catch ( error ) {
767
+ try {
768
+ while ( true ) {
769
+ const cursorInitialized = this . cursor . id != null ;
744
770
try {
745
- await this . _processErrorIteratorMode ( error ) ;
771
+ const change = await this . cursor . tryNext ( ) ;
772
+ return change ?? null ;
746
773
} catch ( error ) {
747
774
try {
748
- await this . close ( ) ;
775
+ await this . _processErrorIteratorMode ( error , cursorInitialized ) ;
749
776
} catch ( error ) {
750
- squashError ( error ) ;
777
+ if ( error instanceof MongoOperationTimeoutError && cursorInitialized ) throw error ;
778
+ try {
779
+ await this . close ( ) ;
780
+ } catch ( error ) {
781
+ squashError ( error ) ;
782
+ }
783
+ throw error ;
751
784
}
752
- throw error ;
753
785
}
754
786
}
787
+ } finally {
788
+ this . timeoutContext ?. clear ( ) ;
755
789
}
756
790
}
757
791
@@ -784,6 +818,8 @@ export class ChangeStream<
784
818
* Frees the internal resources used by the change stream.
785
819
*/
786
820
async close ( ) : Promise < void > {
821
+ this . timeoutContext ?. clear ( ) ;
822
+ this . timeoutContext = undefined ;
787
823
this [ kClosed ] = true ;
788
824
789
825
const cursor = this . cursor ;
@@ -866,7 +902,12 @@ export class ChangeStream<
866
902
client ,
867
903
this . namespace ,
868
904
pipeline ,
869
- options
905
+ {
906
+ ...options ,
907
+ timeoutContext : this . timeoutContext
908
+ ? new CursorTimeoutContext ( this . timeoutContext , this . symbol )
909
+ : undefined
910
+ }
870
911
) ;
871
912
872
913
for ( const event of CHANGE_STREAM_EVENTS ) {
@@ -893,14 +934,17 @@ export class ChangeStream<
893
934
const stream = this [ kCursorStream ] ?? cursor . stream ( ) ;
894
935
this [ kCursorStream ] = stream ;
895
936
stream . on ( 'data' , change => {
937
+ this . timeoutContext ?. refresh ( ) ;
896
938
try {
897
939
const processedChange = this . _processChange ( change ) ;
898
940
this . emit ( ChangeStream . CHANGE , processedChange ) ;
899
941
} catch ( error ) {
900
942
this . emit ( ChangeStream . ERROR , error ) ;
943
+ } finally {
944
+ this . timeoutContext ?. clear ( ) ;
901
945
}
902
946
} ) ;
903
- stream . on ( 'error' , error => this . _processErrorStreamMode ( error ) ) ;
947
+ stream . on ( 'error' , error => this . _processErrorStreamMode ( error , this . cursor . id != null ) ) ;
904
948
}
905
949
906
950
/** @internal */
@@ -942,24 +986,30 @@ export class ChangeStream<
942
986
}
943
987
944
988
/** @internal */
945
- private _processErrorStreamMode ( changeStreamError : AnyError ) {
989
+ private _processErrorStreamMode ( changeStreamError : AnyError , cursorInitialized : boolean ) {
946
990
// If the change stream has been closed explicitly, do not process error.
947
991
if ( this [ kClosed ] ) return ;
948
992
949
- if ( this . cursor . id != null && isResumableError ( changeStreamError , this . cursor . maxWireVersion ) ) {
993
+ if (
994
+ cursorInitialized &&
995
+ ( isResumableError ( changeStreamError , this . cursor . maxWireVersion ) ||
996
+ changeStreamError instanceof MongoOperationTimeoutError )
997
+ ) {
950
998
this . _endStream ( ) ;
951
999
952
- this . cursor . close ( ) . then ( undefined , squashError ) ;
953
-
954
- const topology = getTopology ( this . parent ) ;
955
- topology
956
- . selectServer ( this . cursor . readPreference , {
957
- operationName : 'reconnect topology in change stream'
958
- } )
959
-
1000
+ this . cursor
1001
+ . close ( )
1002
+ . then (
1003
+ ( ) => this . _resume ( changeStreamError ) ,
1004
+ e => {
1005
+ squashError ( e ) ;
1006
+ return this . _resume ( changeStreamError ) ;
1007
+ }
1008
+ )
960
1009
. then (
961
1010
( ) => {
962
- this . cursor = this . _createChangeStreamCursor ( this . cursor . resumeOptions ) ;
1011
+ if ( changeStreamError instanceof MongoOperationTimeoutError )
1012
+ this . emit ( ChangeStream . ERROR , changeStreamError ) ;
963
1013
} ,
964
1014
( ) => this . _closeEmitterModeWithError ( changeStreamError )
965
1015
) ;
@@ -969,15 +1019,16 @@ export class ChangeStream<
969
1019
}
970
1020
971
1021
/** @internal */
972
- private async _processErrorIteratorMode ( changeStreamError : AnyError ) {
1022
+ private async _processErrorIteratorMode ( changeStreamError : AnyError , cursorInitialized : boolean ) {
973
1023
if ( this [ kClosed ] ) {
974
1024
// TODO(NODE-3485): Replace with MongoChangeStreamClosedError
975
1025
throw new MongoAPIError ( CHANGESTREAM_CLOSED_ERROR ) ;
976
1026
}
977
1027
978
1028
if (
979
- this . cursor . id == null ||
980
- ! isResumableError ( changeStreamError , this . cursor . maxWireVersion )
1029
+ ! cursorInitialized ||
1030
+ ( ! isResumableError ( changeStreamError , this . cursor . maxWireVersion ) &&
1031
+ ! ( changeStreamError instanceof MongoOperationTimeoutError ) )
981
1032
) {
982
1033
try {
983
1034
await this . close ( ) ;
@@ -992,10 +1043,19 @@ export class ChangeStream<
992
1043
} catch ( error ) {
993
1044
squashError ( error ) ;
994
1045
}
1046
+
1047
+ await this . _resume ( changeStreamError ) ;
1048
+
1049
+ if ( changeStreamError instanceof MongoOperationTimeoutError ) throw changeStreamError ;
1050
+ }
1051
+
1052
+ private async _resume ( changeStreamError : AnyError ) {
1053
+ this . timeoutContext ?. refresh ( ) ;
995
1054
const topology = getTopology ( this . parent ) ;
996
1055
try {
997
1056
await topology . selectServer ( this . cursor . readPreference , {
998
- operationName : 'reconnect topology in change stream'
1057
+ operationName : 'reconnect topology in change stream' ,
1058
+ timeoutContext : this . timeoutContext
999
1059
} ) ;
1000
1060
this . cursor = this . _createChangeStreamCursor ( this . cursor . resumeOptions ) ;
1001
1061
} catch {
0 commit comments