Skip to content

Commit 88c5368

Browse files
authored
docs(NODE-6457): Document CSOT change stream behaviour (#4301)
1 parent 511e6c9 commit 88c5368

File tree

3 files changed

+157
-0
lines changed

3 files changed

+157
-0
lines changed

src/collection.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,6 +1050,59 @@ export class Collection<TSchema extends Document = Document> {
10501050
* });
10511051
* ```
10521052
*
1053+
* @remarks
1054+
* When `timeoutMS` is configured for a change stream, it will have different behaviour depending
1055+
* on whether the change stream is in iterator mode or emitter mode. In both cases, a change
1056+
* stream will time out if it does not receive a change event within `timeoutMS` of the last change
1057+
* event.
1058+
*
1059+
* Note that if a change stream is consistently timing out when watching a collection, database or
1060+
* client that is being changed, then this may be due to the server timing out before it can finish
1061+
* processing the existing oplog. To address this, restart the change stream with a higher
1062+
* `timeoutMS`.
1063+
*
1064+
* If the change stream times out the initial aggregate operation to establish the change stream on
1065+
* the server, then the client will close the change stream. If the getMore calls to the server
1066+
* time out, then the change stream will be left open, but will throw a MongoOperationTimeoutError
1067+
* when in iterator mode and emit an error event that returns a MongoOperationTimeoutError in
1068+
* emitter mode.
1069+
*
1070+
* To determine whether or not the change stream is still open following a timeout, check the
1071+
* {@link ChangeStream.closed} getter.
1072+
*
1073+
* @example
1074+
* In iterator mode, if a next() call throws a timeout error, it will attempt to resume the change stream.
1075+
* The next call can just be retried after this succeeds.
1076+
* ```ts
1077+
* const changeStream = collection.watch([], { timeoutMS: 100 });
1078+
* try {
1079+
* await changeStream.next();
1080+
* } catch (e) {
1081+
* if (e instanceof MongoOperationTimeoutError && !changeStream.closed) {
1082+
* await changeStream.next();
1083+
* }
1084+
* throw e;
1085+
* }
1086+
* ```
1087+
*
1088+
* @example
1089+
* In emitter mode, if the change stream goes `timeoutMS` without emitting a change event, it will
1090+
* emit an error event that returns a MongoOperationTimeoutError, but will not close the change
1091+
* stream unless the resume attempt fails. There is no need to re-establish change listeners as
1092+
* this will automatically continue emitting change events once the resume attempt completes.
1093+
*
1094+
* ```ts
1095+
* const changeStream = collection.watch([], { timeoutMS: 100 });
1096+
* changeStream.on('change', console.log);
1097+
* changeStream.on('error', e => {
1098+
* if (e instanceof MongoOperationTimeoutError && !changeStream.closed) {
1099+
* // do nothing
1100+
* } else {
1101+
* changeStream.close();
1102+
* }
1103+
* });
1104+
* ```
1105+
*
10531106
* @param pipeline - An array of {@link https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
10541107
* @param options - Optional settings for the command
10551108
* @typeParam TLocal - Type of the data being detected by the change stream

src/db.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,58 @@ export class Db {
531531
* - The first is to provide the schema that may be defined for all the collections within this database
532532
* - The second is to override the shape of the change stream document entirely, if it is not provided the type will default to ChangeStreamDocument of the first argument
533533
*
534+
* @remarks
535+
* When `timeoutMS` is configured for a change stream, it will have different behaviour depending
536+
* on whether the change stream is in iterator mode or emitter mode. In both cases, a change
537+
* stream will time out if it does not receive a change event within `timeoutMS` of the last change
538+
* event.
539+
*
540+
* Note that if a change stream is consistently timing out when watching a collection, database or
541+
* client that is being changed, then this may be due to the server timing out before it can finish
542+
* processing the existing oplog. To address this, restart the change stream with a higher
543+
* `timeoutMS`.
544+
*
545+
* If the change stream times out the initial aggregate operation to establish the change stream on
546+
* the server, then the client will close the change stream. If the getMore calls to the server
547+
* time out, then the change stream will be left open, but will throw a MongoOperationTimeoutError
548+
* when in iterator mode and emit an error event that returns a MongoOperationTimeoutError in
549+
* emitter mode.
550+
*
551+
* To determine whether or not the change stream is still open following a timeout, check the
552+
* {@link ChangeStream.closed} getter.
553+
*
554+
* @example
555+
* In iterator mode, if a next() call throws a timeout error, it will attempt to resume the change stream.
556+
* The next call can just be retried after this succeeds.
557+
* ```ts
558+
* const changeStream = collection.watch([], { timeoutMS: 100 });
559+
* try {
560+
* await changeStream.next();
561+
* } catch (e) {
562+
* if (e instanceof MongoOperationTimeoutError && !changeStream.closed) {
563+
* await changeStream.next();
564+
* }
565+
* throw e;
566+
* }
567+
* ```
568+
*
569+
* @example
570+
* In emitter mode, if the change stream goes `timeoutMS` without emitting a change event, it will
571+
* emit an error event that returns a MongoOperationTimeoutError, but will not close the change
572+
* stream unless the resume attempt fails. There is no need to re-establish change listeners as
573+
* this will automatically continue emitting change events once the resume attempt completes.
574+
*
575+
* ```ts
576+
* const changeStream = collection.watch([], { timeoutMS: 100 });
577+
* changeStream.on('change', console.log);
578+
* changeStream.on('error', e => {
579+
* if (e instanceof MongoOperationTimeoutError && !changeStream.closed) {
580+
* // do nothing
581+
* } else {
582+
* changeStream.close();
583+
* }
584+
* });
585+
* ```
534586
* @param pipeline - An array of {@link https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
535587
* @param options - Optional settings for the command
536588
* @typeParam TSchema - Type of the data being detected by the change stream

src/mongo_client.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,58 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
814814
* - The first is to provide the schema that may be defined for all the data within the current cluster
815815
* - The second is to override the shape of the change stream document entirely, if it is not provided the type will default to ChangeStreamDocument of the first argument
816816
*
817+
* @remarks
818+
* When `timeoutMS` is configured for a change stream, it will have different behaviour depending
819+
* on whether the change stream is in iterator mode or emitter mode. In both cases, a change
820+
* stream will time out if it does not receive a change event within `timeoutMS` of the last change
821+
* event.
822+
*
823+
* Note that if a change stream is consistently timing out when watching a collection, database or
824+
* client that is being changed, then this may be due to the server timing out before it can finish
825+
* processing the existing oplog. To address this, restart the change stream with a higher
826+
* `timeoutMS`.
827+
*
828+
* If the change stream times out the initial aggregate operation to establish the change stream on
829+
* the server, then the client will close the change stream. If the getMore calls to the server
830+
* time out, then the change stream will be left open, but will throw a MongoOperationTimeoutError
831+
* when in iterator mode and emit an error event that returns a MongoOperationTimeoutError in
832+
* emitter mode.
833+
*
834+
* To determine whether or not the change stream is still open following a timeout, check the
835+
* {@link ChangeStream.closed} getter.
836+
*
837+
* @example
838+
* In iterator mode, if a next() call throws a timeout error, it will attempt to resume the change stream.
839+
* The next call can just be retried after this succeeds.
840+
* ```ts
841+
* const changeStream = collection.watch([], { timeoutMS: 100 });
842+
* try {
843+
* await changeStream.next();
844+
* } catch (e) {
845+
* if (e instanceof MongoOperationTimeoutError && !changeStream.closed) {
846+
* await changeStream.next();
847+
* }
848+
* throw e;
849+
* }
850+
* ```
851+
*
852+
* @example
853+
* In emitter mode, if the change stream goes `timeoutMS` without emitting a change event, it will
854+
* emit an error event that returns a MongoOperationTimeoutError, but will not close the change
855+
* stream unless the resume attempt fails. There is no need to re-establish change listeners as
856+
* this will automatically continue emitting change events once the resume attempt completes.
857+
*
858+
* ```ts
859+
* const changeStream = collection.watch([], { timeoutMS: 100 });
860+
* changeStream.on('change', console.log);
861+
* changeStream.on('error', e => {
862+
* if (e instanceof MongoOperationTimeoutError && !changeStream.closed) {
863+
* // do nothing
864+
* } else {
865+
* changeStream.close();
866+
* }
867+
* });
868+
* ```
817869
* @param pipeline - An array of {@link https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
818870
* @param options - Optional settings for the command
819871
* @typeParam TSchema - Type of the data being detected by the change stream

0 commit comments

Comments
 (0)