1
1
import Denque = require( 'denque' ) ;
2
2
import { EventEmitter } from 'events' ;
3
3
import { MongoError , AnyError , isResumableError } from './error' ;
4
- import { Cursor , CursorOptions , CursorStream , CursorStreamOptions } from './cursor/cursor' ;
5
4
import { AggregateOperation , AggregateOptions } from './operations/aggregate' ;
6
5
import {
7
6
relayEvents ,
@@ -21,6 +20,14 @@ import type { CollationOptions } from './cmap/wire_protocol/write_command';
21
20
import { MongoClient } from './mongo_client' ;
22
21
import { Db } from './db' ;
23
22
import { Collection } from './collection' ;
23
+ import type { Readable } from 'stream' ;
24
+ import {
25
+ AbstractCursor ,
26
+ AbstractCursorOptions ,
27
+ CursorStreamOptions
28
+ } from './cursor/abstract_cursor' ;
29
+ import type { ClientSession } from './sessions' ;
30
+ import { executeOperation , ExecutionResult } from './operations/execute_operation' ;
24
31
25
32
const kResumeQueue = Symbol ( 'resumeQueue' ) ;
26
33
const kCursorStream = Symbol ( 'cursorStream' ) ;
@@ -162,13 +169,6 @@ interface UpdateDescription {
162
169
removedFields : string [ ] ;
163
170
}
164
171
165
- /** @internal */
166
- export class ChangeStreamStream extends CursorStream {
167
- constructor ( cursor : ChangeStreamCursor ) {
168
- super ( cursor ) ;
169
- }
170
- }
171
-
172
172
/**
173
173
* Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
174
174
* @public
@@ -183,7 +183,7 @@ export class ChangeStream extends EventEmitter {
183
183
closed : boolean ;
184
184
streamOptions ?: CursorStreamOptions ;
185
185
[ kResumeQueue ] : Denque ;
186
- [ kCursorStream ] ?: CursorStream ;
186
+ [ kCursorStream ] ?: Readable ;
187
187
188
188
/** @event */
189
189
static readonly CLOSE = 'close' as const ;
@@ -252,13 +252,13 @@ export class ChangeStream extends EventEmitter {
252
252
253
253
this . on ( 'removeListener' , eventName => {
254
254
if ( eventName === 'change' && this . listenerCount ( 'change' ) === 0 && this . cursor ) {
255
- this [ kCursorStream ] ?. removeAllListeners ( CursorStream . DATA ) ;
255
+ this [ kCursorStream ] ?. removeAllListeners ( 'data' ) ;
256
256
}
257
257
} ) ;
258
258
}
259
259
260
260
/** @internal */
261
- get cursorStream ( ) : CursorStream | undefined {
261
+ get cursorStream ( ) : Readable | undefined {
262
262
return this [ kCursorStream ] ;
263
263
}
264
264
@@ -325,7 +325,7 @@ export class ChangeStream extends EventEmitter {
325
325
* Return a modified Readable stream including a possible transform method.
326
326
* @throws MongoError if this.cursor is undefined
327
327
*/
328
- stream ( options ?: CursorStreamOptions ) : ChangeStreamStream {
328
+ stream ( options ?: CursorStreamOptions ) : Readable {
329
329
this . streamOptions = options ;
330
330
if ( ! this . cursor ) {
331
331
throw new MongoError ( 'ChangeStream has no cursor, unable to stream' ) ;
@@ -335,28 +335,34 @@ export class ChangeStream extends EventEmitter {
335
335
}
336
336
337
337
/** @public */
338
- export interface ChangeStreamCursorOptions extends CursorOptions {
338
+ export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
339
339
startAtOperationTime ?: OperationTime ;
340
340
resumeAfter ?: ResumeToken ;
341
341
startAfter ?: boolean ;
342
342
}
343
343
344
344
/** @internal */
345
- export class ChangeStreamCursor extends Cursor < AggregateOperation , ChangeStreamCursorOptions > {
345
+ export class ChangeStreamCursor extends AbstractCursor {
346
346
_resumeToken : ResumeToken ;
347
347
startAtOperationTime ?: OperationTime ;
348
348
hasReceived ?: boolean ;
349
349
resumeAfter : ResumeToken ;
350
350
startAfter : ResumeToken ;
351
+ options : ChangeStreamCursorOptions ;
352
+
353
+ postBatchResumeToken ?: Document ;
354
+ pipeline : Document [ ] ;
351
355
352
356
constructor (
353
357
topology : Topology ,
354
- operation : AggregateOperation ,
355
- options : ChangeStreamCursorOptions
358
+ namespace : MongoDBNamespace ,
359
+ pipeline : Document [ ] = [ ] ,
360
+ options : ChangeStreamCursorOptions = { }
356
361
) {
357
- super ( topology , operation , options ) ;
362
+ super ( topology , namespace , options ) ;
358
363
359
- options = options || { } ;
364
+ this . pipeline = pipeline ;
365
+ this . options = options ;
360
366
this . _resumeToken = null ;
361
367
this . startAtOperationTime = options . startAtOperationTime ;
362
368
@@ -421,18 +427,28 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
421
427
}
422
428
}
423
429
424
- _initializeCursor ( callback : Callback ) : void {
425
- super . _initializeCursor ( ( err , response ) => {
430
+ _initialize ( session : ClientSession , callback : Callback < ExecutionResult > ) : void {
431
+ const aggregateOperation = new AggregateOperation (
432
+ { s : { namespace : this . namespace } } ,
433
+ this . pipeline ,
434
+ {
435
+ ...this . cursorOptions ,
436
+ ...this . options ,
437
+ session
438
+ }
439
+ ) ;
440
+
441
+ executeOperation ( this . topology , aggregateOperation , ( err , response ) => {
426
442
if ( err || response == null ) {
427
- callback ( err , response ) ;
428
- return ;
443
+ return callback ( err ) ;
429
444
}
430
445
446
+ const server = aggregateOperation . server ;
431
447
if (
432
448
this . startAtOperationTime == null &&
433
449
this . resumeAfter == null &&
434
450
this . startAfter == null &&
435
- maxWireVersion ( this . server ) >= 7
451
+ maxWireVersion ( server ) >= 7
436
452
) {
437
453
this . startAtOperationTime = response . operationTime ;
438
454
}
@@ -441,15 +457,16 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
441
457
442
458
this . emit ( 'init' , response ) ;
443
459
this . emit ( 'response' ) ;
444
- callback ( err , response ) ;
460
+
461
+ // TODO: NODE-2882
462
+ callback ( undefined , { server, session, response } ) ;
445
463
} ) ;
446
464
}
447
465
448
- _getMore ( callback : Callback ) : void {
449
- super . _getMore ( ( err , response ) => {
466
+ _getMore ( batchSize : number , callback : Callback ) : void {
467
+ super . _getMore ( batchSize , ( err , response ) => {
450
468
if ( err ) {
451
- callback ( err ) ;
452
- return ;
469
+ return callback ( err ) ;
453
470
}
454
471
455
472
this . _processBatch ( 'nextBatch' , response ) ;
@@ -466,26 +483,32 @@ export class ChangeStreamCursor extends Cursor<AggregateOperation, ChangeStreamC
466
483
* @internal
467
484
*/
468
485
function createChangeStreamCursor (
469
- self : ChangeStream ,
486
+ changeStream : ChangeStream ,
470
487
options : ChangeStreamOptions
471
488
) : ChangeStreamCursor {
472
489
const changeStreamStageOptions : Document = { fullDocument : options . fullDocument || 'default' } ;
473
490
applyKnownOptions ( changeStreamStageOptions , options , CHANGE_STREAM_OPTIONS ) ;
474
- if ( self . type === CHANGE_DOMAIN_TYPES . CLUSTER ) {
491
+ if ( changeStream . type === CHANGE_DOMAIN_TYPES . CLUSTER ) {
475
492
changeStreamStageOptions . allChangesForCluster = true ;
476
493
}
477
494
478
- const pipeline = [ { $changeStream : changeStreamStageOptions } as Document ] . concat ( self . pipeline ) ;
495
+ const pipeline = [ { $changeStream : changeStreamStageOptions } as Document ] . concat (
496
+ changeStream . pipeline
497
+ ) ;
498
+
479
499
const cursorOptions = applyKnownOptions ( { } , options , CURSOR_OPTIONS ) ;
480
500
const changeStreamCursor = new ChangeStreamCursor (
481
- getTopology ( self . parent ) ,
482
- new AggregateOperation ( self . parent , pipeline , options ) ,
501
+ getTopology ( changeStream . parent ) ,
502
+ changeStream . namespace ,
503
+ pipeline ,
483
504
cursorOptions
484
505
) ;
485
506
486
- relayEvents ( changeStreamCursor , self , [ 'resumeTokenChanged' , 'end' , 'close' ] ) ;
507
+ relayEvents ( changeStreamCursor , changeStream , [ 'resumeTokenChanged' , 'end' , 'close' ] ) ;
508
+ if ( changeStream . listenerCount ( ChangeStream . CHANGE ) > 0 ) {
509
+ streamEvents ( changeStream , changeStreamCursor ) ;
510
+ }
487
511
488
- if ( self . listenerCount ( ChangeStream . CHANGE ) > 0 ) streamEvents ( self , changeStreamCursor ) ;
489
512
return changeStreamCursor ;
490
513
}
491
514
@@ -532,24 +555,24 @@ function waitForTopologyConnected(
532
555
}
533
556
534
557
function closeWithError ( changeStream : ChangeStream , error : AnyError , callback ?: Callback ) : void {
535
- if ( ! callback ) changeStream . emit ( ChangeStream . ERROR , error ) ;
558
+ if ( ! callback ) {
559
+ changeStream . emit ( ChangeStream . ERROR , error ) ;
560
+ }
561
+
536
562
changeStream . close ( ( ) => callback && callback ( error ) ) ;
537
563
}
538
564
539
565
function streamEvents ( changeStream : ChangeStream , cursor : ChangeStreamCursor ) : void {
540
566
const stream = changeStream [ kCursorStream ] || cursor . stream ( ) ;
541
567
changeStream [ kCursorStream ] = stream ;
542
- stream . on ( CursorStream . DATA , change => processNewChange ( changeStream , change ) ) ;
543
- stream . on ( CursorStream . ERROR , error => processError ( changeStream , error ) ) ;
568
+ stream . on ( 'data' , change => processNewChange ( changeStream , change ) ) ;
569
+ stream . on ( 'error' , error => processError ( changeStream , error ) ) ;
544
570
}
545
571
546
572
function endStream ( changeStream : ChangeStream ) : void {
547
573
const cursorStream = changeStream [ kCursorStream ] ;
548
574
if ( cursorStream ) {
549
- [ CursorStream . DATA , CursorStream . CLOSE , CursorStream . END , CursorStream . ERROR ] . forEach ( event =>
550
- cursorStream . removeAllListeners ( event )
551
- ) ;
552
-
575
+ [ 'data' , 'close' , 'end' , 'error' ] . forEach ( event => cursorStream . removeAllListeners ( event ) ) ;
553
576
cursorStream . destroy ( ) ;
554
577
}
555
578
@@ -605,7 +628,10 @@ function processError(changeStream: ChangeStream, error: AnyError, callback?: Ca
605
628
606
629
// otherwise, raise an error and close the change stream
607
630
function unresumableError ( err : AnyError ) {
608
- if ( ! callback ) changeStream . emit ( ChangeStream . ERROR , err ) ;
631
+ if ( ! callback ) {
632
+ changeStream . emit ( ChangeStream . ERROR , err ) ;
633
+ }
634
+
609
635
changeStream . close ( ( ) => processResumeQueue ( changeStream , err ) ) ;
610
636
}
611
637
@@ -676,6 +702,7 @@ function processResumeQueue(changeStream: ChangeStream, err?: Error) {
676
702
request ( new MongoError ( 'Change Stream is not open.' ) ) ;
677
703
return ;
678
704
}
705
+
679
706
request ( err , changeStream . cursor ) ;
680
707
}
681
708
}
0 commit comments