-
Notifications
You must be signed in to change notification settings - Fork 266
PHPLIB-441: Increment ChangeStream key consistently when resuming #626
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,11 @@ class ChangeStream implements Iterator | |
private $resumeCallable; | ||
private $csIt; | ||
private $key = 0; | ||
|
||
/** | ||
* Whether the change stream has advanced to its first result. This is used | ||
* to determine whether $key should be incremented after an iteration event. | ||
*/ | ||
private $hasAdvanced = false; | ||
|
||
/** | ||
|
@@ -102,7 +107,7 @@ public function next() | |
{ | ||
try { | ||
$this->csIt->next(); | ||
$this->onIteration(true); | ||
$this->onIteration($this->hasAdvanced); | ||
} catch (RuntimeException $e) { | ||
if ($this->isResumableError($e)) { | ||
$this->resume(); | ||
|
@@ -118,6 +123,9 @@ public function rewind() | |
{ | ||
try { | ||
$this->csIt->rewind(); | ||
/* Unlike next() and resume(), the decision to increment the key | ||
* does not depend on whether the change stream has advanced. This | ||
* ensures that multiple calls to rewind() do not alter state. */ | ||
$this->onIteration(false); | ||
} catch (RuntimeException $e) { | ||
if ($this->isResumableError($e)) { | ||
|
@@ -193,12 +201,12 @@ private function isResumableError(RuntimeException $exception) | |
} | ||
|
||
/** | ||
* Perform housekeeping after an iteration event (i.e. next or rewind). | ||
* Perform housekeeping after an iteration event. | ||
* | ||
* @param boolean $isNext Whether the iteration event was a call to next() | ||
* @param boolean $incrementKey Increment $key if there is a current result | ||
* @throws ResumeTokenException | ||
*/ | ||
private function onIteration($isNext) | ||
private function onIteration($incrementKey) | ||
{ | ||
/* If the cursorId is 0, the server has invalidated the cursor and we | ||
* will never perform another getMore nor need to resume since any | ||
|
@@ -210,13 +218,13 @@ private function onIteration($isNext) | |
$this->resumeCallable = null; | ||
} | ||
|
||
/* Return early if there is not a current result. Avoid any attempt to | ||
* increment the iterator's key or extract a resume token */ | ||
if (!$this->valid()) { | ||
return; | ||
} | ||
|
||
/* Increment the key if the iteration event was a call to next() and we | ||
* have already advanced past the first result. */ | ||
if ($isNext && $this->hasAdvanced) { | ||
if ($incrementKey) { | ||
$this->key++; | ||
} | ||
|
||
|
@@ -234,6 +242,14 @@ private function resume() | |
$newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken); | ||
$this->csIt = $newChangeStream->csIt; | ||
$this->csIt->rewind(); | ||
$this->onIteration(false); | ||
/* Note: if we are resuming after a call to ChangeStream::rewind(), | ||
* $hasAdvanced will always be false. For it to be true, rewind() would | ||
* need to have thrown a RuntimeException with a resumable error, which | ||
* can only happen during the first call to IteratorIterator::rewind() | ||
* before onIteration() has a chance to set $hasAdvanced to true. | ||
* Otherwise, IteratorIterator::rewind() would either NOP (consecutive | ||
* rewinds) or throw a LogicException (rewind after next), neither of | ||
* which would result in a call to resume(). */ | ||
$this->onIteration($this->hasAdvanced); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd appreciate a sanity check on this. While coming up with the current implementation in this PR, I ran the test suite with a hard assertion of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure I follow this part:
How can There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My point here was that "if it were true" cannot happen in practice. Either the
If this was a more general question (apart from We use Another edge case is that it's kosher to call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh I see, thanks for the explanation! All of this makes sense. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Ah, I forgot that I actually added a doc block to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note: this is a slight edge case where |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
use MongoDB\Driver\ReadPreference; | ||
use MongoDB\Driver\Server; | ||
use MongoDB\Driver\Exception\ConnectionTimeoutException; | ||
use MongoDB\Driver\Exception\LogicException; | ||
use MongoDB\Exception\ResumeTokenException; | ||
use MongoDB\Operation\CreateCollection; | ||
use MongoDB\Operation\DatabaseCommand; | ||
|
@@ -225,6 +226,62 @@ private function assertStartAtOperationTime(TimestampInterface $expectedOperatio | |
$this->assertEquals($expectedOperationTime, $command->pipeline[0]->{'$changeStream'}->startAtOperationTime); | ||
} | ||
|
||
public function testRewindMultipleTimesWithResults() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These two methods aren't related to resuming, so I included them in their own commit. This is the |
||
{ | ||
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); | ||
$changeStream = $operation->execute($this->getPrimaryServer()); | ||
|
||
$this->insertDocument(['x' => 1]); | ||
$this->insertDocument(['x' => 2]); | ||
|
||
$changeStream->rewind(); | ||
$this->assertTrue($changeStream->valid()); | ||
$this->assertSame(0, $changeStream->key()); | ||
$this->assertNotNull($changeStream->current()); | ||
|
||
// Subsequent rewind does not change iterator state | ||
$changeStream->rewind(); | ||
$this->assertTrue($changeStream->valid()); | ||
$this->assertSame(0, $changeStream->key()); | ||
$this->assertNotNull($changeStream->current()); | ||
|
||
$changeStream->next(); | ||
|
||
$this->assertTrue($changeStream->valid()); | ||
$this->assertSame(1, $changeStream->key()); | ||
$this->assertNotNull($changeStream->current()); | ||
|
||
// Rewinding after advancing the iterator is an error | ||
$this->expectException(LogicException::class); | ||
$changeStream->rewind(); | ||
} | ||
|
||
public function testRewindMultipleTimesWithNoResults() | ||
{ | ||
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); | ||
$changeStream = $operation->execute($this->getPrimaryServer()); | ||
|
||
$changeStream->rewind(); | ||
$this->assertFalse($changeStream->valid()); | ||
$this->assertNull($changeStream->key()); | ||
$this->assertNull($changeStream->current()); | ||
|
||
// Subsequent rewind does not change iterator state | ||
$changeStream->rewind(); | ||
$this->assertFalse($changeStream->valid()); | ||
$this->assertNull($changeStream->key()); | ||
$this->assertNull($changeStream->current()); | ||
|
||
$changeStream->next(); | ||
$this->assertFalse($changeStream->valid()); | ||
$this->assertNull($changeStream->key()); | ||
$this->assertNull($changeStream->current()); | ||
|
||
// Rewinding after advancing the iterator is an error | ||
$this->expectException(LogicException::class); | ||
$changeStream->rewind(); | ||
} | ||
|
||
public function testRewindResumesAfterConnectionException() | ||
{ | ||
/* In order to trigger a dropped connection, we'll use a new client with | ||
|
@@ -322,20 +379,67 @@ public function testNoChangeAfterResumeBeforeInsert() | |
$this->assertMatchesDocument($expectedResult, $changeStream->current()); | ||
} | ||
|
||
public function testResumeTokenIsUpdatedAfterResuming() | ||
public function testResumeMultipleTimesInSuccession() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method was introduced in #621 for PHPLIB-442 earlier this week, so I didn't feel too bad about renaming it. Refactoring it to also check for key increments (in addition to the existing result assertions) seemed preferable to duplicating the entire test to only check key increments. I also added some additional resume cases, which weren't in the original test. |
||
{ | ||
$this->insertDocument(['_id' => 1]); | ||
$operation = new CreateCollection($this->getDatabaseName(), $this->getCollectionName()); | ||
$operation->execute($this->getPrimaryServer()); | ||
|
||
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions); | ||
$changeStream = $operation->execute($this->getPrimaryServer()); | ||
|
||
/* Killing the cursor when there are no results will test that neither | ||
* the initial rewind() nor its resume attempt incremented the key. */ | ||
$this->killChangeStreamCursor($changeStream); | ||
|
||
$changeStream->rewind(); | ||
$this->assertFalse($changeStream->valid()); | ||
$this->assertNull($changeStream->key()); | ||
$this->assertNull($changeStream->current()); | ||
|
||
$this->insertDocument(['_id' => 1]); | ||
|
||
/* Killing the cursor a second time when there is a result will test | ||
* that the resume attempt picks up the latest change. */ | ||
$this->killChangeStreamCursor($changeStream); | ||
|
||
$changeStream->rewind(); | ||
$this->assertTrue($changeStream->valid()); | ||
$this->assertSame(0, $changeStream->key()); | ||
|
||
$expectedResult = [ | ||
'_id' => $changeStream->current()->_id, | ||
'operationType' => 'insert', | ||
'fullDocument' => ['_id' => 1], | ||
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()], | ||
'documentKey' => ['_id' => 1], | ||
]; | ||
|
||
$this->assertMatchesDocument($expectedResult, $changeStream->current()); | ||
|
||
/* Killing the cursor a second time will not trigger a resume until | ||
* ChangeStream::next() is called. A successive call to rewind() should | ||
* not change the iterator's state and preserve the current result. */ | ||
$this->killChangeStreamCursor($changeStream); | ||
|
||
$changeStream->rewind(); | ||
$this->assertTrue($changeStream->valid()); | ||
$this->assertSame(0, $changeStream->key()); | ||
|
||
$expectedResult = [ | ||
'_id' => $changeStream->current()->_id, | ||
'operationType' => 'insert', | ||
'fullDocument' => ['_id' => 1], | ||
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()], | ||
'documentKey' => ['_id' => 1], | ||
]; | ||
|
||
$this->assertMatchesDocument($expectedResult, $changeStream->current()); | ||
|
||
$this->insertDocument(['_id' => 2]); | ||
|
||
$changeStream->next(); | ||
$this->assertTrue($changeStream->valid()); | ||
$this->assertSame(1, $changeStream->key()); | ||
|
||
$expectedResult = [ | ||
'_id' => $changeStream->current()->_id, | ||
|
@@ -353,6 +457,7 @@ public function testResumeTokenIsUpdatedAfterResuming() | |
|
||
$changeStream->next(); | ||
$this->assertTrue($changeStream->valid()); | ||
$this->assertSame(2, $changeStream->key()); | ||
|
||
$expectedResult = [ | ||
'_id' => $changeStream->current()->_id, | ||
|
@@ -374,6 +479,7 @@ public function testResumeTokenIsUpdatedAfterResuming() | |
|
||
$changeStream->next(); | ||
$this->assertTrue($changeStream->valid()); | ||
$this->assertSame(3, $changeStream->key()); | ||
|
||
$expectedResult = [ | ||
'_id' => $changeStream->current()->_id, | ||
|
@@ -689,6 +795,8 @@ public function testNextAdvancesKey() | |
$this->insertDocument(['x' => 1]); | ||
$this->insertDocument(['x' => 2]); | ||
|
||
/* Note: we intentionally do not start iteration with rewind() to ensure | ||
* that next() behaves identically when called without rewind(). */ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I happened to read through this test while writing the multiple rewind tests and thought this could use some clarity. This change is also in its own commit, since it doesn't pertain to resuming. |
||
$changeStream->next(); | ||
|
||
$this->assertSame(0, $changeStream->key()); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I contemplated renaming this to
$incrementKeyIfValid
, but adding a comment above the early return for!$this->valid()
seemed preferable.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the comment makes this clear.