Skip to content

docs(NODE-6457): Document CSOT change stream behaviour #4301

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,59 @@ export class Collection<TSchema extends Document = Document> {
* });
* ```
*
* @remarks
* When `timeoutMS` is configured for a change stream, it will have different behaviour depending
* on whether the change stream is in iterator mode or emitter mode. In both cases, a change
* stream will time out if it does not receive a change event within `timeoutMS` of the last change
* event.
*
* Note that if a change stream is consistently timing out when watching a collection, database or
* client that is being changed, then this may be due to the server timing out before it can finish
* processing the existing oplog. To address this, restart the change stream with a higher
* `timeoutMS`.
*
* If the change stream times out the initial aggregate operation to establish the change stream on
* the server, then the client will close the change stream. If the getMore calls to the server
* time out, then the change stream will be left open, but will throw a MongoOperationTimeoutError
* when in iterator mode and emit an error event that returns a MongoOperationTimeoutError in
* emitter mode.
*
* To determine whether or not the change stream is still open following a timeout, check the
* {@link ChangeStream.closed} getter.
*
* @example
* In iterator mode, if a next() call throws a timeout error, it will attempt to resume the change stream.
* The next call can just be retried after this succeeds.
* ```ts
* const changeStream = collection.watch([], { timeoutMS: 100 });
* try {
* await changeStream.next();
* } catch (e) {
* if (e instanceof MongoOperationTimeoutError && !changeStream.closed) {
* await changeStream.next();
* }
* throw e;
* }
* ```
*
* @example
* In emitter mode, if the change stream goes `timeoutMS` without emitting a change event, it will
* emit an error event that returns a MongoOperationTimeoutError, but will not close the change
* stream unless the resume attempt fails. There is no need to re-establish change listeners as
* this will automatically continue emitting change events once the resume attempt completes.
*
* ```ts
* const changeStream = collection.watch([], { timeoutMS: 100 });
* changeStream.on('change', console.log);
* changeStream.on('error', e => {
* if (e instanceof MongoOperationTimeoutError && !changeStream.closed) {
* // do nothing
* } else {
* changeStream.close();
* }
* });
* ```
*
* @param pipeline - An array of {@link https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
* @param options - Optional settings for the command
* @typeParam TLocal - Type of the data being detected by the change stream
Expand Down
52 changes: 52 additions & 0 deletions src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,58 @@ export class Db {
* - The first is to provide the schema that may be defined for all the collections within this database
* - The second is to override the shape of the change stream document entirely, if it is not provided the type will default to ChangeStreamDocument of the first argument
*
* @remarks
* When `timeoutMS` is configured for a change stream, it will have different behaviour depending
* on whether the change stream is in iterator mode or emitter mode. In both cases, a change
* stream will time out if it does not receive a change event within `timeoutMS` of the last change
* event.
*
* Note that if a change stream is consistently timing out when watching a collection, database or
* client that is being changed, then this may be due to the server timing out before it can finish
* processing the existing oplog. To address this, restart the change stream with a higher
* `timeoutMS`.
*
* If the change stream times out the initial aggregate operation to establish the change stream on
* the server, then the client will close the change stream. If the getMore calls to the server
* time out, then the change stream will be left open, but will throw a MongoOperationTimeoutError
* when in iterator mode and emit an error event that returns a MongoOperationTimeoutError in
* emitter mode.
*
* To determine whether or not the change stream is still open following a timeout, check the
* {@link ChangeStream.closed} getter.
*
* @example
* In iterator mode, if a next() call throws a timeout error, it will attempt to resume the change stream.
* The next call can just be retried after this succeeds.
* ```ts
* const changeStream = collection.watch([], { timeoutMS: 100 });
* try {
* await changeStream.next();
* } catch (e) {
* if (e instanceof MongoOperationTimeoutError && !changeStream.closed) {
* await changeStream.next();
* }
* throw e;
* }
* ```
*
* @example
* In emitter mode, if the change stream goes `timeoutMS` without emitting a change event, it will
* emit an error event that returns a MongoOperationTimeoutError, but will not close the change
* stream unless the resume attempt fails. There is no need to re-establish change listeners as
* this will automatically continue emitting change events once the resume attempt completes.
*
* ```ts
* const changeStream = collection.watch([], { timeoutMS: 100 });
* changeStream.on('change', console.log);
* changeStream.on('error', e => {
* if (e instanceof MongoOperationTimeoutError && !changeStream.closed) {
* // do nothing
* } else {
* changeStream.close();
* }
* });
* ```
* @param pipeline - An array of {@link https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
* @param options - Optional settings for the command
* @typeParam TSchema - Type of the data being detected by the change stream
Expand Down
52 changes: 52 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,58 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
* - The first is to provide the schema that may be defined for all the data within the current cluster
* - The second is to override the shape of the change stream document entirely, if it is not provided the type will default to ChangeStreamDocument of the first argument
*
* @remarks
* When `timeoutMS` is configured for a change stream, it will have different behaviour depending
* on whether the change stream is in iterator mode or emitter mode. In both cases, a change
* stream will time out if it does not receive a change event within `timeoutMS` of the last change
* event.
*
* Note that if a change stream is consistently timing out when watching a collection, database or
* client that is being changed, then this may be due to the server timing out before it can finish
* processing the existing oplog. To address this, restart the change stream with a higher
* `timeoutMS`.
*
* If the change stream times out the initial aggregate operation to establish the change stream on
* the server, then the client will close the change stream. If the getMore calls to the server
* time out, then the change stream will be left open, but will throw a MongoOperationTimeoutError
* when in iterator mode and emit an error event that returns a MongoOperationTimeoutError in
* emitter mode.
*
* To determine whether or not the change stream is still open following a timeout, check the
* {@link ChangeStream.closed} getter.
*
* @example
* In iterator mode, if a next() call throws a timeout error, it will attempt to resume the change stream.
* The next call can just be retried after this succeeds.
* ```ts
* const changeStream = collection.watch([], { timeoutMS: 100 });
* try {
* await changeStream.next();
* } catch (e) {
* if (e instanceof MongoOperationTimeoutError && !changeStream.closed) {
* await changeStream.next();
* }
* throw e;
* }
* ```
*
* @example
* In emitter mode, if the change stream goes `timeoutMS` without emitting a change event, it will
* emit an error event that returns a MongoOperationTimeoutError, but will not close the change
* stream unless the resume attempt fails. There is no need to re-establish change listeners as
* this will automatically continue emitting change events once the resume attempt completes.
*
* ```ts
* const changeStream = collection.watch([], { timeoutMS: 100 });
* changeStream.on('change', console.log);
* changeStream.on('error', e => {
* if (e instanceof MongoOperationTimeoutError && !changeStream.closed) {
* // do nothing
* } else {
* changeStream.close();
* }
* });
* ```
* @param pipeline - An array of {@link https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents. This allows for filtering (using $match) and manipulating the change stream documents.
* @param options - Optional settings for the command
* @typeParam TSchema - Type of the data being detected by the change stream
Expand Down