Skip to content

Commit d63c3fa

Browse files
committed
Merge branch 'v1.4'
2 parents 770e2af + a1852b2 commit d63c3fa

File tree

2 files changed

+100
-25
lines changed

2 files changed

+100
-25
lines changed

src/ChangeStream.php

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -103,22 +103,7 @@ public function next()
103103
{
104104
try {
105105
$this->csIt->next();
106-
if ($this->valid()) {
107-
if ($this->hasAdvanced) {
108-
$this->key++;
109-
}
110-
$this->hasAdvanced = true;
111-
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
112-
}
113-
/* If the cursorId is 0, the server has invalidated the cursor so we
114-
* will never perform another getMore. This means that we cannot
115-
* resume and we can therefore unset the resumeCallable, which will
116-
* free any reference to Watch. This will also free the only
117-
* reference to an implicit session, since any such reference
118-
* belongs to Watch. */
119-
if ((string) $this->getCursorId() === '0') {
120-
$this->resumeCallable = null;
121-
}
106+
$this->onIteration(true);
122107
} catch (RuntimeException $e) {
123108
$this->resumeOrThrow($e);
124109
}
@@ -133,14 +118,7 @@ public function rewind()
133118
{
134119
try {
135120
$this->csIt->rewind();
136-
if ($this->valid()) {
137-
$this->hasAdvanced = true;
138-
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
139-
}
140-
// As with next(), free the callable once we know it will never be used.
141-
if ((string) $this->getCursorId() === '0') {
142-
$this->resumeCallable = null;
143-
}
121+
$this->onIteration(false);
144122
} catch (RuntimeException $e) {
145123
$this->resumeOrThrow($e);
146124
}
@@ -158,7 +136,7 @@ public function valid()
158136
/**
159137
* Extracts the resume token (i.e. "_id" field) from the change document.
160138
*
161-
* @param array|document $document Change document
139+
* @param array|object $document Change document
162140
* @return mixed
163141
* @throws InvalidArgumentException
164142
* @throws ResumeTokenException if the resume token is not found or invalid
@@ -212,6 +190,38 @@ private function isResumableError(RuntimeException $exception)
212190
return true;
213191
}
214192

193+
/**
194+
* Perform housekeeping after an iteration event (i.e. next or rewind).
195+
*
196+
* @param boolean $isNext Whether the iteration event was a call to next()
197+
* @throws ResumeTokenException
198+
*/
199+
private function onIteration($isNext)
200+
{
201+
/* If the cursorId is 0, the server has invalidated the cursor and we
202+
* will never perform another getMore nor need to resume since any
203+
* remaining results (up to and including the invalidate event) will
204+
* have been received in the last response. Therefore, we can unset the
205+
* resumeCallable. This will free any reference to Watch as well as the
206+
* only reference to any implicit session created therein. */
207+
if ((string) $this->getCursorId() === '0') {
208+
$this->resumeCallable = null;
209+
}
210+
211+
if (!$this->valid()) {
212+
return;
213+
}
214+
215+
/* Increment the key if the iteration event was a call to next() and we
216+
* have already advanced past the first result. */
217+
if ($isNext && $this->hasAdvanced) {
218+
$this->key++;
219+
}
220+
221+
$this->hasAdvanced = true;
222+
$this->resumeToken = $this->extractResumeToken($this->csIt->current());
223+
}
224+
215225
/**
216226
* Creates a new changeStream after a resumable server error.
217227
*
@@ -222,6 +232,7 @@ private function resume()
222232
$newChangeStream = call_user_func($this->resumeCallable, $this->resumeToken);
223233
$this->csIt = $newChangeStream->csIt;
224234
$this->csIt->rewind();
235+
$this->onIteration(false);
225236
}
226237

227238
/**

tests/Operation/WatchFunctionalTest.php

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,70 @@ public function testNoChangeAfterResumeBeforeInsert()
314314
$this->assertMatchesDocument($expectedResult, $changeStream->current());
315315
}
316316

317+
public function testResumeTokenIsUpdatedAfterResuming()
318+
{
319+
$this->insertDocument(['_id' => 1]);
320+
321+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
322+
$changeStream = $operation->execute($this->getPrimaryServer());
323+
324+
$changeStream->rewind();
325+
$this->assertNull($changeStream->current());
326+
327+
$this->insertDocument(['_id' => 2]);
328+
329+
$changeStream->next();
330+
$this->assertTrue($changeStream->valid());
331+
332+
$expectedResult = [
333+
'_id' => $changeStream->current()->_id,
334+
'operationType' => 'insert',
335+
'fullDocument' => ['_id' => 2],
336+
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
337+
'documentKey' => ['_id' => 2],
338+
];
339+
340+
$this->assertMatchesDocument($expectedResult, $changeStream->current());
341+
342+
$this->killChangeStreamCursor($changeStream);
343+
344+
$this->insertDocument(['_id' => 3]);
345+
346+
$changeStream->next();
347+
$this->assertTrue($changeStream->valid());
348+
349+
$expectedResult = [
350+
'_id' => $changeStream->current()->_id,
351+
'operationType' => 'insert',
352+
'fullDocument' => ['_id' => 3],
353+
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
354+
'documentKey' => ['_id' => 3],
355+
];
356+
357+
$this->assertMatchesDocument($expectedResult, $changeStream->current());
358+
359+
/* Triggering a consecutive failure will allow us to test whether the
360+
* resume token was properly updated after the last resume. If the
361+
* resume token updated, the next result will be {_id: 4}; otherwise,
362+
* we'll see {_id: 3} returned again. */
363+
$this->killChangeStreamCursor($changeStream);
364+
365+
$this->insertDocument(['_id' => 4]);
366+
367+
$changeStream->next();
368+
$this->assertTrue($changeStream->valid());
369+
370+
$expectedResult = [
371+
'_id' => $changeStream->current()->_id,
372+
'operationType' => 'insert',
373+
'fullDocument' => ['_id' => 4],
374+
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
375+
'documentKey' => ['_id' => 4],
376+
];
377+
378+
$this->assertMatchesDocument($expectedResult, $changeStream->current());
379+
}
380+
317381
public function testKey()
318382
{
319383
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);

0 commit comments

Comments
 (0)