16
16
use MongoDB \Driver \ReadPreference ;
17
17
use MongoDB \Driver \WriteConcern ;
18
18
use MongoDB \Exception \ResumeTokenException ;
19
- use MongoDB \Operation \DatabaseCommand ;
20
19
use MongoDB \Operation \InsertOne ;
21
20
use MongoDB \Operation \Watch ;
22
21
use MongoDB \Tests \CommandObserver ;
@@ -224,7 +223,7 @@ function (array $event) use (&$events) {
224
223
$ postBatchResumeToken = $ this ->getPostBatchResumeTokenFromReply ($ events [0 ]['succeeded ' ]->getReply ());
225
224
226
225
$ this ->assertFalse ($ changeStream ->valid ());
227
- $ this ->killChangeStreamCursor ( $ changeStream );
226
+ $ this ->forceChangeStreamResume ( );
228
227
229
228
$ this ->assertNoCommandExecuted (function () use ($ changeStream ) {
230
229
$ changeStream ->rewind ();
@@ -303,7 +302,7 @@ function (array $event) use (&$events) {
303
302
$ this ->assertInstanceOf (TimestampInterface::class, $ operationTime );
304
303
305
304
$ this ->assertFalse ($ changeStream ->valid ());
306
- $ this ->killChangeStreamCursor ( $ changeStream );
305
+ $ this ->forceChangeStreamResume ( );
307
306
308
307
$ this ->assertNoCommandExecuted (function () use ($ changeStream ) {
309
308
$ changeStream ->rewind ();
@@ -454,7 +453,7 @@ public function testNoChangeAfterResumeBeforeInsert()
454
453
455
454
$ this ->assertMatchesDocument ($ expectedResult , $ changeStream ->current ());
456
455
457
- $ this ->killChangeStreamCursor ( $ changeStream );
456
+ $ this ->forceChangeStreamResume ( );
458
457
459
458
$ changeStream ->next ();
460
459
$ this ->assertFalse ($ changeStream ->valid ());
@@ -481,10 +480,10 @@ public function testResumeMultipleTimesInSuccession()
481
480
$ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this ->defaultOptions );
482
481
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
483
482
484
- /* Killing the cursor when there are no results will test that neither
483
+ /* Forcing a resume when there are no results will test that neither
485
484
* the initial rewind() nor a resume attempt via next() increment the
486
485
* key. */
487
- $ this ->killChangeStreamCursor ( $ changeStream );
486
+ $ this ->forceChangeStreamResume ( );
488
487
489
488
$ this ->assertNoCommandExecuted (function () use ($ changeStream ) {
490
489
$ changeStream ->rewind ();
@@ -499,7 +498,7 @@ public function testResumeMultipleTimesInSuccession()
499
498
$ this ->assertNull ($ changeStream ->current ());
500
499
501
500
// A consecutive resume attempt should still not increment the key
502
- $ this ->killChangeStreamCursor ( $ changeStream );
501
+ $ this ->forceChangeStreamResume ( );
503
502
504
503
$ changeStream ->next ();
505
504
$ this ->assertFalse ($ changeStream ->valid ());
@@ -525,10 +524,10 @@ public function testResumeMultipleTimesInSuccession()
525
524
526
525
$ this ->assertMatchesDocument ($ expectedResult , $ changeStream ->current ());
527
526
528
- /* Insert another document and kill the cursor . ChangeStream::next()
527
+ /* Insert another document and force a resume . ChangeStream::next()
529
528
* should resume and pick up the last insert. */
530
529
$ this ->insertDocument (['_id ' => 2 ]);
531
- $ this ->killChangeStreamCursor ( $ changeStream );
530
+ $ this ->forceChangeStreamResume ( );
532
531
533
532
$ changeStream ->next ();
534
533
$ this ->assertTrue ($ changeStream ->valid ());
@@ -552,7 +551,7 @@ public function testResumeMultipleTimesInSuccession()
552
551
*
553
552
* Note: PHPLIB-448 may require rewind() to throw an exception here. */
554
553
$ this ->insertDocument (['_id ' => 3 ]);
555
- $ this ->killChangeStreamCursor ( $ changeStream );
554
+ $ this ->forceChangeStreamResume ( );
556
555
557
556
$ this ->assertNoCommandExecuted (function () use ($ changeStream ) {
558
557
$ changeStream ->rewind ();
@@ -578,7 +577,7 @@ public function testResumeMultipleTimesInSuccession()
578
577
579
578
// Test one final, consecutive resume via ChangeStream::next()
580
579
$ this ->insertDocument (['_id ' => 4 ]);
581
- $ this ->killChangeStreamCursor ( $ changeStream );
580
+ $ this ->forceChangeStreamResume ( );
582
581
583
582
$ changeStream ->next ();
584
583
$ this ->assertTrue ($ changeStream ->valid ());
@@ -622,7 +621,7 @@ public function testKey()
622
621
$ this ->assertFalse ($ changeStream ->valid ());
623
622
$ this ->assertNull ($ changeStream ->key ());
624
623
625
- $ this ->killChangeStreamCursor ( $ changeStream );
624
+ $ this ->forceChangeStreamResume ( );
626
625
627
626
$ changeStream ->next ();
628
627
$ this ->assertFalse ($ changeStream ->valid ());
@@ -901,7 +900,7 @@ public function testRewindExtractsResumeTokenAndNextResumes()
901
900
];
902
901
$ this ->assertMatchesDocument ($ expectedResult , $ changeStream ->current ());
903
902
904
- $ this ->killChangeStreamCursor ( $ changeStream );
903
+ $ this ->forceChangeStreamResume ( );
905
904
906
905
$ this ->advanceCursorUntilValid ($ changeStream );
907
906
$ this ->assertSame (1 , $ changeStream ->key ());
@@ -1140,7 +1139,7 @@ function (array $event) use (&$originalSession) {
1140
1139
);
1141
1140
1142
1141
$ changeStream ->rewind ();
1143
- $ this ->killChangeStreamCursor ( $ changeStream );
1142
+ $ this ->forceChangeStreamResume ( );
1144
1143
1145
1144
(new CommandObserver ())->observe (
1146
1145
function () use (&$ changeStream ) {
@@ -1324,7 +1323,7 @@ public function testOriginalReadPreferenceIsPreservedOnResume()
1324
1323
1325
1324
$ changeStream = $ operation ->execute ($ secondary );
1326
1325
$ previousCursorId = $ changeStream ->getCursorId ();
1327
- $ this ->killChangeStreamCursor ( $ changeStream );
1326
+ $ this ->forceChangeStreamResume ( );
1328
1327
1329
1328
$ changeStream ->next ();
1330
1329
$ this ->assertNotSame ($ previousCursorId , $ changeStream ->getCursorId ());
@@ -1465,7 +1464,7 @@ public function testResumingChangeStreamWithoutPreviousResultsIncludesStartAfter
1465
1464
$ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ options );
1466
1465
$ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
1467
1466
$ changeStream ->rewind ();
1468
- $ this ->killChangeStreamCursor ( $ changeStream );
1467
+ $ this ->forceChangeStreamResume ( );
1469
1468
1470
1469
$ aggregateCommand = null ;
1471
1470
@@ -1516,7 +1515,7 @@ public function testResumingChangeStreamWithPreviousResultsIncludesResumeAfterOp
1516
1515
$ this ->advanceCursorUntilValid ($ changeStream );
1517
1516
$ this ->assertTrue ($ changeStream ->valid ());
1518
1517
1519
- $ this ->killChangeStreamCursor ( $ changeStream );
1518
+ $ this ->forceChangeStreamResume ( );
1520
1519
1521
1520
$ aggregateCommand = null ;
1522
1521
@@ -1552,6 +1551,15 @@ function (array $event) use (&$commands) {
1552
1551
$ this ->assertEmpty ($ commands );
1553
1552
}
1554
1553
1554
+ private function forceChangeStreamResume (array $ commands = ['getMore ' ], int $ errorCode = self ::NOT_MASTER )
1555
+ {
1556
+ $ this ->configureFailPoint ([
1557
+ 'configureFailPoint ' => 'failCommand ' ,
1558
+ 'mode ' => ['times ' => 1 ],
1559
+ 'data ' => ['failCommands ' => $ commands , 'errorCode ' => $ errorCode ],
1560
+ ]);
1561
+ }
1562
+
1555
1563
private function getPostBatchResumeTokenFromReply (stdClass $ reply )
1556
1564
{
1557
1565
$ this ->assertObjectHasAttribute ('cursor ' , $ reply );
@@ -1584,17 +1592,6 @@ private function isStartAtOperationTimeSupported()
1584
1592
return server_supports_feature ($ this ->getPrimaryServer (), self ::$ wireVersionForStartAtOperationTime );
1585
1593
}
1586
1594
1587
- private function killChangeStreamCursor (ChangeStream $ changeStream )
1588
- {
1589
- $ command = [
1590
- 'killCursors ' => $ this ->getCollectionName (),
1591
- 'cursors ' => [ $ changeStream ->getCursorId () ],
1592
- ];
1593
-
1594
- $ operation = new DatabaseCommand ($ this ->getDatabaseName (), $ command );
1595
- $ operation ->execute ($ this ->getPrimaryServer ());
1596
- }
1597
-
1598
1595
private function advanceCursorUntilValid (Iterator $ iterator , $ limitOnShardedClusters = 5 )
1599
1596
{
1600
1597
if (! $ this ->isShardedCluster ()) {
0 commit comments