Skip to content

PHPLIB-451: ChangeStream::rewind() should never execute getMore #636

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions src/ChangeStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
use MongoDB\Driver\Exception\ServerException;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\ResumeTokenException;
use IteratorIterator;
use MongoDB\Model\TailableCursorIterator;
use Iterator;

/**
Expand Down Expand Up @@ -61,13 +61,14 @@ class ChangeStream implements Iterator
* Constructor.
*
* @internal
* @param Cursor $cursor
* @param Cursor $cursor
* @param callable $resumeCallable
* @param boolean $isFirstBatchEmpty
*/
public function __construct(Cursor $cursor, callable $resumeCallable)
public function __construct(Cursor $cursor, callable $resumeCallable, $isFirstBatchEmpty)
{
$this->resumeCallable = $resumeCallable;
$this->csIt = new IteratorIterator($cursor);
$this->csIt = new TailableCursorIterator($cursor, $isFirstBatchEmpty);
}

/**
Expand Down Expand Up @@ -242,17 +243,11 @@ private function onIteration($incrementKey)
*/
private function resume()
{
$newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken);
$this->csIt = $newChangeStream->csIt;
list($cursor, $isFirstBatchEmpty) = call_user_func($this->resumeCallable, $this->resumeToken);

$this->csIt = new TailableCursorIterator($cursor, $isFirstBatchEmpty);
$this->csIt->rewind();
/* Note: if we are resuming after a call to ChangeStream::rewind(),
* $hasAdvanced will always be false. For it to be true, rewind() would
* need to have thrown a RuntimeException with a resumable error, which
* can only happen during the first call to IteratorIterator::rewind()
* before onIteration() has a chance to set $hasAdvanced to true.
* Otherwise, IteratorIterator::rewind() would either NOP (consecutive
* rewinds) or throw a LogicException (rewind after next), neither of
* which would result in a call to resume(). */

$this->onIteration($this->hasAdvanced);
}

Expand Down
80 changes: 80 additions & 0 deletions src/Model/TailableCursorIterator.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?php
/*
* Copyright 2019 MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace MongoDB\Model;

use MongoDB\Driver\Cursor;
use IteratorIterator;

/**
* Iterator for tailable cursors.
*
* This iterator may be used to wrap a tailable cursor. By indicating whether
* the cursor's first batch of results is empty, this iterator can NOP initial
* calls to rewind() and prevent it from executing a getMore command.
*
* @internal
*/
class TailableCursorIterator extends IteratorIterator
{
    /**
     * Whether calls to rewind() are currently skipped.
     *
     * Seeded from the constructor's $isFirstBatchEmpty argument and cleared
     * by next() once the iterator reports a valid position.
     */
    private $isRewindNop;

    /**
     * Wraps the given cursor.
     *
     * @internal
     * @param Cursor  $cursor            Cursor to iterate
     * @param boolean $isFirstBatchEmpty Whether the cursor's first batch of
     *                                   results is empty; when true, rewind()
     *                                   is a NOP until next() reaches a valid
     *                                   position
     */
    public function __construct(Cursor $cursor, $isFirstBatchEmpty)
    {
        parent::__construct($cursor);

        $this->isRewindNop = $isFirstBatchEmpty;
    }

    /**
     * Advances the wrapped cursor and re-evaluates whether rewind() may still
     * be skipped.
     *
     * @see https://php.net/iteratoriterator.next
     * @return void
     */
    public function next()
    {
        try {
            parent::next();
        } finally {
            /* This runs even when parent::next() throws. Once the iterator is
             * positioned on a valid element, stop suppressing rewind() so
             * that the driver can throw a LogicException if the cursor has
             * been advanced past its first element. */
            if ($this->valid()) {
                $this->isRewindNop = false;
            }
        }
    }

    /**
     * Rewinds the wrapped cursor unless rewinding is currently suppressed.
     *
     * While suppressed, this method returns without delegating to the parent
     * iterator at all.
     *
     * @see https://php.net/iteratoriterator.rewind
     * @return void
     */
    public function rewind()
    {
        if (! $this->isRewindNop) {
            parent::rewind();
        }
    }
}
46 changes: 28 additions & 18 deletions src/Operation/Watch.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
private $changeStreamOptions;
private $collectionName;
private $databaseName;
private $isFirstBatchEmpty = false;
private $operationTime;
private $pipeline;
private $resumeCallable;
Expand Down Expand Up @@ -200,6 +201,11 @@ final public function commandFailed(CommandFailedEvent $event)
/**
 * Resets the empty-first-batch flag whenever an "aggregate" command starts.
 *
 * Started events for any other command name are ignored.
 *
 * @internal
 */
final public function commandStarted(CommandStartedEvent $event)
{
    if ($event->getCommandName() === 'aggregate') {
        $this->isFirstBatchEmpty = false;
    }
}

/** @internal */
Expand All @@ -211,9 +217,15 @@ final public function commandSucceeded(CommandSucceededEvent $event)

$reply = $event->getReply();

if (isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
/* Note: the spec only refers to collecting an operation time from the
* "original aggregation", so only capture it if we've not already. */
if (!isset($this->operationTime) && isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
$this->operationTime = $reply->operationTime;
}

if (isset($reply->cursor->firstBatch) && is_array($reply->cursor->firstBatch)) {
$this->isFirstBatchEmpty = empty($reply->cursor->firstBatch);
}
}

/**
Expand All @@ -227,7 +239,9 @@ final public function commandSucceeded(CommandSucceededEvent $event)
*/
public function execute(Server $server)
{
return new ChangeStream($this->executeAggregate($server), $this->resumeCallable);
$cursor = $this->executeAggregate($server);

return new ChangeStream($cursor, $this->resumeCallable, $this->isFirstBatchEmpty);
}

/**
Expand Down Expand Up @@ -255,40 +269,36 @@ private function createResumeCallable(Manager $manager)
unset($this->changeStreamOptions['startAtOperationTime']);
}

// Select a new server using the original read preference
$server = $manager->selectServer($this->aggregateOptions['readPreference']);

/* If we captured an operation time from the first aggregate command
* and there is no "resumeAfter" option, set "startAtOperationTime"
* so that we can resume from the original aggregate's time. */
if ($this->operationTime !== null && ! isset($this->changeStreamOptions['resumeAfter'])) {
if ($this->operationTime !== null && ! isset($this->changeStreamOptions['resumeAfter']) &&
\MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
$this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
}

// Recreate the aggregate command and execute to obtain a new cursor
$this->aggregate = $this->createAggregate();
$cursor = $this->executeAggregate($server);

/* Select a new server using the read preference, execute this
* operation on it, and return the new ChangeStream. */
$server = $manager->selectServer($this->aggregateOptions['readPreference']);

return $this->execute($server);
return [$cursor, $this->isFirstBatchEmpty];
};
}

/**
* Execute the aggregate command and optionally capture its operation time.
* Execute the aggregate command.
*
* The command will be executed using APM so that we can capture its
* operation time and/or firstBatch size.
*
* @param Server $server
* @return Cursor
*/
private function executeAggregate(Server $server)
{
/* If we've already captured an operation time or the server does not
* support resuming from an operation time (e.g. MongoDB 3.6), execute
* the aggregation directly and return its cursor. */
if ($this->operationTime !== null || ! \MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this removed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original logic was actually a bit backwards, but it worked in practice.

The server has reported operation times for as long as change streams have been supported (since 3.6); however, the startAtOperationTime option has only been supported since 4.0. Previously, createResumeCallable would decide to use startAtOperationTime purely by whether $this->operationTime was set, as we would only ever set that if this condition was true and we ran the aggregate command through APM.

With this PR, we always need to check whether aggregate's firstBatch is empty and therefore must always run the command through APM. That means $this->operationTime would always be captured, which could result in the driver attempting to use startAtOperationTime to resume against a 3.6 server (in the absence of a resume token).

To address this, I moved the server_supports_feature check to createResumeCallable, before we decide to use that option. That fixes the issue with resuming against a 3.6 server when no changes have been returned. This also means that the APM callback can continue collecting $this->operationTime unconditionally. Alternatively, I suppose we could have added a server_supports_feature check within the APM callback to decide whether to ignore the operation time, but createResumeCallable seems more straightforward since that's where we actually attempt to specify the option.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, thanks!

return $this->aggregate->execute($server);
}

/* Otherwise, execute the aggregation using command monitoring so that
* we can capture its operation time with commandSucceeded(). */
\MongoDB\Driver\Monitoring\addSubscriber($this);

try {
Expand Down
85 changes: 85 additions & 0 deletions tests/Model/TailableCursorIteratorTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?php

namespace MongoDB\Tests\Model;

use MongoDB\Collection;
use MongoDB\Driver\Exception\LogicException;
use MongoDB\Model\TailableCursorIterator;
use MongoDB\Operation\Find;
use MongoDB\Operation\CreateCollection;
use MongoDB\Operation\DropCollection;
use MongoDB\Tests\CommandObserver;
use MongoDB\Tests\FunctionalTestCase;

/**
 * Functional tests for TailableCursorIterator using a tailable cursor on a
 * capped collection.
 */
class TailableCursorIteratorTest extends FunctionalTestCase
{
    private $collection;

    /**
     * Recreates the test collection as a capped collection (required for
     * tailable cursors) before each test.
     */
    public function setUp()
    {
        parent::setUp();

        $operation = new DropCollection($this->getDatabaseName(), $this->getCollectionName());
        $operation->execute($this->getPrimaryServer());

        $operation = new CreateCollection($this->getDatabaseName(), $this->getCollectionName(), ['capped' => true, 'size' => 8192]);
        $operation->execute($this->getPrimaryServer());

        $this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
    }

    /**
     * When the iterator is told the first batch is empty, the initial rewind()
     * must not execute any command; after advancing to a valid document,
     * rewind() is expected to throw.
     */
    public function testFirstBatchIsEmpty()
    {
        $this->collection->insertOne(['x' => 1]);

        // The filter excludes the only document, so the first batch is empty
        $cursor = $this->collection->find(['x' => ['$gt' => 1]], ['cursorType' => Find::TAILABLE]);
        $iterator = new TailableCursorIterator($cursor, true);

        $this->assertNoCommandExecuted(function() use ($iterator) { $iterator->rewind(); });
        $this->assertFalse($iterator->valid());

        $this->collection->insertOne(['x' => 2]);

        $iterator->next();
        $this->assertTrue($iterator->valid());
        $this->assertMatchesDocument(['x' => 2], $iterator->current());

        $this->expectException(LogicException::class);
        $iterator->rewind();
    }

    /**
     * When the first batch is not empty, the initial rewind() positions the
     * iterator on the first document without executing any command; after
     * advancing, rewind() is expected to throw.
     */
    public function testFirstBatchIsNotEmpty()
    {
        $this->collection->insertOne(['x' => 1]);

        $cursor = $this->collection->find([], ['cursorType' => Find::TAILABLE]);
        $iterator = new TailableCursorIterator($cursor, false);

        $this->assertNoCommandExecuted(function() use ($iterator) { $iterator->rewind(); });
        $this->assertTrue($iterator->valid());
        $this->assertMatchesDocument(['x' => 1], $iterator->current());

        $this->collection->insertOne(['x' => 2]);

        $iterator->next();
        $this->assertTrue($iterator->valid());
        $this->assertMatchesDocument(['x' => 2], $iterator->current());

        $this->expectException(LogicException::class);
        $iterator->rewind();
    }

    /**
     * Asserts that invoking the callable results in no observed commands.
     *
     * The names of all observed commands are collected first and asserted on
     * afterwards, so a failure lists every unexpected command and the
     * observer callback itself never throws.
     *
     * @param callable $callable Code to run while commands are observed
     */
    private function assertNoCommandExecuted(callable $callable)
    {
        $commands = [];

        (new CommandObserver)->observe(
            $callable,
            function(array $event) use (&$commands) {
                $commands[] = $event['started']->getCommandName();
            }
        );

        $this->assertEmpty(
            $commands,
            sprintf('Unexpected command(s) executed: %s', implode(', ', $commands))
        );
    }
}
Loading