Skip to content

Commit 8c7a92c

Browse files
committed
Merge pull request #636
2 parents 80cee6f + 84c365c commit 8c7a92c

File tree

8 files changed

+364
-269
lines changed

8 files changed

+364
-269
lines changed

src/ChangeStream.php

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
use MongoDB\Driver\Exception\ServerException;
2525
use MongoDB\Exception\InvalidArgumentException;
2626
use MongoDB\Exception\ResumeTokenException;
27-
use IteratorIterator;
27+
use MongoDB\Model\TailableCursorIterator;
2828
use Iterator;
2929

3030
/**
@@ -61,13 +61,14 @@ class ChangeStream implements Iterator
6161
* Constructor.
6262
*
6363
* @internal
64-
* @param Cursor $cursor
64+
* @param Cursor $cursor
6565
* @param callable $resumeCallable
66+
* @param boolean $isFirstBatchEmpty
6667
*/
67-
public function __construct(Cursor $cursor, callable $resumeCallable)
68+
public function __construct(Cursor $cursor, callable $resumeCallable, $isFirstBatchEmpty)
6869
{
6970
$this->resumeCallable = $resumeCallable;
70-
$this->csIt = new IteratorIterator($cursor);
71+
$this->csIt = new TailableCursorIterator($cursor, $isFirstBatchEmpty);
7172
}
7273

7374
/**
@@ -242,17 +243,11 @@ private function onIteration($incrementKey)
242243
*/
243244
private function resume()
244245
{
245-
$newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken);
246-
$this->csIt = $newChangeStream->csIt;
246+
list($cursor, $isFirstBatchEmpty) = call_user_func($this->resumeCallable, $this->resumeToken);
247+
248+
$this->csIt = new TailableCursorIterator($cursor, $isFirstBatchEmpty);
247249
$this->csIt->rewind();
248-
/* Note: if we are resuming after a call to ChangeStream::rewind(),
249-
* $hasAdvanced will always be false. For it to be true, rewind() would
250-
* need to have thrown a RuntimeException with a resumable error, which
251-
* can only happen during the first call to IteratorIterator::rewind()
252-
* before onIteration() has a chance to set $hasAdvanced to true.
253-
* Otherwise, IteratorIterator::rewind() would either NOP (consecutive
254-
* rewinds) or throw a LogicException (rewind after next), neither of
255-
* which would result in a call to resume(). */
250+
256251
$this->onIteration($this->hasAdvanced);
257252
}
258253

src/Model/TailableCursorIterator.php

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
/*
3+
* Copyright 2019 MongoDB, Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
namespace MongoDB\Model;
19+
20+
use MongoDB\Driver\Cursor;
21+
use IteratorIterator;
22+
23+
/**
24+
* Iterator for tailable cursors.
25+
*
26+
* This iterator may be used to wrap a tailable cursor. By indicating whether
27+
* the cursor's first batch of results is empty, this iterator can NOP initial
28+
* calls to rewind() and prevent it from executing a getMore command.
29+
*
30+
* @internal
31+
*/
32+
class TailableCursorIterator extends IteratorIterator
33+
{
34+
private $isRewindNop;
35+
36+
/**
37+
* Constructor.
38+
*
39+
* @internal
40+
* @param Cursor $cursor
41+
* @param boolean $isFirstBatchEmpty
42+
*/
43+
public function __construct(Cursor $cursor, $isFirstBatchEmpty)
44+
{
45+
parent::__construct($cursor);
46+
$this->isRewindNop = $isFirstBatchEmpty;
47+
}
48+
49+
/**
50+
* @see https://php.net/iteratoriterator.rewind
51+
* @return void
52+
*/
53+
public function next()
54+
{
55+
try {
56+
parent::next();
57+
} finally {
58+
/* If the cursor ever advances to a valid position, do not prevent
59+
* future attempts to rewind the cursor. This will allow the driver
60+
* to throw a LogicException if the cursor has been advanced past
61+
* its first element. */
62+
if ($this->valid()) {
63+
$this->isRewindNop = false;
64+
}
65+
}
66+
}
67+
68+
/**
69+
* @see https://php.net/iteratoriterator.rewind
70+
* @return void
71+
*/
72+
public function rewind()
73+
{
74+
if ($this->isRewindNop) {
75+
return;
76+
}
77+
78+
parent::rewind();
79+
}
80+
}

src/Operation/Watch.php

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
5757
private $changeStreamOptions;
5858
private $collectionName;
5959
private $databaseName;
60+
private $isFirstBatchEmpty = false;
6061
private $operationTime;
6162
private $pipeline;
6263
private $resumeCallable;
@@ -200,6 +201,11 @@ final public function commandFailed(CommandFailedEvent $event)
200201
/** @internal */
201202
final public function commandStarted(CommandStartedEvent $event)
202203
{
204+
if ($event->getCommandName() !== 'aggregate') {
205+
return;
206+
}
207+
208+
$this->isFirstBatchEmpty = false;
203209
}
204210

205211
/** @internal */
@@ -211,9 +217,15 @@ final public function commandSucceeded(CommandSucceededEvent $event)
211217

212218
$reply = $event->getReply();
213219

214-
if (isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
220+
/* Note: the spec only refers to collecting an operation time from the
221+
* "original aggregation", so only capture it if we've not already. */
222+
if (!isset($this->operationTime) && isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
215223
$this->operationTime = $reply->operationTime;
216224
}
225+
226+
if (isset($reply->cursor->firstBatch) && is_array($reply->cursor->firstBatch)) {
227+
$this->isFirstBatchEmpty = empty($reply->cursor->firstBatch);
228+
}
217229
}
218230

219231
/**
@@ -227,7 +239,9 @@ final public function commandSucceeded(CommandSucceededEvent $event)
227239
*/
228240
public function execute(Server $server)
229241
{
230-
return new ChangeStream($this->executeAggregate($server), $this->resumeCallable);
242+
$cursor = $this->executeAggregate($server);
243+
244+
return new ChangeStream($cursor, $this->resumeCallable, $this->isFirstBatchEmpty);
231245
}
232246

233247
/**
@@ -255,40 +269,36 @@ private function createResumeCallable(Manager $manager)
255269
unset($this->changeStreamOptions['startAtOperationTime']);
256270
}
257271

272+
// Select a new server using the original read preference
273+
$server = $manager->selectServer($this->aggregateOptions['readPreference']);
274+
258275
/* If we captured an operation time from the first aggregate command
259276
* and there is no "resumeAfter" option, set "startAtOperationTime"
260277
* so that we can resume from the original aggregate's time. */
261-
if ($this->operationTime !== null && ! isset($this->changeStreamOptions['resumeAfter'])) {
278+
if ($this->operationTime !== null && ! isset($this->changeStreamOptions['resumeAfter']) &&
279+
\MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
262280
$this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
263281
}
264282

283+
// Recreate the aggregate command and execute to obtain a new cursor
265284
$this->aggregate = $this->createAggregate();
285+
$cursor = $this->executeAggregate($server);
266286

267-
/* Select a new server using the read preference, execute this
268-
* operation on it, and return the new ChangeStream. */
269-
$server = $manager->selectServer($this->aggregateOptions['readPreference']);
270-
271-
return $this->execute($server);
287+
return [$cursor, $this->isFirstBatchEmpty];
272288
};
273289
}
274290

275291
/**
276-
* Execute the aggregate command and optionally capture its operation time.
292+
* Execute the aggregate command.
293+
*
294+
* The command will be executed using APM so that we can capture its
295+
* operation time and/or firstBatch size.
277296
*
278297
* @param Server $server
279298
* @return Cursor
280299
*/
281300
private function executeAggregate(Server $server)
282301
{
283-
/* If we've already captured an operation time or the server does not
284-
* support resuming from an operation time (e.g. MongoDB 3.6), execute
285-
* the aggregation directly and return its cursor. */
286-
if ($this->operationTime !== null || ! \MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
287-
return $this->aggregate->execute($server);
288-
}
289-
290-
/* Otherwise, execute the aggregation using command monitoring so that
291-
* we can capture its operation time with commandSucceeded(). */
292302
\MongoDB\Driver\Monitoring\addSubscriber($this);
293303

294304
try {
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
<?php
2+
3+
namespace MongoDB\Tests\Model;
4+
5+
use MongoDB\Collection;
6+
use MongoDB\Driver\Exception\LogicException;
7+
use MongoDB\Model\TailableCursorIterator;
8+
use MongoDB\Operation\Find;
9+
use MongoDB\Operation\CreateCollection;
10+
use MongoDB\Operation\DropCollection;
11+
use MongoDB\Tests\CommandObserver;
12+
use MongoDB\Tests\FunctionalTestCase;
13+
14+
class TailableCursorIteratorTest extends FunctionalTestCase
15+
{
16+
private $collection;
17+
18+
public function setUp()
19+
{
20+
parent::setUp();
21+
22+
$operation = new DropCollection($this->getDatabaseName(), $this->getCollectionName());
23+
$operation->execute($this->getPrimaryServer());
24+
25+
$operation = new CreateCollection($this->getDatabaseName(), $this->getCollectionName(), ['capped' => true, 'size' => 8192]);
26+
$operation->execute($this->getPrimaryServer());
27+
28+
$this->collection = new Collection($this->manager, $this->getDatabaseName(), $this->getCollectionName());
29+
}
30+
31+
public function testFirstBatchIsEmpty()
32+
{
33+
$this->collection->insertOne(['x' => 1]);
34+
35+
$cursor = $this->collection->find(['x' => ['$gt' => 1]], ['cursorType' => Find::TAILABLE]);
36+
$iterator = new TailableCursorIterator($cursor, true);
37+
38+
$this->assertNoCommandExecuted(function() use ($iterator) { $iterator->rewind(); });
39+
$this->assertFalse($iterator->valid());
40+
41+
$this->collection->insertOne(['x' => 2]);
42+
43+
$iterator->next();
44+
$this->assertTrue($iterator->valid());
45+
$this->assertMatchesDocument(['x' => 2], $iterator->current());
46+
47+
$this->expectException(LogicException::class);
48+
$iterator->rewind();
49+
}
50+
51+
public function testFirstBatchIsNotEmpty()
52+
{
53+
$this->collection->insertOne(['x' => 1]);
54+
55+
$cursor = $this->collection->find([], ['cursorType' => Find::TAILABLE]);
56+
$iterator = new TailableCursorIterator($cursor, false);
57+
58+
$this->assertNoCommandExecuted(function() use ($iterator) { $iterator->rewind(); });
59+
$this->assertTrue($iterator->valid());
60+
$this->assertMatchesDocument(['x' => 1], $iterator->current());
61+
62+
$this->collection->insertOne(['x' => 2]);
63+
64+
$iterator->next();
65+
$this->assertTrue($iterator->valid());
66+
$this->assertMatchesDocument(['x' => 2], $iterator->current());
67+
68+
$this->expectException(LogicException::class);
69+
$iterator->rewind();
70+
}
71+
72+
private function assertNoCommandExecuted(callable $callable)
73+
{
74+
$commands = [];
75+
76+
(new CommandObserver)->observe(
77+
$callable,
78+
function(array $event) use (&$commands) {
79+
$this->fail(sprintf('"%s" command was executed', $event['started']->getCommandName()));
80+
}
81+
);
82+
83+
$this->assertEmpty($commands);
84+
}
85+
}

0 commit comments

Comments
 (0)