Skip to content

Commit 7f20ffb

Browse files
committed
PHPLIB-456: Always invalidate ChangeStream position if iteration fails
1 parent a800cde commit 7f20ffb

File tree

2 files changed

+42
-21
lines changed

2 files changed

+42
-21
lines changed

src/Model/ChangeStreamIterator.php

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
5454
/** @var boolean */
5555
private $isRewindNop;
5656

57+
/** @var boolean */
58+
private $isValid = false;
59+
5760
/** @var object|null */
5861
private $postBatchResumeToken;
5962

@@ -126,6 +129,15 @@ final public function commandSucceeded(CommandSucceededEvent $event)
126129
}
127130
}
128131

132+
/**
133+
* @see https://php.net/iteratoriterator.current
134+
* @return mixed
135+
*/
136+
public function current()
137+
{
138+
return $this->isValid ? parent::current() : null;
139+
}
140+
129141
/**
130142
* Returns the resume token for the iterator's current position.
131143
*
@@ -140,6 +152,15 @@ public function getResumeToken()
140152
return $this->resumeToken;
141153
}
142154

155+
/**
156+
* @see https://php.net/iteratoriterator.key
157+
* @return mixed
158+
*/
159+
public function key()
160+
{
161+
return $this->isValid ? parent::key() : null;
162+
}
163+
143164
/**
144165
* @see https://php.net/iteratoriterator.rewind
145166
* @return void
@@ -181,6 +202,15 @@ public function rewind()
181202
$this->onIteration(false);
182203
}
183204

205+
/**
206+
* @see https://php.net/iteratoriterator.valid
207+
* @return boolean
208+
*/
209+
public function valid()
210+
{
211+
return $this->isValid;
212+
}
213+
184214
/**
185215
* Extracts the resume token (i.e. "_id" field) from a change document.
186216
*
@@ -204,10 +234,12 @@ private function extractResumeToken($document)
204234
: (isset($document->_id) ? $document->_id : null);
205235

206236
if (! isset($resumeToken)) {
237+
$this->isValid = false;
207238
throw ResumeTokenException::notFound();
208239
}
209240

210241
if (! is_array($resumeToken) && ! is_object($resumeToken)) {
242+
$this->isValid = false;
211243
throw ResumeTokenException::invalidType($resumeToken);
212244
}
213245

@@ -232,16 +264,16 @@ private function isAtEndOfBatch()
232264
*/
233265
private function onIteration($incrementBatchPosition)
234266
{
235-
$isValid = $this->valid();
267+
$this->isValid = parent::valid();
236268

237269
/* Disable rewind()'s NOP behavior once we advance to a valid position.
238270
* This will allow the driver to throw a LogicException if rewind() is
239271
* called after the cursor has advanced past its first element. */
240-
if ($this->isRewindNop && $isValid) {
272+
if ($this->isRewindNop && $this->isValid) {
241273
$this->isRewindNop = false;
242274
}
243275

244-
if ($incrementBatchPosition && $isValid) {
276+
if ($incrementBatchPosition && $this->isValid) {
245277
$this->batchPosition++;
246278
}
247279

@@ -253,7 +285,7 @@ private function onIteration($incrementBatchPosition)
253285
* from the current document if possible. */
254286
if ($this->isAtEndOfBatch() && $this->postBatchResumeToken !== null) {
255287
$this->resumeToken = $this->postBatchResumeToken;
256-
} elseif ($isValid) {
288+
} elseif ($this->isValid) {
257289
$this->resumeToken = $this->extractResumeToken($this->current());
258290
}
259291
}

tests/Operation/WatchFunctionalTest.php

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1125,34 +1125,23 @@ public function testResumeTokenNotFoundDoesNotAdvanceKey()
11251125
$changeStream->next();
11261126
$this->fail('Exception for missing resume token was not thrown');
11271127
} catch (ResumeTokenException $e) {
1128-
/* If a client-side error is thrown (server < 4.1.8), the tailable
1129-
* cursor's position is still valid. This may change once PHPLIB-456
1130-
* is implemented. */
1131-
$expectedValid = true;
1132-
$expectedKey = 0;
1128+
/* On server versions < 4.1.8, a client-side error is thrown. */
11331129
} catch (ServerException $e) {
1134-
/* If a server-side error is thrown (server >= 4.1.8), the tailable
1135-
* cursor's position is not valid. */
1136-
$expectedValid = false;
1137-
$expectedKey = null;
1130+
/* On server versions >= 4.1.8, the error is thrown server-side. */
11381131
}
11391132

1140-
$this->assertSame($expectedValid, $changeStream->valid());
1141-
$this->assertSame($expectedKey, $changeStream->key());
1133+
$this->assertFalse($changeStream->valid());
1134+
$this->assertNull($changeStream->key());
11421135

11431136
try {
11441137
$changeStream->next();
11451138
$this->fail('Exception for missing resume token was not thrown');
11461139
} catch (ResumeTokenException $e) {
1147-
$expectedValid = true;
1148-
$expectedKey = 0;
11491140
} catch (ServerException $e) {
1150-
$expectedValid = false;
1151-
$expectedKey = null;
11521141
}
11531142

1154-
$this->assertSame($expectedValid, $changeStream->valid());
1155-
$this->assertSame($expectedKey, $changeStream->key());
1143+
$this->assertFalse($changeStream->valid());
1144+
$this->assertNull($changeStream->key());
11561145
}
11571146

11581147
public function testSessionPersistsAfterResume()

0 commit comments

Comments
 (0)