@@ -54,6 +54,9 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
54
54
/** @var boolean */
55
55
private $ isRewindNop ;
56
56
57
+ /** @var boolean */
58
+ private $ isValid = false ;
59
+
57
60
/** @var object|null */
58
61
private $ postBatchResumeToken ;
59
62
@@ -126,6 +129,15 @@ final public function commandSucceeded(CommandSucceededEvent $event)
126
129
}
127
130
}
128
131
132
+ /**
133
+ * @see https://php.net/iteratoriterator.current
134
+ * @return mixed
135
+ */
136
+ public function current ()
137
+ {
138
+ return $ this ->isValid ? parent ::current () : null ;
139
+ }
140
+
129
141
/**
130
142
* Returns the resume token for the iterator's current position.
131
143
*
@@ -140,6 +152,15 @@ public function getResumeToken()
140
152
return $ this ->resumeToken ;
141
153
}
142
154
155
+ /**
156
+ * @see https://php.net/iteratoriterator.key
157
+ * @return mixed
158
+ */
159
+ public function key ()
160
+ {
161
+ return $ this ->isValid ? parent ::key () : null ;
162
+ }
163
+
143
164
/**
144
165
* @see https://php.net/iteratoriterator.rewind
145
166
* @return void
@@ -181,6 +202,15 @@ public function rewind()
181
202
$ this ->onIteration (false );
182
203
}
183
204
205
+ /**
206
+ * @see https://php.net/iteratoriterator.valid
207
+ * @return boolean
208
+ */
209
+ public function valid ()
210
+ {
211
+ return $ this ->isValid ;
212
+ }
213
+
184
214
/**
185
215
* Extracts the resume token (i.e. "_id" field) from a change document.
186
216
*
@@ -204,10 +234,12 @@ private function extractResumeToken($document)
204
234
: (isset ($ document ->_id ) ? $ document ->_id : null );
205
235
206
236
if (! isset ($ resumeToken )) {
237
+ $ this ->isValid = false ;
207
238
throw ResumeTokenException::notFound ();
208
239
}
209
240
210
241
if (! is_array ($ resumeToken ) && ! is_object ($ resumeToken )) {
242
+ $ this ->isValid = false ;
211
243
throw ResumeTokenException::invalidType ($ resumeToken );
212
244
}
213
245
@@ -232,16 +264,16 @@ private function isAtEndOfBatch()
232
264
*/
233
265
private function onIteration ($ incrementBatchPosition )
234
266
{
235
- $ isValid = $ this -> valid ();
267
+ $ this -> isValid = parent :: valid ();
236
268
237
269
/* Disable rewind()'s NOP behavior once we advance to a valid position.
238
270
* This will allow the driver to throw a LogicException if rewind() is
239
271
* called after the cursor has advanced past its first element. */
240
- if ($ this ->isRewindNop && $ isValid ) {
272
+ if ($ this ->isRewindNop && $ this -> isValid ) {
241
273
$ this ->isRewindNop = false ;
242
274
}
243
275
244
- if ($ incrementBatchPosition && $ isValid ) {
276
+ if ($ incrementBatchPosition && $ this -> isValid ) {
245
277
$ this ->batchPosition ++;
246
278
}
247
279
@@ -253,7 +285,7 @@ private function onIteration($incrementBatchPosition)
253
285
* from the current document if possible. */
254
286
if ($ this ->isAtEndOfBatch () && $ this ->postBatchResumeToken !== null ) {
255
287
$ this ->resumeToken = $ this ->postBatchResumeToken ;
256
- } elseif ($ isValid ) {
288
+ } elseif ($ this -> isValid ) {
257
289
$ this ->resumeToken = $ this ->extractResumeToken ($ this ->current ());
258
290
}
259
291
}
0 commit comments