Skip to content

PHPLIB-442: Ensure change stream resume token is updated after resume #621

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 36 additions & 25 deletions src/ChangeStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,22 +102,7 @@ public function next()
{
try {
$this->csIt->next();
if ($this->valid()) {
if ($this->hasAdvanced) {
$this->key++;
}
$this->hasAdvanced = true;
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
}
/* If the cursorId is 0, the server has invalidated the cursor so we
* will never perform another getMore. This means that we cannot
* resume and we can therefore unset the resumeCallable, which will
* free any reference to Watch. This will also free the only
* reference to an implicit session, since any such reference
* belongs to Watch. */
if ((string) $this->getCursorId() === '0') {
$this->resumeCallable = null;
}
$this->onIteration(true);
} catch (RuntimeException $e) {
if ($this->isResumableError($e)) {
$this->resume();
Expand All @@ -133,14 +118,7 @@ public function rewind()
{
try {
$this->csIt->rewind();
if ($this->valid()) {
$this->hasAdvanced = true;
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
}
// As with next(), free the callable once we know it will never be used.
if ((string) $this->getCursorId() === '0') {
$this->resumeCallable = null;
}
$this->onIteration(false);
} catch (RuntimeException $e) {
if ($this->isResumableError($e)) {
$this->resume();
Expand All @@ -160,7 +138,7 @@ public function valid()
/**
* Extracts the resume token (i.e. "_id" field) from the change document.
*
* @param array|document $document Change document
* @param array|object $document Change document
* @return mixed
* @throws InvalidArgumentException
* @throws ResumeTokenException if the resume token is not found or invalid
Expand Down Expand Up @@ -214,6 +192,38 @@ private function isResumableError(RuntimeException $exception)
return true;
}

/**
* Perform housekeeping after an iteration event (i.e. next or rewind).
*
* @param boolean $isNext Whether the iteration event was a call to next()
* @throws ResumeTokenException
*/
private function onIteration($isNext)
{
/* If the cursorId is 0, the server has invalidated the cursor and we
* will never perform another getMore nor need to resume since any
* remaining results (up to and including the invalidate event) will
* have been received in the last response. Therefore, we can unset the
* resumeCallable. This will free any reference to Watch as well as the
* only reference to any implicit session created therein. */
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment was also revised a bit to better explain why we do this. I shared some thoughts on this in Slack and @p-mongo chimed in with a sanity check.

if ((string) $this->getCursorId() === '0') {
$this->resumeCallable = null;
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic previously happened after the steps below, but I believe the old ordering was incorrect. Consider:

  • If the iterator was not valid after calling next()/rewind(), we skipped the condition to set $hasAdvanced, optionally increment $key, and call extractResumeToken(). This cleanup check would always happen.
  • If the iterator was valid after calling next()/rewind(), we might potentially never reach this point if extractResumeToken() were to throw an exception.

This is basic cleanup that we'd always like to happen, so it now happens first.


if (!$this->valid()) {
return;
}

/* Increment the key if the iteration event was a call to next() and we
* have already advanced past the first result. */
if ($isNext && $this->hasAdvanced) {
$this->key++;
}

$this->hasAdvanced = true;
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
}

/**
* Creates a new changeStream after a resumable server error.
*
Expand All @@ -224,5 +234,6 @@ private function resume()
$newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken);
$this->csIt = $newChangeStream->csIt;
$this->csIt->rewind();
$this->onIteration(false);
}
}
64 changes: 64 additions & 0 deletions tests/Operation/WatchFunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,70 @@ public function testNoChangeAfterResumeBeforeInsert()
$this->assertMatchesDocument($expectedResult, $changeStream->current());
}

public function testResumeTokenIsUpdatedAfterResuming()
{
$this->insertDocument(['_id' => 1]);

$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());

$changeStream->rewind();
$this->assertNull($changeStream->current());

$this->insertDocument(['_id' => 2]);

$changeStream->next();
$this->assertTrue($changeStream->valid());

$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 2],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 2],
];

$this->assertMatchesDocument($expectedResult, $changeStream->current());

$this->killChangeStreamCursor($changeStream);

$this->insertDocument(['_id' => 3]);

$changeStream->next();
$this->assertTrue($changeStream->valid());

$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 3],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 3],
];

$this->assertMatchesDocument($expectedResult, $changeStream->current());

/* Triggering a consecutive failure will allow us to test whether the
* resume token was properly updated after the last resume. If the
* resume token updated, the next result will be {_id: 4}; otherwise,
* we'll see {_id: 3} returned again. */
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is basically a manual prose version of the spec test for PHPLIB-416 and DRIVERS-620, which is currently skipped in the master branch (see: here).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to check, this failed before the onIteration() change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. If I remove the call to onIteration(false) from resume(), testResumeTokenIsUpdatedAfterResuming fails and we encounter the error with duplicate events being returned as described in PHPLIB-442.

$this->killChangeStreamCursor($changeStream);

$this->insertDocument(['_id' => 4]);

$changeStream->next();
$this->assertTrue($changeStream->valid());

$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 4],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 4],
];

$this->assertMatchesDocument($expectedResult, $changeStream->current());
}

public function testKey()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
Expand Down