Skip to content

Commit 989056d

Browse files
committed
PHPLIB-451: ChangeStream::rewind() should never execute getMore
Tests were revised to no longer expect rewind() to encounter an error and resume. In most tests, rewind() is a NOP because the change stream has no events to return upon creation. Since Watch now always executes aggregate commands with APM, the check for startAtOperationTime support was moved to createResumeCallable(). Various assertions for current() returning null were also changed to instead check valid(). Both forms are equivalent, but checking valid() is more consistent with our iteration examples.
1 parent b6c7f11 commit 989056d

File tree

5 files changed

+181
-267
lines changed

5 files changed

+181
-267
lines changed

src/ChangeStream.php

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
use MongoDB\Driver\Exception\ServerException;
2525
use MongoDB\Exception\InvalidArgumentException;
2626
use MongoDB\Exception\ResumeTokenException;
27-
use IteratorIterator;
27+
use MongoDB\Model\TailableCursorIterator;
2828
use Iterator;
2929

3030
/**
@@ -61,13 +61,14 @@ class ChangeStream implements Iterator
6161
* Constructor.
6262
*
6363
* @internal
64-
* @param Cursor $cursor
64+
* @param Cursor $cursor
6565
* @param callable $resumeCallable
66+
* @param boolean $isFirstBatchIsEmpty
6667
*/
67-
public function __construct(Cursor $cursor, callable $resumeCallable)
68+
public function __construct(Cursor $cursor, callable $resumeCallable, $isFirstBatchIsEmpty)
6869
{
6970
$this->resumeCallable = $resumeCallable;
70-
$this->csIt = new IteratorIterator($cursor);
71+
$this->csIt = new TailableCursorIterator($cursor, $isFirstBatchIsEmpty);
7172
}
7273

7374
/**
@@ -242,17 +243,11 @@ private function onIteration($incrementKey)
242243
*/
243244
private function resume()
244245
{
245-
$newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken);
246-
$this->csIt = $newChangeStream->csIt;
246+
list($cursor, $isFirstBatchIsEmpty) = call_user_func($this->resumeCallable, $this->resumeToken);
247+
248+
$this->csIt = new TailableCursorIterator($cursor, $isFirstBatchIsEmpty);
247249
$this->csIt->rewind();
248-
/* Note: if we are resuming after a call to ChangeStream::rewind(),
249-
* $hasAdvanced will always be false. For it to be true, rewind() would
250-
* need to have thrown a RuntimeException with a resumable error, which
251-
* can only happen during the first call to IteratorIterator::rewind()
252-
* before onIteration() has a chance to set $hasAdvanced to true.
253-
* Otherwise, IteratorIterator::rewind() would either NOP (consecutive
254-
* rewinds) or throw a LogicException (rewind after next), neither of
255-
* which would result in a call to resume(). */
250+
256251
$this->onIteration($this->hasAdvanced);
257252
}
258253

src/Operation/Watch.php

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
5757
private $changeStreamOptions;
5858
private $collectionName;
5959
private $databaseName;
60+
private $isFirstBatchIsEmpty = false;
6061
private $operationTime;
6162
private $pipeline;
6263
private $resumeCallable;
@@ -200,6 +201,12 @@ final public function commandFailed(CommandFailedEvent $event)
200201
/** @internal */
201202
final public function commandStarted(CommandStartedEvent $event)
202203
{
204+
if ($event->getCommandName() !== 'aggregate') {
205+
return;
206+
}
207+
208+
$this->operationTime = null;
209+
$this->isFirstBatchIsEmpty = false;
203210
}
204211

205212
/** @internal */
@@ -214,6 +221,10 @@ final public function commandSucceeded(CommandSucceededEvent $event)
214221
if (isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
215222
$this->operationTime = $reply->operationTime;
216223
}
224+
225+
if (isset($reply->cursor->firstBatch) && is_array($reply->cursor->firstBatch)) {
226+
$this->isFirstBatchIsEmpty = empty($reply->cursor->firstBatch);
227+
}
217228
}
218229

219230
/**
@@ -227,7 +238,9 @@ final public function commandSucceeded(CommandSucceededEvent $event)
227238
*/
228239
public function execute(Server $server)
229240
{
230-
return new ChangeStream($this->executeAggregate($server), $this->resumeCallable);
241+
$cursor = $this->executeAggregate($server);
242+
243+
return new ChangeStream($cursor, $this->resumeCallable, $this->isFirstBatchIsEmpty);
231244
}
232245

233246
/**
@@ -255,40 +268,36 @@ private function createResumeCallable(Manager $manager)
255268
unset($this->changeStreamOptions['startAtOperationTime']);
256269
}
257270

271+
// Select a new server using the original read preference
272+
$server = $manager->selectServer($this->aggregateOptions['readPreference']);
273+
258274
/* If we captured an operation time from the first aggregate command
259275
* and there is no "resumeAfter" option, set "startAtOperationTime"
260276
* so that we can resume from the original aggregate's time. */
261-
if ($this->operationTime !== null && ! isset($this->changeStreamOptions['resumeAfter'])) {
277+
if ($this->operationTime !== null && ! isset($this->changeStreamOptions['resumeAfter']) &&
278+
\MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
262279
$this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
263280
}
264281

282+
// Recreate the aggregate command and execute to obtain a new cursor
265283
$this->aggregate = $this->createAggregate();
284+
$cursor = $this->executeAggregate($server);
266285

267-
/* Select a new server using the read preference, execute this
268-
* operation on it, and return the new ChangeStream. */
269-
$server = $manager->selectServer($this->aggregateOptions['readPreference']);
270-
271-
return $this->execute($server);
286+
return [$cursor, $this->isFirstBatchIsEmpty];
272287
};
273288
}
274289

275290
/**
276-
* Execute the aggregate command and optionally capture its operation time.
291+
* Execute the aggregate command.
292+
*
293+
* The command will be executed using APM so that we can capture its
294+
* operation time and/or firstBatch size.
277295
*
278296
* @param Server $server
279297
* @return Cursor
280298
*/
281299
private function executeAggregate(Server $server)
282300
{
283-
/* If we've already captured an operation time or the server does not
284-
* support resuming from an operation time (e.g. MongoDB 3.6), execute
285-
* the aggregation directly and return its cursor. */
286-
if ($this->operationTime !== null || ! \MongoDB\server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
287-
return $this->aggregate->execute($server);
288-
}
289-
290-
/* Otherwise, execute the aggregation using command monitoring so that
291-
* we can capture its operation time with commandSucceeded(). */
292301
\MongoDB\Driver\Monitoring\addSubscriber($this);
293302

294303
try {

0 commit comments

Comments
 (0)