Skip to content

Commit fee5ac4

Browse files
committed
add CSOT support to change streams
1 parent 4fd4b24 commit fee5ac4

File tree

8 files changed

+381
-108
lines changed

8 files changed

+381
-108
lines changed

src/change_stream.ts

Lines changed: 128 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ import type { Readable } from 'stream';
33
import type { Binary, Document, Timestamp } from './bson';
44
import { Collection } from './collection';
55
import { CHANGE, CLOSE, END, ERROR, INIT, MORE, RESPONSE, RESUME_TOKEN_CHANGED } from './constants';
6-
import type { AbstractCursorEvents, CursorStreamOptions } from './cursor/abstract_cursor';
6+
import { type CursorStreamOptions, CursorTimeoutContext } from './cursor/abstract_cursor';
77
import { ChangeStreamCursor, type ChangeStreamCursorOptions } from './cursor/change_stream_cursor';
88
import { Db } from './db';
99
import {
1010
type AnyError,
1111
isResumableError,
1212
MongoAPIError,
1313
MongoChangeStreamError,
14+
MongoOperationTimeoutError,
1415
MongoRuntimeError
1516
} from './error';
1617
import { MongoClient } from './mongo_client';
@@ -20,6 +21,7 @@ import type { CollationOptions, OperationParent } from './operations/command';
2021
import type { ReadPreference } from './read_preference';
2122
import { type AsyncDisposable, configureResourceManagement } from './resource_management';
2223
import type { ServerSessionId } from './sessions';
24+
import { CSOTTimeoutContext, type TimeoutContext } from './timeout';
2325
import { filterOptions, getTopology, type MongoDBNamespace, squashError } from './utils';
2426

2527
/** @internal */
@@ -538,7 +540,13 @@ export type ChangeStreamEvents<
538540
end(): void;
539541
error(error: Error): void;
540542
change(change: TChange): void;
541-
} & AbstractCursorEvents;
543+
/**
544+
* @remarks Note that the `close` event is currently emitted whenever the internal `ChangeStreamCursor`
545+
* instance is closed, which can occur multiple times for a given `ChangeStream` instance.
546+
* When this event is emitted is subject to change outside of major versions.
547+
*/
548+
close(): void;
549+
};
542550

543551
/**
544552
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
@@ -609,6 +617,13 @@ export class ChangeStream<
609617
*/
610618
static readonly RESUME_TOKEN_CHANGED = RESUME_TOKEN_CHANGED;
611619

620+
private timeoutContext?: TimeoutContext;
621+
/**
622+
* Note that this property is here to uniquely identify a ChangeStream instance as the owner of
623+
* the {@link CursorTimeoutContext} instance (see {@link ChangeStream._createChangeStreamCursor}) to ensure
624+
* that {@link AbstractCursor.close} does not mutate the timeoutContext.
625+
*/
626+
private contextOwner: symbol;
612627
/**
613628
* @internal
614629
*
@@ -624,20 +639,25 @@ export class ChangeStream<
624639

625640
this.pipeline = pipeline;
626641
this.options = { ...options };
642+
let serverSelectionTimeoutMS: number;
627643
delete this.options.writeConcern;
628644

629645
if (parent instanceof Collection) {
630646
this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
647+
serverSelectionTimeoutMS = parent.s.db.client.options.serverSelectionTimeoutMS;
631648
} else if (parent instanceof Db) {
632649
this.type = CHANGE_DOMAIN_TYPES.DATABASE;
650+
serverSelectionTimeoutMS = parent.client.options.serverSelectionTimeoutMS;
633651
} else if (parent instanceof MongoClient) {
634652
this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
653+
serverSelectionTimeoutMS = parent.options.serverSelectionTimeoutMS;
635654
} else {
636655
throw new MongoChangeStreamError(
637656
'Parent provided to ChangeStream constructor must be an instance of Collection, Db, or MongoClient'
638657
);
639658
}
640659

660+
this.contextOwner = Symbol();
641661
this.parent = parent;
642662
this.namespace = parent.s.namespace;
643663
if (!this.options.readPreference && parent.readPreference) {
@@ -662,6 +682,13 @@ export class ChangeStream<
662682
this[kCursorStream]?.removeAllListeners('data');
663683
}
664684
});
685+
686+
if (this.options.timeoutMS != null) {
687+
this.timeoutContext = new CSOTTimeoutContext({
688+
timeoutMS: this.options.timeoutMS,
689+
serverSelectionTimeoutMS
690+
});
691+
}
665692
}
666693

667694
/** @internal */
@@ -681,22 +708,31 @@ export class ChangeStream<
681708
// This loop continues until either a change event is received or until a resume attempt
682709
// fails.
683710

684-
while (true) {
685-
try {
686-
const hasNext = await this.cursor.hasNext();
687-
return hasNext;
688-
} catch (error) {
711+
this.timeoutContext?.refresh();
712+
try {
713+
while (true) {
714+
const cursorInitialized = this.cursor.id != null;
689715
try {
690-
await this._processErrorIteratorMode(error);
716+
const hasNext = await this.cursor.hasNext();
717+
return hasNext;
691718
} catch (error) {
692719
try {
693-
await this.close();
720+
await this._processErrorIteratorMode(error, cursorInitialized);
694721
} catch (error) {
695-
squashError(error);
722+
if (error instanceof MongoOperationTimeoutError && cursorInitialized) {
723+
throw error;
724+
}
725+
try {
726+
await this.close();
727+
} catch (error) {
728+
squashError(error);
729+
}
730+
throw error;
696731
}
697-
throw error;
698732
}
699733
}
734+
} finally {
735+
this.timeoutContext?.clear();
700736
}
701737
}
702738

@@ -706,24 +742,33 @@ export class ChangeStream<
706742
// Change streams must resume indefinitely while each resume event succeeds.
707743
// This loop continues until either a change event is received or until a resume attempt
708744
// fails.
745+
this.timeoutContext?.refresh();
709746

710-
while (true) {
711-
try {
712-
const change = await this.cursor.next();
713-
const processedChange = this._processChange(change ?? null);
714-
return processedChange;
715-
} catch (error) {
747+
try {
748+
while (true) {
749+
const cursorInitialized = this.cursor.id != null;
716750
try {
717-
await this._processErrorIteratorMode(error);
751+
const change = await this.cursor.next();
752+
const processedChange = this._processChange(change ?? null);
753+
return processedChange;
718754
} catch (error) {
719755
try {
720-
await this.close();
756+
await this._processErrorIteratorMode(error, cursorInitialized);
721757
} catch (error) {
722-
squashError(error);
758+
if (error instanceof MongoOperationTimeoutError && cursorInitialized) {
759+
throw error;
760+
}
761+
try {
762+
await this.close();
763+
} catch (error) {
764+
squashError(error);
765+
}
766+
throw error;
723767
}
724-
throw error;
725768
}
726769
}
770+
} finally {
771+
this.timeoutContext?.clear();
727772
}
728773
}
729774

@@ -735,23 +780,30 @@ export class ChangeStream<
735780
// Change streams must resume indefinitely while each resume event succeeds.
736781
// This loop continues until either a change event is received or until a resume attempt
737782
// fails.
783+
this.timeoutContext?.refresh();
738784

739-
while (true) {
740-
try {
741-
const change = await this.cursor.tryNext();
742-
return change ?? null;
743-
} catch (error) {
785+
try {
786+
while (true) {
787+
const cursorInitialized = this.cursor.id != null;
744788
try {
745-
await this._processErrorIteratorMode(error);
789+
const change = await this.cursor.tryNext();
790+
return change ?? null;
746791
} catch (error) {
747792
try {
748-
await this.close();
793+
await this._processErrorIteratorMode(error, cursorInitialized);
749794
} catch (error) {
750-
squashError(error);
795+
if (error instanceof MongoOperationTimeoutError && cursorInitialized) throw error;
796+
try {
797+
await this.close();
798+
} catch (error) {
799+
squashError(error);
800+
}
801+
throw error;
751802
}
752-
throw error;
753803
}
754804
}
805+
} finally {
806+
this.timeoutContext?.clear();
755807
}
756808
}
757809

@@ -784,6 +836,8 @@ export class ChangeStream<
784836
* Frees the internal resources used by the change stream.
785837
*/
786838
async close(): Promise<void> {
839+
this.timeoutContext?.clear();
840+
this.timeoutContext = undefined;
787841
this[kClosed] = true;
788842

789843
const cursor = this.cursor;
@@ -866,7 +920,12 @@ export class ChangeStream<
866920
client,
867921
this.namespace,
868922
pipeline,
869-
options
923+
{
924+
...options,
925+
timeoutContext: this.timeoutContext
926+
? new CursorTimeoutContext(this.timeoutContext, this.contextOwner)
927+
: undefined
928+
}
870929
);
871930

872931
for (const event of CHANGE_STREAM_EVENTS) {
@@ -900,7 +959,7 @@ export class ChangeStream<
900959
this.emit(ChangeStream.ERROR, error);
901960
}
902961
});
903-
stream.on('error', error => this._processErrorStreamMode(error));
962+
stream.on('error', error => this._processErrorStreamMode(error, this.cursor.id != null));
904963
}
905964

906965
/** @internal */
@@ -942,24 +1001,30 @@ export class ChangeStream<
9421001
}
9431002

9441003
/** @internal */
945-
private _processErrorStreamMode(changeStreamError: AnyError) {
1004+
private _processErrorStreamMode(changeStreamError: AnyError, cursorInitialized: boolean) {
9461005
// If the change stream has been closed explicitly, do not process error.
9471006
if (this[kClosed]) return;
9481007

949-
if (this.cursor.id != null && isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
1008+
if (
1009+
cursorInitialized &&
1010+
(isResumableError(changeStreamError, this.cursor.maxWireVersion) ||
1011+
changeStreamError instanceof MongoOperationTimeoutError)
1012+
) {
9501013
this._endStream();
9511014

952-
this.cursor.close().then(undefined, squashError);
953-
954-
const topology = getTopology(this.parent);
955-
topology
956-
.selectServer(this.cursor.readPreference, {
957-
operationName: 'reconnect topology in change stream'
958-
})
959-
1015+
this.cursor
1016+
.close()
1017+
.then(
1018+
() => this._resume(changeStreamError),
1019+
e => {
1020+
squashError(e);
1021+
return this._resume(changeStreamError);
1022+
}
1023+
)
9601024
.then(
9611025
() => {
962-
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
1026+
if (changeStreamError instanceof MongoOperationTimeoutError)
1027+
this.emit(ChangeStream.ERROR, changeStreamError);
9631028
},
9641029
() => this._closeEmitterModeWithError(changeStreamError)
9651030
);
@@ -969,33 +1034,44 @@ export class ChangeStream<
9691034
}
9701035

9711036
/** @internal */
972-
private async _processErrorIteratorMode(changeStreamError: AnyError) {
1037+
private async _processErrorIteratorMode(changeStreamError: AnyError, cursorInitialized: boolean) {
9731038
if (this[kClosed]) {
9741039
// TODO(NODE-3485): Replace with MongoChangeStreamClosedError
9751040
throw new MongoAPIError(CHANGESTREAM_CLOSED_ERROR);
9761041
}
9771042

9781043
if (
979-
this.cursor.id == null ||
980-
!isResumableError(changeStreamError, this.cursor.maxWireVersion)
1044+
cursorInitialized &&
1045+
(isResumableError(changeStreamError, this.cursor.maxWireVersion) ||
1046+
changeStreamError instanceof MongoOperationTimeoutError)
9811047
) {
1048+
try {
1049+
await this.cursor.close();
1050+
} catch (error) {
1051+
squashError(error);
1052+
}
1053+
1054+
await this._resume(changeStreamError);
1055+
1056+
if (changeStreamError instanceof MongoOperationTimeoutError) throw changeStreamError;
1057+
} else {
9821058
try {
9831059
await this.close();
9841060
} catch (error) {
9851061
squashError(error);
9861062
}
1063+
9871064
throw changeStreamError;
9881065
}
1066+
}
9891067

990-
try {
991-
await this.cursor.close();
992-
} catch (error) {
993-
squashError(error);
994-
}
1068+
private async _resume(changeStreamError: AnyError) {
1069+
this.timeoutContext?.refresh();
9951070
const topology = getTopology(this.parent);
9961071
try {
9971072
await topology.selectServer(this.cursor.readPreference, {
998-
operationName: 'reconnect topology in change stream'
1073+
operationName: 'reconnect topology in change stream',
1074+
timeoutContext: this.timeoutContext
9991075
});
10001076
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
10011077
} catch {

src/cmap/connection.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
505505
responseType?: MongoDBResponseConstructor
506506
) {
507507
const message = this.prepareCommand(ns.db, command, options);
508-
509508
let started = 0;
510509
if (this.shouldEmitAndLogCommand) {
511510
started = now();
@@ -717,8 +716,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
717716
try {
718717
return await Promise.race([drainEvent, timeout]);
719718
} catch (error) {
719+
let err = error;
720720
if (TimeoutError.is(error)) {
721-
throw new MongoOperationTimeoutError('Timed out at socket write');
721+
err = new MongoOperationTimeoutError('Timed out at socket write');
722+
this.cleanup(err);
722723
}
723724
throw error;
724725
} finally {
@@ -753,6 +754,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
753754
}
754755
}
755756
} catch (readError) {
757+
const err = readError;
756758
if (TimeoutError.is(readError)) {
757759
const error = new MongoOperationTimeoutError(
758760
`Timed out during socket read (${readError.duration}ms)`
@@ -761,7 +763,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
761763
this.onError(error);
762764
throw error;
763765
}
764-
throw readError;
766+
throw err;
765767
} finally {
766768
this.dataEvents = null;
767769
this.messageStream.pause();

0 commit comments

Comments
 (0)