Skip to content

PHPLIB-456: Always invalidate ChangeStream position if iteration fails #674

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions src/Model/ChangeStreamIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
/** @var boolean */
private $isRewindNop;

/** @var boolean */
private $isValid = false;

/** @var object|null */
private $postBatchResumeToken;

Expand Down Expand Up @@ -126,6 +129,15 @@ final public function commandSucceeded(CommandSucceededEvent $event)
}
}

/**
* @see https://php.net/iteratoriterator.current
* @return mixed
*/
public function current()
{
return $this->isValid ? parent::current() : null;
}

/**
* Returns the resume token for the iterator's current position.
*
Expand All @@ -140,6 +152,15 @@ public function getResumeToken()
return $this->resumeToken;
}

/**
* @see https://php.net/iteratoriterator.key
* @return mixed
*/
public function key()
{
return $this->isValid ? parent::key() : null;
}

/**
* @see https://php.net/iteratoriterator.rewind
* @return void
Expand Down Expand Up @@ -181,6 +202,15 @@ public function rewind()
$this->onIteration(false);
}

/**
* @see https://php.net/iteratoriterator.valid
* @return boolean
*/
public function valid()
{
return $this->isValid;
}

/**
* Extracts the resume token (i.e. "_id" field) from a change document.
*
Expand All @@ -204,10 +234,12 @@ private function extractResumeToken($document)
: (isset($document->_id) ? $document->_id : null);

if (! isset($resumeToken)) {
$this->isValid = false;
throw ResumeTokenException::notFound();
}

if (! is_array($resumeToken) && ! is_object($resumeToken)) {
$this->isValid = false;
throw ResumeTokenException::invalidType($resumeToken);
}

Expand All @@ -232,16 +264,16 @@ private function isAtEndOfBatch()
*/
private function onIteration($incrementBatchPosition)
{
$isValid = $this->valid();
$this->isValid = parent::valid();

/* Disable rewind()'s NOP behavior once we advance to a valid position.
* This will allow the driver to throw a LogicException if rewind() is
* called after the cursor has advanced past its first element. */
if ($this->isRewindNop && $isValid) {
if ($this->isRewindNop && $this->isValid) {
$this->isRewindNop = false;
}

if ($incrementBatchPosition && $isValid) {
if ($incrementBatchPosition && $this->isValid) {
$this->batchPosition++;
}

Expand All @@ -253,7 +285,7 @@ private function onIteration($incrementBatchPosition)
* from the current document if possible. */
if ($this->isAtEndOfBatch() && $this->postBatchResumeToken !== null) {
$this->resumeToken = $this->postBatchResumeToken;
} elseif ($isValid) {
} elseif ($this->isValid) {
$this->resumeToken = $this->extractResumeToken($this->current());
}
}
Expand Down
23 changes: 6 additions & 17 deletions tests/Operation/WatchFunctionalTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -1125,34 +1125,23 @@ public function testResumeTokenNotFoundDoesNotAdvanceKey()
$changeStream->next();
$this->fail('Exception for missing resume token was not thrown');
} catch (ResumeTokenException $e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest leaving a comment in the body of each catch block or a combined comment above try that reminds us that the library will throw a client-side error for server < 4.1.8 and we can rely on a server-side error for >= 4.1.8.

/* If a client-side error is thrown (server < 4.1.8), the tailable
* cursor's position is still valid. This may change once PHPLIB-456
* is implemented. */
$expectedValid = true;
$expectedKey = 0;
/* On server versions < 4.1.8, a client-side error is thrown. */
} catch (ServerException $e) {
/* If a server-side error is thrown (server >= 4.1.8), the tailable
* cursor's position is not valid. */
$expectedValid = false;
$expectedKey = null;
/* On server versions >= 4.1.8, the error is thrown server-side. */
}

$this->assertSame($expectedValid, $changeStream->valid());
$this->assertSame($expectedKey, $changeStream->key());
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());

try {
$changeStream->next();
$this->fail('Exception for missing resume token was not thrown');
} catch (ResumeTokenException $e) {
$expectedValid = true;
$expectedKey = 0;
} catch (ServerException $e) {
$expectedValid = false;
$expectedKey = null;
}

$this->assertSame($expectedValid, $changeStream->valid());
$this->assertSame($expectedKey, $changeStream->key());
$this->assertFalse($changeStream->valid());
$this->assertNull($changeStream->key());
}

public function testSessionPersistsAfterResume()
Expand Down