Skip to content

Commit 125c6d9

Browse files
committed
Test expectedCommands to ensure killChangeStreamCursor doesn't need to take implicit session
1 parent 2ed0241 commit 125c6d9

File tree

1 file changed

+38
-10
lines changed

1 file changed

+38
-10
lines changed

tests/Operation/WatchFunctionalTest.php

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -594,12 +594,14 @@ public function testSessionPersistsAfterResume()
594594

595595
$changeStream = null;
596596
$originalSession = null;
597-
$sessionAfterResume = null;
597+
$sessionAfterResume = [];
598+
$commands = [];
598599

599-
// We want to ensure that the lsid of the initial aggregate matches the
600-
// lsid of any aggregates after the change stream resumes. After
601-
// PHPC-1152 is complete, we will ensure that the lsid of the initial
602-
// aggregate matches the lsid of any subsequent aggregates and getMores.
600+
/* We want to ensure that the lsid of the initial aggregate matches the
601+
* lsid of any aggregates after the change stream resumes. After
602+
* PHPC-1152 is complete, we will ensure that the lsid of the initial
603+
* aggregate matches the lsid of any subsequent aggregates and getMores.
604+
*/
603605
(new CommandObserver)->observe(
604606
function() use ($operation, &$changeStream) {
605607
$changeStream = $operation->execute($this->getPrimaryServer());
@@ -618,14 +620,40 @@ function($changeStream) use (&$originalSession) {
618620
function() use (&$changeStream) {
619621
$changeStream->next();
620622
},
621-
function ($changeStream) use (&$sessionAfterResume) {
622-
if (isset($changeStream->aggregate)) {
623-
$sessionAfterResume = bin2hex((string) $changeStream->lsid->id);
624-
}
623+
function ($changeStream) use (&$sessionAfterResume, &$commands) {
624+
$commands[] = key((array) $changeStream);
625+
$sessionAfterResume[] = bin2hex((string) $changeStream->lsid->id);
625626
}
626627
);
627628

628-
$this->assertEquals($sessionAfterResume, $originalSession);
629+
$expectedCommands = [
630+
/* The initial aggregate command for change streams returns a cursor
631+
* envelope with an empty initial batch, since there are no changes
632+
* to report at the moment the change stream is created. Therefore,
633+
* we expect a getMore to be issued when we first advance the change
634+
* stream (with either rewind() or next()). */
635+
'getMore',
636+
/* Since socketTimeoutMS is less than maxAwaitTimeMS, the previous
637+
* getMore command encounters a client socket timeout and leaves the
638+
* cursor open on the server. ChangeStream should catch this error
639+
* and resume by issuing a new aggregate command. */
640+
'aggregate',
641+
/* When ChangeStream resumes, it overwrites its original cursor with
642+
* the new cursor resulting from the last aggregate command. This
643+
* removes the last reference to the old cursor, which causes the
644+
* driver to kill it (via mongoc_cursor_destroy()). */
645+
'killCursors',
646+
/* Finally, ChangeStream will rewind the new cursor as the last step
647+
* of the resume process. This results in one last getMore. */
648+
'getMore',
649+
];
650+
651+
$this->assertSame($expectedCommands, $commands);
652+
653+
foreach ($sessionAfterResume as $session) {
654+
$this->assertEquals($session, $originalSession);
655+
}
656+
629657
}
630658

631659
private function insertDocument($document)

0 commit comments

Comments
 (0)