Skip to content

Commit b19328a

Browse files
committed
Merge pull request #626
2 parents 9f45714 + fdf87ad commit b19328a

File tree

2 files changed

+134
-10
lines changed

2 files changed

+134
-10
lines changed

src/ChangeStream.php

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ class ChangeStream implements Iterator
5050
private $resumeCallable;
5151
private $csIt;
5252
private $key = 0;
53+
54+
/**
55+
* Whether the change stream has advanced to its first result. This is used
56+
* to determine whether $key should be incremented after an iteration event.
57+
*/
5358
private $hasAdvanced = false;
5459

5560
/**
@@ -102,7 +107,7 @@ public function next()
102107
{
103108
try {
104109
$this->csIt->next();
105-
$this->onIteration(true);
110+
$this->onIteration($this->hasAdvanced);
106111
} catch (RuntimeException $e) {
107112
if ($this->isResumableError($e)) {
108113
$this->resume();
@@ -118,6 +123,9 @@ public function rewind()
118123
{
119124
try {
120125
$this->csIt->rewind();
126+
/* Unlike next() and resume(), the decision to increment the key
127+
* does not depend on whether the change stream has advanced. This
128+
* ensures that multiple calls to rewind() do not alter state. */
121129
$this->onIteration(false);
122130
} catch (RuntimeException $e) {
123131
if ($this->isResumableError($e)) {
@@ -193,12 +201,12 @@ private function isResumableError(RuntimeException $exception)
193201
}
194202

195203
/**
196-
* Perform housekeeping after an iteration event (i.e. next or rewind).
204+
* Perform housekeeping after an iteration event.
197205
*
198-
* @param boolean $isNext Whether the iteration event was a call to next()
206+
* @param boolean $incrementKey Increment $key if there is a current result
199207
* @throws ResumeTokenException
200208
*/
201-
private function onIteration($isNext)
209+
private function onIteration($incrementKey)
202210
{
203211
/* If the cursorId is 0, the server has invalidated the cursor and we
204212
* will never perform another getMore nor need to resume since any
@@ -210,13 +218,13 @@ private function onIteration($isNext)
210218
$this->resumeCallable = null;
211219
}
212220

221+
/* Return early if there is not a current result. Avoid any attempt to
222+
* increment the iterator's key or extract a resume token */
213223
if (!$this->valid()) {
214224
return;
215225
}
216226

217-
/* Increment the key if the iteration event was a call to next() and we
218-
* have already advanced past the first result. */
219-
if ($isNext && $this->hasAdvanced) {
227+
if ($incrementKey) {
220228
$this->key++;
221229
}
222230

@@ -234,6 +242,14 @@ private function resume()
234242
$newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken);
235243
$this->csIt = $newChangeStream->csIt;
236244
$this->csIt->rewind();
237-
$this->onIteration(false);
245+
/* Note: if we are resuming after a call to ChangeStream::rewind(),
246+
* $hasAdvanced will always be false. For it to be true, rewind() would
247+
* need to have thrown a RuntimeException with a resumable error, which
248+
* can only happen during the first call to IteratorIterator::rewind()
249+
* before onIteration() has a chance to set $hasAdvanced to true.
250+
* Otherwise, IteratorIterator::rewind() would either NOP (consecutive
251+
* rewinds) or throw a LogicException (rewind after next), neither of
252+
* which would result in a call to resume(). */
253+
$this->onIteration($this->hasAdvanced);
238254
}
239255
}

tests/Operation/WatchFunctionalTest.php

Lines changed: 110 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use MongoDB\Driver\ReadPreference;
99
use MongoDB\Driver\Server;
1010
use MongoDB\Driver\Exception\ConnectionTimeoutException;
11+
use MongoDB\Driver\Exception\LogicException;
1112
use MongoDB\Exception\ResumeTokenException;
1213
use MongoDB\Operation\CreateCollection;
1314
use MongoDB\Operation\DatabaseCommand;
@@ -225,6 +226,62 @@ private function assertStartAtOperationTime(TimestampInterface $expectedOperatio
225226
$this->assertEquals($expectedOperationTime, $command->pipeline[0]->{'$changeStream'}->startAtOperationTime);
226227
}
227228

229+
public function testRewindMultipleTimesWithResults()
230+
{
231+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
232+
$changeStream = $operation->execute($this->getPrimaryServer());
233+
234+
$this->insertDocument(['x' => 1]);
235+
$this->insertDocument(['x' => 2]);
236+
237+
$changeStream->rewind();
238+
$this->assertTrue($changeStream->valid());
239+
$this->assertSame(0, $changeStream->key());
240+
$this->assertNotNull($changeStream->current());
241+
242+
// Subsequent rewind does not change iterator state
243+
$changeStream->rewind();
244+
$this->assertTrue($changeStream->valid());
245+
$this->assertSame(0, $changeStream->key());
246+
$this->assertNotNull($changeStream->current());
247+
248+
$changeStream->next();
249+
250+
$this->assertTrue($changeStream->valid());
251+
$this->assertSame(1, $changeStream->key());
252+
$this->assertNotNull($changeStream->current());
253+
254+
// Rewinding after advancing the iterator is an error
255+
$this->expectException(LogicException::class);
256+
$changeStream->rewind();
257+
}
258+
259+
public function testRewindMultipleTimesWithNoResults()
260+
{
261+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
262+
$changeStream = $operation->execute($this->getPrimaryServer());
263+
264+
$changeStream->rewind();
265+
$this->assertFalse($changeStream->valid());
266+
$this->assertNull($changeStream->key());
267+
$this->assertNull($changeStream->current());
268+
269+
// Subsequent rewind does not change iterator state
270+
$changeStream->rewind();
271+
$this->assertFalse($changeStream->valid());
272+
$this->assertNull($changeStream->key());
273+
$this->assertNull($changeStream->current());
274+
275+
$changeStream->next();
276+
$this->assertFalse($changeStream->valid());
277+
$this->assertNull($changeStream->key());
278+
$this->assertNull($changeStream->current());
279+
280+
// Rewinding after advancing the iterator is an error
281+
$this->expectException(LogicException::class);
282+
$changeStream->rewind();
283+
}
284+
228285
public function testRewindResumesAfterConnectionException()
229286
{
230287
/* In order to trigger a dropped connection, we'll use a new client with
@@ -322,20 +379,67 @@ public function testNoChangeAfterResumeBeforeInsert()
322379
$this->assertMatchesDocument($expectedResult, $changeStream->current());
323380
}
324381

325-
public function testResumeTokenIsUpdatedAfterResuming()
382+
public function testResumeMultipleTimesInSuccession()
326383
{
327-
$this->insertDocument(['_id' => 1]);
384+
$operation = new CreateCollection($this->getDatabaseName(), $this->getCollectionName());
385+
$operation->execute($this->getPrimaryServer());
328386

329387
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
330388
$changeStream = $operation->execute($this->getPrimaryServer());
331389

390+
/* Killing the cursor when there are no results will test that neither
391+
* the initial rewind() nor its resume attempt incremented the key. */
392+
$this->killChangeStreamCursor($changeStream);
393+
332394
$changeStream->rewind();
395+
$this->assertFalse($changeStream->valid());
396+
$this->assertNull($changeStream->key());
333397
$this->assertNull($changeStream->current());
334398

399+
$this->insertDocument(['_id' => 1]);
400+
401+
/* Killing the cursor a second time when there is a result will test
402+
* that the resume attempt picks up the latest change. */
403+
$this->killChangeStreamCursor($changeStream);
404+
405+
$changeStream->rewind();
406+
$this->assertTrue($changeStream->valid());
407+
$this->assertSame(0, $changeStream->key());
408+
409+
$expectedResult = [
410+
'_id' => $changeStream->current()->_id,
411+
'operationType' => 'insert',
412+
'fullDocument' => ['_id' => 1],
413+
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
414+
'documentKey' => ['_id' => 1],
415+
];
416+
417+
$this->assertMatchesDocument($expectedResult, $changeStream->current());
418+
419+
/* Killing the cursor a second time will not trigger a resume until
420+
* ChangeStream::next() is called. A successive call to rewind() should
421+
* not change the iterator's state and preserve the current result. */
422+
$this->killChangeStreamCursor($changeStream);
423+
424+
$changeStream->rewind();
425+
$this->assertTrue($changeStream->valid());
426+
$this->assertSame(0, $changeStream->key());
427+
428+
$expectedResult = [
429+
'_id' => $changeStream->current()->_id,
430+
'operationType' => 'insert',
431+
'fullDocument' => ['_id' => 1],
432+
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
433+
'documentKey' => ['_id' => 1],
434+
];
435+
436+
$this->assertMatchesDocument($expectedResult, $changeStream->current());
437+
335438
$this->insertDocument(['_id' => 2]);
336439

337440
$changeStream->next();
338441
$this->assertTrue($changeStream->valid());
442+
$this->assertSame(1, $changeStream->key());
339443

340444
$expectedResult = [
341445
'_id' => $changeStream->current()->_id,
@@ -353,6 +457,7 @@ public function testResumeTokenIsUpdatedAfterResuming()
353457

354458
$changeStream->next();
355459
$this->assertTrue($changeStream->valid());
460+
$this->assertSame(2, $changeStream->key());
356461

357462
$expectedResult = [
358463
'_id' => $changeStream->current()->_id,
@@ -374,6 +479,7 @@ public function testResumeTokenIsUpdatedAfterResuming()
374479

375480
$changeStream->next();
376481
$this->assertTrue($changeStream->valid());
482+
$this->assertSame(3, $changeStream->key());
377483

378484
$expectedResult = [
379485
'_id' => $changeStream->current()->_id,
@@ -689,6 +795,8 @@ public function testNextAdvancesKey()
689795
$this->insertDocument(['x' => 1]);
690796
$this->insertDocument(['x' => 2]);
691797

798+
/* Note: we intentionally do not start iteration with rewind() to ensure
799+
* that next() behaves identically when called without rewind(). */
692800
$changeStream->next();
693801

694802
$this->assertSame(0, $changeStream->key());

0 commit comments

Comments
 (0)