Skip to content

Commit dea9c32

Browse files
committed
PHPLIB-441: Increment ChangeStream key consistently when resuming
1 parent 909121f commit dea9c32

File tree

2 files changed

+74
-10
lines changed

2 files changed

+74
-10
lines changed

src/ChangeStream.php

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ class ChangeStream implements Iterator
5050
private $resumeCallable;
5151
private $csIt;
5252
private $key = 0;
53+
54+
/**
55+
* Whether the change stream has advanced to its first result. This is used
56+
* to determine whether $key should be incremented after an iteration event.
57+
*/
5358
private $hasAdvanced = false;
5459

5560
/**
@@ -102,7 +107,7 @@ public function next()
102107
{
103108
try {
104109
$this->csIt->next();
105-
$this->onIteration(true);
110+
$this->onIteration($this->hasAdvanced);
106111
} catch (RuntimeException $e) {
107112
if ($this->isResumableError($e)) {
108113
$this->resume();
@@ -118,6 +123,9 @@ public function rewind()
118123
{
119124
try {
120125
$this->csIt->rewind();
126+
/* Unlike next() and resume(), the decision to increment the key
127+
* does not depend on whether the change stream has advanced. This
128+
* ensures that multiple calls to rewind() do not alter state. */
121129
$this->onIteration(false);
122130
} catch (RuntimeException $e) {
123131
if ($this->isResumableError($e)) {
@@ -193,12 +201,12 @@ private function isResumableError(RuntimeException $exception)
193201
}
194202

195203
/**
196-
* Perform housekeeping after an iteration event (i.e. next or rewind).
204+
* Perform housekeeping after an iteration event.
197205
*
198-
* @param boolean $isNext Whether the iteration event was a call to next()
206+
* @param boolean $incrementKey Increment $key if there is a current result
199207
* @throws ResumeTokenException
200208
*/
201-
private function onIteration($isNext)
209+
private function onIteration($incrementKey)
202210
{
203211
/* If the cursorId is 0, the server has invalidated the cursor and we
204212
* will never perform another getMore nor need to resume since any
@@ -210,13 +218,13 @@ private function onIteration($isNext)
210218
$this->resumeCallable = null;
211219
}
212220

221+
/* Return early if there is not a current result. Avoid any attempt to
222+
* increment the iterator's key or extract a resume token */
213223
if (!$this->valid()) {
214224
return;
215225
}
216226

217-
/* Increment the key if the iteration event was a call to next() and we
218-
* have already advanced past the first result. */
219-
if ($isNext && $this->hasAdvanced) {
227+
if ($incrementKey) {
220228
$this->key++;
221229
}
222230

@@ -234,6 +242,13 @@ private function resume()
234242
$newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken);
235243
$this->csIt = $newChangeStream->csIt;
236244
$this->csIt->rewind();
237-
$this->onIteration(false);
245+
/* Note: if we are resuming after a call to rewind(), $hasAdvanced will
246+
* always be false. If it were true, then the previous call to
247+
* IteratorIterator::rewind() would either have been a NOP and succeeded
248+
* because the cursor was still at its first position or have thrown a
249+
* LogicException for attempting to rewind an advanced cursor. In the
250+
* latter case, the exception would have bypassed the catch block for
251+
* resuming. */
252+
$this->onIteration($this->hasAdvanced);
238253
}
239254
}

tests/Operation/WatchFunctionalTest.php

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -379,20 +379,67 @@ public function testNoChangeAfterResumeBeforeInsert()
379379
$this->assertMatchesDocument($expectedResult, $changeStream->current());
380380
}
381381

382-
public function testResumeTokenIsUpdatedAfterResuming()
382+
public function testResumeMultipleTimesInSuccession()
383383
{
384-
$this->insertDocument(['_id' => 1]);
384+
$operation = new CreateCollection($this->getDatabaseName(), $this->getCollectionName());
385+
$operation->execute($this->getPrimaryServer());
385386

386387
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
387388
$changeStream = $operation->execute($this->getPrimaryServer());
388389

390+
/* Killing the cursor when there are no results will tests that neither
391+
* the initial rewind() nor its resume attempt incremented the key. */
392+
$this->killChangeStreamCursor($changeStream);
393+
389394
$changeStream->rewind();
395+
$this->assertFalse($changeStream->valid());
396+
$this->assertNull($changeStream->key());
390397
$this->assertNull($changeStream->current());
391398

399+
$this->insertDocument(['_id' => 1]);
400+
401+
/* Killing the cursor a second time when there is a result will test
402+
* that the resume attempt picks up the latest change. */
403+
$this->killChangeStreamCursor($changeStream);
404+
405+
$changeStream->rewind();
406+
$this->assertTrue($changeStream->valid());
407+
$this->assertSame(0, $changeStream->key());
408+
409+
$expectedResult = [
410+
'_id' => $changeStream->current()->_id,
411+
'operationType' => 'insert',
412+
'fullDocument' => ['_id' => 1],
413+
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
414+
'documentKey' => ['_id' => 1],
415+
];
416+
417+
$this->assertMatchesDocument($expectedResult, $changeStream->current());
418+
419+
/* Killing the cursor a second time will not trigger a resume until
420+
* ChangeStream::next() is called. A successive call to rewind() should
421+
* not change the iterator's state and preserve the current result. */
422+
$this->killChangeStreamCursor($changeStream);
423+
424+
$changeStream->rewind();
425+
$this->assertTrue($changeStream->valid());
426+
$this->assertSame(0, $changeStream->key());
427+
428+
$expectedResult = [
429+
'_id' => $changeStream->current()->_id,
430+
'operationType' => 'insert',
431+
'fullDocument' => ['_id' => 1],
432+
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
433+
'documentKey' => ['_id' => 1],
434+
];
435+
436+
$this->assertMatchesDocument($expectedResult, $changeStream->current());
437+
392438
$this->insertDocument(['_id' => 2]);
393439

394440
$changeStream->next();
395441
$this->assertTrue($changeStream->valid());
442+
$this->assertSame(1, $changeStream->key());
396443

397444
$expectedResult = [
398445
'_id' => $changeStream->current()->_id,
@@ -410,6 +457,7 @@ public function testResumeTokenIsUpdatedAfterResuming()
410457

411458
$changeStream->next();
412459
$this->assertTrue($changeStream->valid());
460+
$this->assertSame(2, $changeStream->key());
413461

414462
$expectedResult = [
415463
'_id' => $changeStream->current()->_id,
@@ -431,6 +479,7 @@ public function testResumeTokenIsUpdatedAfterResuming()
431479

432480
$changeStream->next();
433481
$this->assertTrue($changeStream->valid());
482+
$this->assertSame(3, $changeStream->key());
434483

435484
$expectedResult = [
436485
'_id' => $changeStream->current()->_id,

0 commit comments

Comments
 (0)