Skip to content

PHPLIB-342: Change streams should use the same session when resuming #528

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/ChangeStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ public function next()
$this->hasAdvanced = true;
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
}
/* If the cursorId is 0, the server has invalidated the cursor so we
* will never perform another getMore. This means that we cannot
* resume and we can therefore unset the resumeCallable, which will
* free any reference to Watch. This will also free the only
* reference to an implicit session, since any such reference
* belongs to Watch. */
if ((string)$this->getCursorId() === '0') {
$this->resumeCallable = null;
}
} catch (RuntimeException $e) {
if (strpos($e->getMessage(), "not master") !== false) {
$resumable = true;
Expand Down Expand Up @@ -130,6 +139,10 @@ public function rewind()
$this->hasAdvanced = true;
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
}
// As with next(), free the callable once we know it will never be used.
if ((string)$this->getCursorId() === '0') {
$this->resumeCallable = null;
}
} catch (RuntimeException $e) {
if (strpos($e->getMessage(), "not master") !== false) {
$resumable = true;
Expand Down
6 changes: 6 additions & 0 deletions src/Operation/Watch.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
}
}

if ( ! isset($options['session'])) {
try {
$options['session'] = $manager->startSession();
} catch (DriverRuntimeException $e) {}
}

$this->databaseName = (string) $databaseName;
$this->collectionName = (string) $collectionName;
$this->pipeline = $pipeline;
Expand Down
86 changes: 86 additions & 0 deletions tests/Operation/WatchFunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
use MongoDB\Driver\Server;
use MongoDB\Driver\Exception\ConnectionTimeoutException;
use MongoDB\Exception\ResumeTokenException;
use MongoDB\Operation\CreateCollection;
use MongoDB\Operation\DatabaseCommand;
use MongoDB\Operation\DropCollection;
use MongoDB\Operation\InsertOne;
use MongoDB\Operation\Watch;
use MongoDB\Tests\CommandObserver;
Expand Down Expand Up @@ -588,6 +590,90 @@ public function testResumeTokenNotFoundAdvancesKey()
$this->assertSame(2, $changeStream->key());
}

public function testSessionPersistsAfterResume()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);

$changeStream = null;
$originalSession = null;
$sessionAfterResume = [];
$commands = [];

/* We want to ensure that the lsid of the initial aggregate matches the
* lsid of any aggregates after the change stream resumes. After
* PHPC-1152 is complete, we will ensure that the lsid of the initial
* aggregate matches the lsid of any subsequent aggregates and getMores.
*/
(new CommandObserver)->observe(
function() use ($operation, &$changeStream) {
$changeStream = $operation->execute($this->getPrimaryServer());
},
function($changeStream) use (&$originalSession) {
if (isset($changeStream->aggregate)) {
$originalSession = bin2hex((string) $changeStream->lsid->id);
}
}
);

$changeStream->rewind();
$this->killChangeStreamCursor($changeStream);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please test this against a replica set with auth if possible. That should highlight whether killChangeStreamCursor() will need to also take the same implicit session created within the Watch class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After testing with auth, it looks like killChangeStreamCursor() does not need to take the same implicit session created within the Watch class.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent. That answers my question in mongodb/mongo-php-driver#803 (comment).


(new CommandObserver)->observe(
function() use (&$changeStream) {
$changeStream->next();
},
function ($changeStream) use (&$sessionAfterResume, &$commands) {
$commands[] = key((array) $changeStream);
$sessionAfterResume[] = bin2hex((string) $changeStream->lsid->id);
}
);

$expectedCommands = [
/* We expect a getMore to be issued because we are calling next(). */
'getMore',
/* Since we have killed the cursor, ChangeStream will resume by
* issuing a new aggregate commmand. */
'aggregate',
/* When ChangeStream resumes, it overwrites its original cursor with
* the new cursor resulting from the last aggregate command. This
* removes the last reference to the old cursor, which causes the
* driver to kill it (via mongoc_cursor_destroy()). */
'killCursors',
/* Finally, ChangeStream will rewind the new cursor as the last step
* of the resume process. This results in one last getMore. */
'getMore',
];

$this->assertSame($expectedCommands, $commands);

foreach ($sessionAfterResume as $session) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice :)

$this->assertEquals($session, $originalSession);
}
}

public function testSessionFreed()
{
$operation = new CreateCollection($this->getDatabaseName(), $this->getCollectionName());
$operation->execute($this->getPrimaryServer());

$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());

$rc = new ReflectionClass($changeStream);
$rp = $rc->getProperty('resumeCallable');
$rp->setAccessible(true);

$this->assertNotNull($rp->getValue($changeStream));

// Invalidate the cursor to verify that resumeCallable is unset when the cursor is exhausted.
$operation = new DropCollection($this->getDatabaseName(), $this->getCollectionName());
$operation->execute($this->getPrimaryServer());

$changeStream->next();

$this->assertNull($rp->getValue($changeStream));
}

private function insertDocument($document)
{
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);
Expand Down