Skip to content

PHPLIB-441: Increment ChangeStream key consistently when resuming #626

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions src/ChangeStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ class ChangeStream implements Iterator
private $resumeCallable;
private $csIt;
private $key = 0;

/**
* Whether the change stream has advanced to its first result. This is used
* to determine whether $key should be incremented after an iteration event.
*/
private $hasAdvanced = false;

/**
Expand Down Expand Up @@ -102,7 +107,7 @@ public function next()
{
try {
$this->csIt->next();
$this->onIteration(true);
$this->onIteration($this->hasAdvanced);
} catch (RuntimeException $e) {
if ($this->isResumableError($e)) {
$this->resume();
Expand All @@ -118,6 +123,9 @@ public function rewind()
{
try {
$this->csIt->rewind();
/* Unlike next() and resume(), the decision to increment the key
* does not depend on whether the change stream has advanced. This
* ensures that multiple calls to rewind() do not alter state. */
$this->onIteration(false);
} catch (RuntimeException $e) {
if ($this->isResumableError($e)) {
Expand Down Expand Up @@ -193,12 +201,12 @@ private function isResumableError(RuntimeException $exception)
}

/**
* Perform housekeeping after an iteration event (i.e. next or rewind).
* Perform housekeeping after an iteration event.
*
* @param boolean $isNext Whether the iteration event was a call to next()
* @param boolean $incrementKey Increment $key if there is a current result
* @throws ResumeTokenException
*/
private function onIteration($isNext)
private function onIteration($incrementKey)
{
/* If the cursorId is 0, the server has invalidated the cursor and we
* will never perform another getMore nor need to resume since any
Expand All @@ -210,13 +218,13 @@ private function onIteration($isNext)
$this->resumeCallable = null;
}

/* Return early if there is not a current result. Avoid any attempt to
* increment the iterator's key or extract a resume token */
if (!$this->valid()) {
return;
}

/* Increment the key if the iteration event was a call to next() and we
* have already advanced past the first result. */
if ($isNext && $this->hasAdvanced) {
if ($incrementKey) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I contemplated renaming this to $incrementKeyIfValid, but adding a comment above the early return for !$this->valid() seemed preferable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the comment makes this clear.

$this->key++;
}

Expand All @@ -234,6 +242,14 @@ private function resume()
$newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken);
$this->csIt = $newChangeStream->csIt;
$this->csIt->rewind();
$this->onIteration(false);
/* Note: if we are resuming after a call to ChangeStream::rewind(),
* $hasAdvanced will always be false. For it to be true, rewind() would
* need to have thrown a RuntimeException with a resumable error, which
* can only happen during the first call to IteratorIterator::rewind()
* before onIteration() has a chance to set $hasAdvanced to true.
* Otherwise, IteratorIterator::rewind() would either NOP (consecutive
* rewinds) or throw a LogicException (rewind after next), neither of
* which would result in a call to resume(). */
$this->onIteration($this->hasAdvanced);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd appreciate a sanity check on this. While coming up with the current implementation in this PR, I ran the test suite with a hard assertion of $this->hasAdvanced === false placed before the call to resume() in rewind()'s catch block, and it was never triggered.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow this part:

If it were true, then the previous call to IteratorIterator::rewind() would either have been
a NOP and succeeded because the cursor was still at its first position

How can $hasAdvanced be true if the cursor was still at its first position?

Copy link
Member Author

@jmikola jmikola Jun 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point here was that "if it were true" cannot happen in practice. Either the rewind() succeeds as a NOP and doesn't throw, so we never call resume() in the first place, or it throws a LogicException, which we don't catch (and also don't call resume()). I'll reword this a bit to better explain that, as I feel I only conveyed that the LogicException case is when we never reach this point.

How can $hasAdvanced be true if the cursor was still at its first position?

If this was a more general question (apart from resume() being invoked), I can address that. When a cursor is first created (basic or tailable alike), it starts before the first element. valid() is false and there is no current element. The initial call to rewind() or next() is responsible for advancing to the first position (if any), updating valid(), and optionally populating the current element.

We use $hasAdvanced here to track if we've advanced before the initial pre-iteration state. If it's more helpful, I suppose we can rename this to $hasIterationStarted. Let me know if you think that'd make more sense.

Another edge case is that it's kosher to call rewind() multiple times in sequence. And since $key is initialized to 0, we should never increment the key after a successful call to rewind(), since, unlike next() or resuming, rewind() itself never moves past the first element.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, thanks for the explanation! All of this makes sense.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose we can rename this to $hasIterationStarted.

Ah, I forgot that I actually added a doc block to $hasAdvanced to explain exactly what it implies (here). I think that's sufficient, as $hasIterated might also sound confusing since calls to rewind() and next() can still leave it false if the change stream doesn't come across a change event. I'll stick with the shorter name and just revise this comment block to be more clear.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this is a slight edge case where rewind() could be called immediately after resuming without encountering a LogicException from the driver. Since resuming creates a fresh cursor and rewinds it, a user could technically call ChangeStream::rewind() after a resume and not see an error. This is very trivial and may not be worth fixing, but I opened PHPLIB-448 to track it.

}
}
112 changes: 110 additions & 2 deletions tests/Operation/WatchFunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
use MongoDB\Driver\Exception\ConnectionTimeoutException;
use MongoDB\Driver\Exception\LogicException;
use MongoDB\Exception\ResumeTokenException;
use MongoDB\Operation\CreateCollection;
use MongoDB\Operation\DatabaseCommand;
Expand Down Expand Up @@ -225,6 +226,62 @@ private function assertStartAtOperationTime(TimestampInterface $expectedOperatio
$this->assertEquals($expectedOperationTime, $command->pipeline[0]->{'$changeStream'}->startAtOperationTime);
}

public function testRewindMultipleTimesWithResults()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two methods aren't related to resuming, so I included them in their own commit. This is the rewind() counterpart to the existing testNextAdvancesKey test method.

{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());

$this->insertDocument(['x' => 1]);
$this->insertDocument(['x' => 2]);

$changeStream->rewind();
$this->assertTrue($changeStream->valid());
$this->assertSame(0, $changeStream->key());
$this->assertNotNull($changeStream->current());

// Subsequent rewind does not change iterator state
$changeStream->rewind();
$this->assertTrue($changeStream->valid());
$this->assertSame(0, $changeStream->key());
$this->assertNotNull($changeStream->current());

$changeStream->next();

$this->assertTrue($changeStream->valid());
$this->assertSame(1, $changeStream->key());
$this->assertNotNull($changeStream->current());

// Rewinding after advancing the iterator is an error
$this->expectException(LogicException::class);
$changeStream->rewind();
}

public function testRewindMultipleTimesWithNoResults()
{
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());

$changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
$this->assertNull($changeStream->current());

// Subsequent rewind does not change iterator state
$changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
$this->assertNull($changeStream->current());

$changeStream->next();
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
$this->assertNull($changeStream->current());

// Rewinding after advancing the iterator is an error
$this->expectException(LogicException::class);
$changeStream->rewind();
}

public function testRewindResumesAfterConnectionException()
{
/* In order to trigger a dropped connection, we'll use a new client with
Expand Down Expand Up @@ -322,20 +379,67 @@ public function testNoChangeAfterResumeBeforeInsert()
$this->assertMatchesDocument($expectedResult, $changeStream->current());
}

public function testResumeTokenIsUpdatedAfterResuming()
public function testResumeMultipleTimesInSuccession()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method was introduced in #621 for PHPLIB-442 earlier this week, so I didn't feel too bad about renaming it. Refactoring it to also check for key increments (in addition to the existing result assertions) seemed preferable to duplicating the entire test to only check key increments.

I also added some additional resume cases, which weren't in the original test.

{
$this->insertDocument(['_id' => 1]);
$operation = new CreateCollection($this->getDatabaseName(), $this->getCollectionName());
$operation->execute($this->getPrimaryServer());

$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
$changeStream = $operation->execute($this->getPrimaryServer());

/* Killing the cursor when there are no results will test that neither
* the initial rewind() nor its resume attempt incremented the key. */
$this->killChangeStreamCursor($changeStream);

$changeStream->rewind();
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
$this->assertNull($changeStream->current());

$this->insertDocument(['_id' => 1]);

/* Killing the cursor a second time when there is a result will test
* that the resume attempt picks up the latest change. */
$this->killChangeStreamCursor($changeStream);

$changeStream->rewind();
$this->assertTrue($changeStream->valid());
$this->assertSame(0, $changeStream->key());

$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 1],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 1],
];

$this->assertMatchesDocument($expectedResult, $changeStream->current());

/* Killing the cursor a second time will not trigger a resume until
* ChangeStream::next() is called. A successive call to rewind() should
* not change the iterator's state and preserve the current result. */
$this->killChangeStreamCursor($changeStream);

$changeStream->rewind();
$this->assertTrue($changeStream->valid());
$this->assertSame(0, $changeStream->key());

$expectedResult = [
'_id' => $changeStream->current()->_id,
'operationType' => 'insert',
'fullDocument' => ['_id' => 1],
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
'documentKey' => ['_id' => 1],
];

$this->assertMatchesDocument($expectedResult, $changeStream->current());

$this->insertDocument(['_id' => 2]);

$changeStream->next();
$this->assertTrue($changeStream->valid());
$this->assertSame(1, $changeStream->key());

$expectedResult = [
'_id' => $changeStream->current()->_id,
Expand All @@ -353,6 +457,7 @@ public function testResumeTokenIsUpdatedAfterResuming()

$changeStream->next();
$this->assertTrue($changeStream->valid());
$this->assertSame(2, $changeStream->key());

$expectedResult = [
'_id' => $changeStream->current()->_id,
Expand All @@ -374,6 +479,7 @@ public function testResumeTokenIsUpdatedAfterResuming()

$changeStream->next();
$this->assertTrue($changeStream->valid());
$this->assertSame(3, $changeStream->key());

$expectedResult = [
'_id' => $changeStream->current()->_id,
Expand Down Expand Up @@ -689,6 +795,8 @@ public function testNextAdvancesKey()
$this->insertDocument(['x' => 1]);
$this->insertDocument(['x' => 2]);

/* Note: we intentionally do not start iteration with rewind() to ensure
* that next() behaves identically when called without rewind(). */
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I happened to read through this test while writing the multiple rewind tests and thought this could use some clarity. This change is also in its own commit, since it doesn't pertain to resuming.

$changeStream->next();

$this->assertSame(0, $changeStream->key());
Expand Down