-
Notifications
You must be signed in to change notification settings - Fork 266
PHPLIB-442: Ensure change stream resume token is updated after resume #621
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -102,22 +102,7 @@ public function next() | |
{ | ||
try { | ||
$this->csIt->next(); | ||
if ($this->valid()) { | ||
if ($this->hasAdvanced) { | ||
$this->key++; | ||
} | ||
$this->hasAdvanced = true; | ||
$this->resumeToken = $this->extractResumeToken($this->csIt->current()); | ||
} | ||
/* If the cursorId is 0, the server has invalidated the cursor so we | ||
* will never perform another getMore. This means that we cannot | ||
* resume and we can therefore unset the resumeCallable, which will | ||
* free any reference to Watch. This will also free the only | ||
* reference to an implicit session, since any such reference | ||
* belongs to Watch. */ | ||
if ((string) $this->getCursorId() === '0') { | ||
$this->resumeCallable = null; | ||
} | ||
$this->onIteration(true); | ||
} catch (RuntimeException $e) { | ||
if ($this->isResumableError($e)) { | ||
$this->resume(); | ||
|
@@ -133,14 +118,7 @@ public function rewind() | |
{ | ||
try { | ||
$this->csIt->rewind(); | ||
if ($this->valid()) { | ||
$this->hasAdvanced = true; | ||
$this->resumeToken = $this->extractResumeToken($this->csIt->current()); | ||
} | ||
// As with next(), free the callable once we know it will never be used. | ||
if ((string) $this->getCursorId() === '0') { | ||
$this->resumeCallable = null; | ||
} | ||
$this->onIteration(false); | ||
} catch (RuntimeException $e) { | ||
if ($this->isResumableError($e)) { | ||
$this->resume(); | ||
|
@@ -160,7 +138,7 @@ public function valid() | |
/** | ||
* Extracts the resume token (i.e. "_id" field) from the change document. | ||
* | ||
* @param array|document $document Change document | ||
* @param array|object $document Change document | ||
* @return mixed | ||
* @throws InvalidArgumentException | ||
* @throws ResumeTokenException if the resume token is not found or invalid | ||
|
@@ -214,6 +192,38 @@ private function isResumableError(RuntimeException $exception) | |
return true; | ||
} | ||
|
||
/** | ||
* Perform housekeeping after an iteration event (i.e. next or rewind). | ||
* | ||
* @param boolean $isNext Whether the iteration event was a call to next() | ||
* @throws ResumeTokenException | ||
*/ | ||
private function onIteration($isNext) | ||
{ | ||
/* If the cursorId is 0, the server has invalidated the cursor and we | ||
* will never perform another getMore nor need to resume since any | ||
* remaining results (up to and including the invalidate event) will | ||
* have been received in the last response. Therefore, we can unset the | ||
* resumeCallable. This will free any reference to Watch as well as the | ||
* only reference to any implicit session created therein. */ | ||
if ((string) $this->getCursorId() === '0') { | ||
$this->resumeCallable = null; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic previously happened after the steps below, but I believe the old ordering was incorrect. Consider:
This is basic cleanup that we'd always like to happen, so it now happens first. |
||
|
||
if (!$this->valid()) { | ||
return; | ||
} | ||
|
||
/* Increment the key if the iteration event was a call to next() and we | ||
* have already advanced past the first result. */ | ||
if ($isNext && $this->hasAdvanced) { | ||
$this->key++; | ||
} | ||
|
||
$this->hasAdvanced = true; | ||
$this->resumeToken = $this->extractResumeToken($this->csIt->current()); | ||
} | ||
|
||
/** | ||
* Creates a new changeStream after a resumable server error. | ||
* | ||
|
@@ -224,5 +234,6 @@ private function resume() | |
$newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken); | ||
$this->csIt = $newChangeStream->csIt; | ||
$this->csIt->rewind(); | ||
$this->onIteration(false); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -322,6 +322,70 @@ public function testNoChangeAfterResumeBeforeInsert() | |
$this->assertMatchesDocument($expectedResult, $changeStream->current()); | ||
} | ||
|
||
public function testResumeTokenIsUpdatedAfterResuming() | ||
{ | ||
$this->insertDocument(['_id' => 1]); | ||
|
||
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); | ||
$changeStream = $operation->execute($this->getPrimaryServer()); | ||
|
||
$changeStream->rewind(); | ||
$this->assertNull($changeStream->current()); | ||
|
||
$this->insertDocument(['_id' => 2]); | ||
|
||
$changeStream->next(); | ||
$this->assertTrue($changeStream->valid()); | ||
|
||
$expectedResult = [ | ||
'_id' => $changeStream->current()->_id, | ||
'operationType' => 'insert', | ||
'fullDocument' => ['_id' => 2], | ||
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()], | ||
'documentKey' => ['_id' => 2], | ||
]; | ||
|
||
$this->assertMatchesDocument($expectedResult, $changeStream->current()); | ||
|
||
$this->killChangeStreamCursor($changeStream); | ||
|
||
$this->insertDocument(['_id' => 3]); | ||
|
||
$changeStream->next(); | ||
$this->assertTrue($changeStream->valid()); | ||
|
||
$expectedResult = [ | ||
'_id' => $changeStream->current()->_id, | ||
'operationType' => 'insert', | ||
'fullDocument' => ['_id' => 3], | ||
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()], | ||
'documentKey' => ['_id' => 3], | ||
]; | ||
|
||
$this->assertMatchesDocument($expectedResult, $changeStream->current()); | ||
|
||
/* Triggering a consecutive failure will allow us to test whether the | ||
* resume token was properly updated after the last resume. If the | ||
* resume token updated, the next result will be {_id: 4}; otherwise, | ||
* we'll see {_id: 3} returned again. */ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is basically a manual prose version of the spec test for PHPLIB-416 and DRIVERS-620, which is currently skipped in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to check, this failed before the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct. If I remove the call to |
||
$this->killChangeStreamCursor($changeStream); | ||
|
||
$this->insertDocument(['_id' => 4]); | ||
|
||
$changeStream->next(); | ||
$this->assertTrue($changeStream->valid()); | ||
|
||
$expectedResult = [ | ||
'_id' => $changeStream->current()->_id, | ||
'operationType' => 'insert', | ||
'fullDocument' => ['_id' => 4], | ||
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()], | ||
'documentKey' => ['_id' => 4], | ||
]; | ||
|
||
$this->assertMatchesDocument($expectedResult, $changeStream->current()); | ||
} | ||
|
||
public function testKey() | ||
{ | ||
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment was also revised a bit to better explain why we do this. I shared some thoughts on this in Slack and @p-mongo chimed in with a sanity check.