Skip to content

Commit 9dec42d

Browse files
committed
PHPLIB-442: Ensure change stream resume token is updated after resume
This extracts common logic in next() and rewind() to a new method, which is now also called by resume() after it rewinds the internal iterator.
1 parent aac8e54 commit 9dec42d

File tree

2 files changed

+99
-24
lines changed

2 files changed

+99
-24
lines changed

src/ChangeStream.php

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -102,22 +102,7 @@ public function next()
102102
{
103103
try {
104104
$this->csIt->next();
105-
if ($this->valid()) {
106-
if ($this->hasAdvanced) {
107-
$this->key++;
108-
}
109-
$this->hasAdvanced = true;
110-
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
111-
}
112-
/* If the cursorId is 0, the server has invalidated the cursor so we
113-
* will never perform another getMore. This means that we cannot
114-
* resume and we can therefore unset the resumeCallable, which will
115-
* free any reference to Watch. This will also free the only
116-
* reference to an implicit session, since any such reference
117-
* belongs to Watch. */
118-
if ((string) $this->getCursorId() === '0') {
119-
$this->resumeCallable = null;
120-
}
105+
$this->onIteration(true);
121106
} catch (RuntimeException $e) {
122107
if ($this->isResumableError($e)) {
123108
$this->resume();
@@ -133,14 +118,7 @@ public function rewind()
133118
{
134119
try {
135120
$this->csIt->rewind();
136-
if ($this->valid()) {
137-
$this->hasAdvanced = true;
138-
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
139-
}
140-
// As with next(), free the callable once we know it will never be used.
141-
if ((string) $this->getCursorId() === '0') {
142-
$this->resumeCallable = null;
143-
}
121+
$this->onIteration(false);
144122
} catch (RuntimeException $e) {
145123
if ($this->isResumableError($e)) {
146124
$this->resume();
@@ -214,6 +192,38 @@ private function isResumableError(RuntimeException $exception)
214192
return true;
215193
}
216194

195+
/**
196+
* Perform housekeeping after an iteration event (i.e. next or rewind).
197+
*
198+
* @param boolean $isNext Whether the iteration event was a call to next()
199+
* @throws ResumeTokenException
200+
*/
201+
private function onIteration($isNext)
202+
{
203+
/* If the cursorId is 0, the server has invalidated the cursor and we
204+
* will never perform another getMore nor need to resume since any
205+
* remaining results (up to and including the invalidate event) will
206+
* have been received in the last response. Therefore, we can unset the
207+
* resumeCallable. This will free any reference to Watch as well as the
208+
* only reference to any implicit session created therein. */
209+
if ((string) $this->getCursorId() === '0') {
210+
$this->resumeCallable = null;
211+
}
212+
213+
if (!$this->valid()) {
214+
return;
215+
}
216+
217+
/* Increment the key if the iteration event was a call to next() and we
218+
* have already advanced past the first result. */
219+
if ($isNext && $this->hasAdvanced) {
220+
$this->key++;
221+
}
222+
223+
$this->hasAdvanced = true;
224+
$this->resumeToken = $this->extractResumeToken($this->iterator->current());
225+
}
226+
217227
/**
218228
* Creates a new changeStream after a resumable server error.
219229
*
@@ -224,5 +234,6 @@ private function resume()
224234
$newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken);
225235
$this->csIt = $newChangeStream->csIt;
226236
$this->csIt->rewind();
237+
$this->onIteration(false);
227238
}
228239
}

tests/Operation/WatchFunctionalTest.php

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,70 @@ public function testNoChangeAfterResumeBeforeInsert()
322322
$this->assertMatchesDocument($expectedResult, $changeStream->current());
323323
}
324324

325+
public function testResumeTokenIsUpdatedAfterResuming()
326+
{
327+
$this->insertDocument(['_id' => 1]);
328+
329+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
330+
$changeStream = $operation->execute($this->getPrimaryServer());
331+
332+
$changeStream->rewind();
333+
$this->assertNull($changeStream->current());
334+
335+
$this->insertDocument(['_id' => 2]);
336+
337+
$changeStream->next();
338+
$this->assertTrue($changeStream->valid());
339+
340+
$expectedResult = [
341+
'_id' => $changeStream->current()->_id,
342+
'operationType' => 'insert',
343+
'fullDocument' => ['_id' => 2],
344+
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
345+
'documentKey' => ['_id' => 2],
346+
];
347+
348+
$this->assertMatchesDocument($expectedResult, $changeStream->current());
349+
350+
$this->killChangeStreamCursor($changeStream);
351+
352+
$this->insertDocument(['_id' => 3]);
353+
354+
$changeStream->next();
355+
$this->assertTrue($changeStream->valid());
356+
357+
$expectedResult = [
358+
'_id' => $changeStream->current()->_id,
359+
'operationType' => 'insert',
360+
'fullDocument' => ['_id' => 3],
361+
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
362+
'documentKey' => ['_id' => 3],
363+
];
364+
365+
$this->assertMatchesDocument($expectedResult, $changeStream->current());
366+
367+
/* Triggering a consecutive failure will allow us to test whether the
368+
* resume token was properly updated after the last resume. If the
369+
* resume token updated, the next result will be {_id: 4}; otherwise,
370+
* we'll see {_id: 3} returned again. */
371+
$this->killChangeStreamCursor($changeStream);
372+
373+
$this->insertDocument(['_id' => 4]);
374+
375+
$changeStream->next();
376+
$this->assertTrue($changeStream->valid());
377+
378+
$expectedResult = [
379+
'_id' => $changeStream->current()->_id,
380+
'operationType' => 'insert',
381+
'fullDocument' => ['_id' => 4],
382+
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
383+
'documentKey' => ['_id' => 4],
384+
];
385+
386+
$this->assertMatchesDocument($expectedResult, $changeStream->current());
387+
}
388+
325389
public function testKey()
326390
{
327391
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);

0 commit comments

Comments
 (0)