Skip to content

Commit ffcb23b

Browse files
committed
WIP
1 parent 05c3fcd commit ffcb23b

File tree

4 files changed

+75
-28
lines changed

4 files changed

+75
-28
lines changed

src/change_stream.ts

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
isResumableError,
1212
MongoAPIError,
1313
MongoChangeStreamError,
14+
MongoOperationTimeoutError,
1415
MongoRuntimeError
1516
} from './error';
1617
import { MongoClient } from './mongo_client';
@@ -20,6 +21,7 @@ import type { CollationOptions, OperationParent } from './operations/command';
2021
import type { ReadPreference } from './read_preference';
2122
import { type AsyncDisposable, configureResourceManagement } from './resource_management';
2223
import type { ServerSessionId } from './sessions';
24+
import { TimeoutContext } from './timeout';
2325
import { filterOptions, getTopology, type MongoDBNamespace, squashError } from './utils';
2426

2527
/** @internal */
@@ -584,6 +586,8 @@ export class ChangeStream<
584586
/** @internal */
585587
[kMode]: false | 'iterator' | 'emitter';
586588

589+
private timeoutContext?: TimeoutContext;
590+
587591
/** @event */
588592
static readonly RESPONSE = RESPONSE;
589593
/** @event */
@@ -689,6 +693,9 @@ export class ChangeStream<
689693
try {
690694
await this._processErrorIteratorMode(error);
691695
} catch (error) {
696+
if (error instanceof MongoOperationTimeoutError) {
697+
throw error;
698+
}
692699
try {
693700
await this.close();
694701
} catch (error) {
@@ -705,25 +712,33 @@ export class ChangeStream<
705712
this._setIsIterator();
706713
// Change streams must resume indefinitely while each resume event succeeds.
707714
// This loop continues until either a change event is received or until a resume attempt
708-
// fails.
715+
// fails or until a timeout error is encountered
716+
this.timeoutContext?.refresh();
709717

710-
while (true) {
711-
try {
712-
const change = await this.cursor.next();
713-
const processedChange = this._processChange(change ?? null);
714-
return processedChange;
715-
} catch (error) {
718+
try {
719+
while (true) {
716720
try {
717-
await this._processErrorIteratorMode(error);
721+
const change = await this.cursor.next();
722+
const processedChange = this._processChange(change ?? null);
723+
return processedChange;
718724
} catch (error) {
719725
try {
720-
await this.close();
726+
await this._processErrorIteratorMode(error);
721727
} catch (error) {
722-
squashError(error);
728+
if (error instanceof MongoOperationTimeoutError) {
729+
throw error; // Don't close the change stream, but throw the timeout error
730+
}
731+
try {
732+
await this.close();
733+
} catch (error) {
734+
squashError(error);
735+
}
736+
throw error;
723737
}
724-
throw error;
725738
}
726739
}
740+
} finally {
741+
this.timeoutContext?.clear();
727742
}
728743
}
729744

@@ -744,6 +759,9 @@ export class ChangeStream<
744759
try {
745760
await this._processErrorIteratorMode(error);
746761
} catch (error) {
762+
if (error instanceof MongoOperationTimeoutError) {
763+
throw error; // throw the error without closing the change stream
764+
}
747765
try {
748766
await this.close();
749767
} catch (error) {
@@ -862,11 +880,20 @@ export class ChangeStream<
862880
);
863881
}
864882

883+
if (this.options.timeoutMS != null) {
884+
this.timeoutContext ??= TimeoutContext.create({
885+
timeoutMS: this.options.timeoutMS,
886+
serverSelectionTimeoutMS: client.options.serverSelectionTimeoutMS
887+
});
888+
delete this.options.timeoutMS;
889+
}
890+
865891
const changeStreamCursor = new ChangeStreamCursor<TSchema, TChange>(
866892
client,
867893
this.namespace,
868894
pipeline,
869-
options
895+
options,
896+
this.timeoutContext
870897
);
871898

872899
for (const event of CHANGE_STREAM_EVENTS) {
@@ -946,6 +973,10 @@ export class ChangeStream<
946973
// If the change stream has been closed explicitly, do not process error.
947974
if (this[kClosed]) return;
948975

976+
if (changeStreamError instanceof MongoOperationTimeoutError) {
977+
return; // FIXME: At least emit the error
978+
}
979+
949980
if (isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
950981
this._endStream();
951982

@@ -975,7 +1006,10 @@ export class ChangeStream<
9751006
throw new MongoAPIError(CHANGESTREAM_CLOSED_ERROR);
9761007
}
9771008

978-
if (!isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
1009+
if (
1010+
!isResumableError(changeStreamError, this.cursor.maxWireVersion) &&
1011+
!(changeStreamError instanceof MongoOperationTimeoutError)
1012+
) {
9791013
try {
9801014
await this.close();
9811015
} catch (error) {
@@ -1000,6 +1034,8 @@ export class ChangeStream<
10001034
await this.close();
10011035
throw changeStreamError;
10021036
}
1037+
1038+
if (changeStreamError instanceof MongoOperationTimeoutError) throw changeStreamError;
10031039
}
10041040
}
10051041

src/cursor/abstract_cursor.ts

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -142,12 +142,11 @@ export type AbstractCursorEvents = {
142142

143143
/** @public */
144144
export abstract class AbstractCursor<
145-
TSchema = any,
146-
CursorEvents extends AbstractCursorEvents = AbstractCursorEvents
147-
>
145+
TSchema = any,
146+
CursorEvents extends AbstractCursorEvents = AbstractCursorEvents
147+
>
148148
extends TypedEventEmitter<CursorEvents>
149-
implements AsyncDisposable
150-
{
149+
implements AsyncDisposable {
151150
/** @internal */
152151
private cursorId: Long | null;
153152
/** @internal */
@@ -172,6 +171,8 @@ export abstract class AbstractCursor<
172171
protected readonly cursorOptions: InternalAbstractCursorOptions;
173172
/** @internal */
174173
protected timeoutContext?: TimeoutContext;
174+
/** @internal */
175+
protected isChangeStreamCursor?: boolean;
175176

176177
/** @event */
177178
static readonly CLOSE = 'close' as const;
@@ -455,8 +456,9 @@ export abstract class AbstractCursor<
455456
if (this.cursorId === Long.ZERO) {
456457
return false;
457458
}
459+
const shouldRefresh = !this.isChangeStreamCursor && this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null;
458460

459-
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
461+
if (shouldRefresh) {
460462
this.timeoutContext?.refresh();
461463
}
462464
try {
@@ -467,7 +469,7 @@ export abstract class AbstractCursor<
467469
await this.fetchBatch();
468470
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
469471
} finally {
470-
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
472+
if (shouldRefresh) {
471473
this.timeoutContext?.clear();
472474
}
473475
}
@@ -480,7 +482,9 @@ export abstract class AbstractCursor<
480482
if (this.cursorId === Long.ZERO) {
481483
throw new MongoCursorExhaustedError();
482484
}
483-
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
485+
const shouldRefresh = !this.isChangeStreamCursor && this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null;
486+
487+
if (shouldRefresh) {
484488
this.timeoutContext?.refresh();
485489
}
486490

@@ -494,7 +498,7 @@ export abstract class AbstractCursor<
494498
await this.fetchBatch();
495499
} while (!this.isDead || (this.documents?.length ?? 0) !== 0);
496500
} finally {
497-
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
501+
if (shouldRefresh) {
498502
this.timeoutContext?.clear();
499503
}
500504
}
@@ -510,7 +514,9 @@ export abstract class AbstractCursor<
510514
throw new MongoCursorExhaustedError();
511515
}
512516

513-
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
517+
const shouldRefresh = this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null;
518+
519+
if (shouldRefresh) {
514520
this.timeoutContext?.refresh();
515521
}
516522
try {
@@ -528,7 +534,7 @@ export abstract class AbstractCursor<
528534
return doc;
529535
}
530536
} finally {
531-
if (this.cursorOptions.timeoutMode === CursorTimeoutMode.ITERATION && this.cursorId != null) {
537+
if (shouldRefresh) {
532538
this.timeoutContext?.clear();
533539
}
534540
}

src/cursor/change_stream_cursor.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { AggregateOperation } from '../operations/aggregate';
1313
import type { CollationOptions } from '../operations/command';
1414
import { executeOperation } from '../operations/execute_operation';
1515
import type { ClientSession } from '../sessions';
16+
import { type TimeoutContext } from '../timeout';
1617
import { maxWireVersion, type MongoDBNamespace } from '../utils';
1718
import {
1819
AbstractCursor,
@@ -53,10 +54,13 @@ export class ChangeStreamCursor<
5354
client: MongoClient,
5455
namespace: MongoDBNamespace,
5556
pipeline: Document[] = [],
56-
options: ChangeStreamCursorOptions = {}
57+
options: ChangeStreamCursorOptions = {},
58+
timeoutContext?: TimeoutContext
5759
) {
58-
super(client, namespace, options);
60+
super(client, namespace, { ...options, tailable: true, awaitData: true });
61+
this.timeoutContext = timeoutContext;
5962

63+
this.isChangeStreamCursor = true;
6064
this.pipeline = pipeline;
6165
this.changeStreamCursorOptions = options;
6266
this._resumeToken = null;
@@ -110,6 +114,7 @@ export class ChangeStreamCursor<
110114
}
111115

112116
_processBatch(response: CursorResponse): void {
117+
console.log(response.toObject());
113118
const { postBatchResumeToken } = response;
114119
if (postBatchResumeToken) {
115120
this.postBatchResumeToken = postBatchResumeToken;
@@ -130,6 +135,7 @@ export class ChangeStreamCursor<
130135
const aggregateOperation = new AggregateOperation(this.namespace, this.pipeline, {
131136
...this.cursorOptions,
132137
...this.changeStreamCursorOptions,
138+
omitMaxTimeMS: false,
133139
session
134140
});
135141

test/integration/client-side-operations-timeout/client_side_operations_timeout.spec.test.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ const skippedSpecs = {
1111
'gridfs-delete': 'TODO(NODE-6275)',
1212
'gridfs-download': 'TODO(NODE-6275)',
1313
'gridfs-find': 'TODO(NODE-6275)',
14-
'gridfs-upload': 'TODO(NODE-6275)',
15-
'change-streams': 'TODO(NODE-6387)'
14+
'gridfs-upload': 'TODO(NODE-6275)'
1615
};
1716

1817
const skippedTests = {

0 commit comments

Comments
 (0)