@@ -20,6 +20,7 @@ import type { Topology } from './sdam/topology';
20
20
import type { Writable } from 'stream' ;
21
21
import type { StreamOptions } from './cursor/core_cursor' ;
22
22
import type { OperationParent } from './operations/command' ;
23
+ import type { CollationOptions } from './cmap/wire_protocol/write_command' ;
23
24
const kResumeQueue = Symbol ( 'resumeQueue' ) ;
24
25
25
26
const CHANGE_STREAM_OPTIONS = [ 'resumeAfter' , 'startAfter' , 'startAtOperationTime' , 'fullDocument' ] ;
@@ -33,6 +34,14 @@ const CHANGE_DOMAIN_TYPES = {
33
34
CLUSTER : Symbol ( 'Cluster' )
34
35
} ;
35
36
37
+ export interface ResumeOptions {
38
+ startAtOperationTime ?: Timestamp ;
39
+ batchSize ?: number ;
40
+ maxAwaitTimeMS ?: number ;
41
+ collation ?: CollationOptions ;
42
+ readPreference ?: ReadPreference ;
43
+ }
44
+
36
45
/**
37
46
* Represents the logical starting point for a new or resuming {@link https://docs.mongodb.com/master/changeStreams/#change-stream-resume-token| Change Stream} on the server.
38
47
* @public
@@ -209,6 +218,7 @@ export class ChangeStream extends EventEmitter {
209
218
this . topology = parent . s . topology ;
210
219
} else if ( parent instanceof MongoClient ) {
211
220
this . type = CHANGE_DOMAIN_TYPES . CLUSTER ;
221
+ // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
212
222
this . topology = parent . topology ! ;
213
223
} else {
214
224
throw new TypeError (
@@ -406,30 +416,32 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
406
416
}
407
417
}
408
418
409
- set resumeToken ( token ) {
419
+ set resumeToken ( token : ResumeToken ) {
410
420
this . _resumeToken = token ;
411
421
this . emit ( ChangeStream . RESUME_TOKEN_CHANGED , token ) ;
412
422
}
413
423
414
- get resumeToken ( ) {
424
+ get resumeToken ( ) : ResumeToken {
415
425
return this . _resumeToken ;
416
426
}
417
427
418
- get resumeOptions ( ) {
419
- const result : Document = { } ;
428
+ get resumeOptions ( ) : ResumeOptions {
429
+ const result = { } as ResumeOptions ;
420
430
for ( const optionName of CURSOR_OPTIONS ) {
421
431
if ( Reflect . has ( this . options , optionName ) ) {
422
- result [ optionName ] = Reflect . get ( this . options , optionName ) ;
432
+ Reflect . set ( result , optionName , Reflect . get ( this . options , optionName ) ) ;
423
433
}
424
434
}
425
435
426
436
if ( this . resumeToken || this . startAtOperationTime ) {
427
- [ 'resumeAfter' , 'startAfter' , 'startAtOperationTime' ] . forEach ( key => delete result [ key ] ) ;
437
+ [ 'resumeAfter' , 'startAfter' , 'startAtOperationTime' ] . forEach ( key =>
438
+ Reflect . deleteProperty ( result , key )
439
+ ) ;
428
440
429
441
if ( this . resumeToken ) {
430
442
const resumeKey =
431
443
this . options . startAfter && ! this . hasReceived ? 'startAfter' : 'resumeAfter' ;
432
- result [ resumeKey ] = this . resumeToken ;
444
+ Reflect . set ( result , resumeKey , this . resumeToken ) ;
433
445
} else if ( this . startAtOperationTime && maxWireVersion ( this . server ) >= 7 ) {
434
446
result . startAtOperationTime = this . startAtOperationTime ;
435
447
}
@@ -438,7 +450,7 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
438
450
return result ;
439
451
}
440
452
441
- cacheResumeToken ( resumeToken : ResumeToken ) {
453
+ cacheResumeToken ( resumeToken : ResumeToken ) : void {
442
454
if ( this . bufferedCount ( ) === 0 && this . cursorState . postBatchResumeToken ) {
443
455
this . resumeToken = this . cursorState . postBatchResumeToken ;
444
456
} else {
@@ -447,7 +459,7 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
447
459
this . hasReceived = true ;
448
460
}
449
461
450
- _processBatch ( batchName : string , response ?: Document ) {
462
+ _processBatch ( batchName : string , response ?: Document ) : void {
451
463
const cursor = response ?. cursor || { } ;
452
464
if ( cursor . postBatchResumeToken ) {
453
465
this . cursorState . postBatchResumeToken = cursor . postBatchResumeToken ;
@@ -458,7 +470,7 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
458
470
}
459
471
}
460
472
461
- _initializeCursor ( callback : Callback ) {
473
+ _initializeCursor ( callback : Callback ) : void {
462
474
super . _initializeCursor ( ( err , response ) => {
463
475
if ( err || response == null ) {
464
476
callback ( err , response ) ;
@@ -482,7 +494,7 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
482
494
} ) ;
483
495
}
484
496
485
- _getMore ( callback : Callback ) {
497
+ _getMore ( callback : Callback ) : void {
486
498
super . _getMore ( ( err , response ) => {
487
499
if ( err ) {
488
500
callback ( err ) ;
@@ -644,7 +656,7 @@ function processError(changeStream: ChangeStream, error?: AnyError, callback?: C
644
656
changeStream . closed = true ;
645
657
}
646
658
647
- if ( cursor && isResumableError ( error , maxWireVersion ( cursor . server ) ) ) {
659
+ if ( cursor && isResumableError ( error as MongoError , maxWireVersion ( cursor . server ) ) ) {
648
660
changeStream . cursor = undefined ;
649
661
650
662
// stop listening to all events from old cursor
0 commit comments