Skip to content

Commit e5e443d

Browse files
committed
add CSOT support to change streams
1 parent efbed5c commit e5e443d

File tree

7 files changed

+316
-71
lines changed

7 files changed

+316
-71
lines changed

src/change_stream.ts

Lines changed: 106 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,19 @@ import type { Readable } from 'stream';
33
import type { Binary, Document, Timestamp } from './bson';
44
import { Collection } from './collection';
55
import { CHANGE, CLOSE, END, ERROR, INIT, MORE, RESPONSE, RESUME_TOKEN_CHANGED } from './constants';
6-
import type { AbstractCursorEvents, CursorStreamOptions } from './cursor/abstract_cursor';
6+
import {
7+
type AbstractCursorEvents,
8+
type CursorStreamOptions,
9+
CursorTimeoutContext
10+
} from './cursor/abstract_cursor';
711
import { ChangeStreamCursor, type ChangeStreamCursorOptions } from './cursor/change_stream_cursor';
812
import { Db } from './db';
913
import {
1014
type AnyError,
1115
isResumableError,
1216
MongoAPIError,
1317
MongoChangeStreamError,
18+
MongoOperationTimeoutError,
1419
MongoRuntimeError
1520
} from './error';
1621
import { MongoClient } from './mongo_client';
@@ -20,6 +25,7 @@ import type { CollationOptions, OperationParent } from './operations/command';
2025
import type { ReadPreference } from './read_preference';
2126
import { type AsyncDisposable, configureResourceManagement } from './resource_management';
2227
import type { ServerSessionId } from './sessions';
28+
import { type TimeoutContext } from './timeout';
2329
import { filterOptions, getTopology, type MongoDBNamespace, squashError } from './utils';
2430

2531
/** @internal */
@@ -609,6 +615,8 @@ export class ChangeStream<
609615
*/
610616
static readonly RESUME_TOKEN_CHANGED = RESUME_TOKEN_CHANGED;
611617

618+
private timeoutContext?: TimeoutContext;
619+
private symbol: symbol;
612620
/**
613621
* @internal
614622
*
@@ -638,6 +646,7 @@ export class ChangeStream<
638646
);
639647
}
640648

649+
this.symbol = Symbol();
641650
this.parent = parent;
642651
this.namespace = parent.s.namespace;
643652
if (!this.options.readPreference && parent.readPreference) {
@@ -681,22 +690,31 @@ export class ChangeStream<
681690
// This loop continues until either a change event is received or until a resume attempt
682691
// fails.
683692

684-
while (true) {
685-
try {
686-
const hasNext = await this.cursor.hasNext();
687-
return hasNext;
688-
} catch (error) {
693+
this.timeoutContext?.refresh();
694+
try {
695+
while (true) {
696+
const cursorInitialized = this.cursor.id != null;
689697
try {
690-
await this._processErrorIteratorMode(error);
698+
const hasNext = await this.cursor.hasNext();
699+
return hasNext;
691700
} catch (error) {
692701
try {
693-
await this.close();
702+
await this._processErrorIteratorMode(error, cursorInitialized);
694703
} catch (error) {
695-
squashError(error);
704+
if (error instanceof MongoOperationTimeoutError && cursorInitialized) {
705+
throw error;
706+
}
707+
try {
708+
await this.close();
709+
} catch (error) {
710+
squashError(error);
711+
}
712+
throw error;
696713
}
697-
throw error;
698714
}
699715
}
716+
} finally {
717+
this.timeoutContext?.clear();
700718
}
701719
}
702720

@@ -706,24 +724,33 @@ export class ChangeStream<
706724
// Change streams must resume indefinitely while each resume event succeeds.
707725
// This loop continues until either a change event is received or until a resume attempt
708726
// fails.
727+
this.timeoutContext?.refresh();
709728

710-
while (true) {
711-
try {
712-
const change = await this.cursor.next();
713-
const processedChange = this._processChange(change ?? null);
714-
return processedChange;
715-
} catch (error) {
729+
try {
730+
while (true) {
731+
const cursorInitialized = this.cursor.id != null;
716732
try {
717-
await this._processErrorIteratorMode(error);
733+
const change = await this.cursor.next();
734+
const processedChange = this._processChange(change ?? null);
735+
return processedChange;
718736
} catch (error) {
719737
try {
720-
await this.close();
738+
await this._processErrorIteratorMode(error, cursorInitialized);
721739
} catch (error) {
722-
squashError(error);
740+
if (error instanceof MongoOperationTimeoutError && cursorInitialized) {
741+
throw error;
742+
}
743+
try {
744+
await this.close();
745+
} catch (error) {
746+
squashError(error);
747+
}
748+
throw error;
723749
}
724-
throw error;
725750
}
726751
}
752+
} finally {
753+
this.timeoutContext?.clear();
727754
}
728755
}
729756

@@ -735,23 +762,30 @@ export class ChangeStream<
735762
// Change streams must resume indefinitely while each resume event succeeds.
736763
// This loop continues until either a change event is received or until a resume attempt
737764
// fails.
765+
this.timeoutContext?.refresh();
738766

739-
while (true) {
740-
try {
741-
const change = await this.cursor.tryNext();
742-
return change ?? null;
743-
} catch (error) {
767+
try {
768+
while (true) {
769+
const cursorInitialized = this.cursor.id != null;
744770
try {
745-
await this._processErrorIteratorMode(error);
771+
const change = await this.cursor.tryNext();
772+
return change ?? null;
746773
} catch (error) {
747774
try {
748-
await this.close();
775+
await this._processErrorIteratorMode(error, cursorInitialized);
749776
} catch (error) {
750-
squashError(error);
777+
if (error instanceof MongoOperationTimeoutError && cursorInitialized) throw error;
778+
try {
779+
await this.close();
780+
} catch (error) {
781+
squashError(error);
782+
}
783+
throw error;
751784
}
752-
throw error;
753785
}
754786
}
787+
} finally {
788+
this.timeoutContext?.clear();
755789
}
756790
}
757791

@@ -784,6 +818,8 @@ export class ChangeStream<
784818
* Frees the internal resources used by the change stream.
785819
*/
786820
async close(): Promise<void> {
821+
this.timeoutContext?.clear();
822+
this.timeoutContext = undefined;
787823
this[kClosed] = true;
788824

789825
const cursor = this.cursor;
@@ -866,7 +902,12 @@ export class ChangeStream<
866902
client,
867903
this.namespace,
868904
pipeline,
869-
options
905+
{
906+
...options,
907+
timeoutContext: this.timeoutContext
908+
? new CursorTimeoutContext(this.timeoutContext, this.symbol)
909+
: undefined
910+
}
870911
);
871912

872913
for (const event of CHANGE_STREAM_EVENTS) {
@@ -893,14 +934,17 @@ export class ChangeStream<
893934
const stream = this[kCursorStream] ?? cursor.stream();
894935
this[kCursorStream] = stream;
895936
stream.on('data', change => {
937+
this.timeoutContext?.refresh();
896938
try {
897939
const processedChange = this._processChange(change);
898940
this.emit(ChangeStream.CHANGE, processedChange);
899941
} catch (error) {
900942
this.emit(ChangeStream.ERROR, error);
943+
} finally {
944+
this.timeoutContext?.clear();
901945
}
902946
});
903-
stream.on('error', error => this._processErrorStreamMode(error));
947+
stream.on('error', error => this._processErrorStreamMode(error, this.cursor.id != null));
904948
}
905949

906950
/** @internal */
@@ -942,24 +986,30 @@ export class ChangeStream<
942986
}
943987

944988
/** @internal */
945-
private _processErrorStreamMode(changeStreamError: AnyError) {
989+
private _processErrorStreamMode(changeStreamError: AnyError, cursorInitialized: boolean) {
946990
// If the change stream has been closed explicitly, do not process error.
947991
if (this[kClosed]) return;
948992

949-
if (this.cursor.id != null && isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
993+
if (
994+
cursorInitialized &&
995+
(isResumableError(changeStreamError, this.cursor.maxWireVersion) ||
996+
changeStreamError instanceof MongoOperationTimeoutError)
997+
) {
950998
this._endStream();
951999

952-
this.cursor.close().then(undefined, squashError);
953-
954-
const topology = getTopology(this.parent);
955-
topology
956-
.selectServer(this.cursor.readPreference, {
957-
operationName: 'reconnect topology in change stream'
958-
})
959-
1000+
this.cursor
1001+
.close()
1002+
.then(
1003+
() => this._resume(changeStreamError),
1004+
e => {
1005+
squashError(e);
1006+
return this._resume(changeStreamError);
1007+
}
1008+
)
9601009
.then(
9611010
() => {
962-
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
1011+
if (changeStreamError instanceof MongoOperationTimeoutError)
1012+
this.emit(ChangeStream.ERROR, changeStreamError);
9631013
},
9641014
() => this._closeEmitterModeWithError(changeStreamError)
9651015
);
@@ -969,15 +1019,16 @@ export class ChangeStream<
9691019
}
9701020

9711021
/** @internal */
972-
private async _processErrorIteratorMode(changeStreamError: AnyError) {
1022+
private async _processErrorIteratorMode(changeStreamError: AnyError, cursorInitialized: boolean) {
9731023
if (this[kClosed]) {
9741024
// TODO(NODE-3485): Replace with MongoChangeStreamClosedError
9751025
throw new MongoAPIError(CHANGESTREAM_CLOSED_ERROR);
9761026
}
9771027

9781028
if (
979-
this.cursor.id == null ||
980-
!isResumableError(changeStreamError, this.cursor.maxWireVersion)
1029+
!cursorInitialized ||
1030+
(!isResumableError(changeStreamError, this.cursor.maxWireVersion) &&
1031+
!(changeStreamError instanceof MongoOperationTimeoutError))
9811032
) {
9821033
try {
9831034
await this.close();
@@ -992,10 +1043,19 @@ export class ChangeStream<
9921043
} catch (error) {
9931044
squashError(error);
9941045
}
1046+
1047+
await this._resume(changeStreamError);
1048+
1049+
if (changeStreamError instanceof MongoOperationTimeoutError) throw changeStreamError;
1050+
}
1051+
1052+
private async _resume(changeStreamError: AnyError) {
1053+
this.timeoutContext?.refresh();
9951054
const topology = getTopology(this.parent);
9961055
try {
9971056
await topology.selectServer(this.cursor.readPreference, {
998-
operationName: 'reconnect topology in change stream'
1057+
operationName: 'reconnect topology in change stream',
1058+
timeoutContext: this.timeoutContext
9991059
});
10001060
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
10011061
} catch {

src/cmap/connection.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
500500
responseType?: MongoDBResponseConstructor
501501
) {
502502
const message = this.prepareCommand(ns.db, command, options);
503-
504503
let started = 0;
505504
if (this.shouldEmitAndLogCommand) {
506505
started = now();
@@ -712,10 +711,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
712711
try {
713712
return await Promise.race([drainEvent, timeout]);
714713
} catch (error) {
714+
let err = error;
715715
if (TimeoutError.is(error)) {
716-
throw new MongoOperationTimeoutError('Timed out at socket write');
716+
err = new MongoOperationTimeoutError('Timed out at socket write');
717717
}
718-
throw error;
718+
this.cleanup(error);
719+
throw err;
719720
}
720721
}
721722
return await drainEvent;
@@ -746,6 +747,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
746747
}
747748
}
748749
} catch (readError) {
750+
const err = readError;
749751
if (TimeoutError.is(readError)) {
750752
const error = new MongoOperationTimeoutError(
751753
`Timed out during socket read (${readError.duration}ms)`
@@ -754,7 +756,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
754756
this.onError(error);
755757
throw error;
756758
}
757-
throw readError;
759+
this.cleanup(err);
760+
throw err;
758761
} finally {
759762
this.dataEvents = null;
760763
this.messageStream.pause();

0 commit comments

Comments
 (0)