@@ -48,6 +48,7 @@ class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
48
48
private $ batchPosition = 0 ;
49
49
private $ batchSize ;
50
50
private $ isRewindNop ;
51
+ private $ isValid = false ;
51
52
private $ postBatchResumeToken ;
52
53
private $ resumeToken ;
53
54
@@ -117,6 +118,15 @@ final public function commandSucceeded(CommandSucceededEvent $event)
117
118
}
118
119
}
119
120
121
+ /**
122
+ * @see https://php.net/iteratoriterator.current
123
+ * @return mixed
124
+ */
125
+ public function current ()
126
+ {
127
+ return $ this ->isValid ? parent ::current () : null ;
128
+ }
129
+
120
130
/**
121
131
* Returns the resume token for the iterator's current position.
122
132
*
@@ -131,6 +141,15 @@ public function getResumeToken()
131
141
return $ this ->resumeToken ;
132
142
}
133
143
144
+ /**
145
+ * @see https://php.net/iteratoriterator.key
146
+ * @return mixed
147
+ */
148
+ public function key ()
149
+ {
150
+ return $ this ->isValid ? parent ::key () : null ;
151
+ }
152
+
134
153
/**
135
154
* @see https://php.net/iteratoriterator.rewind
136
155
* @return void
@@ -172,6 +191,15 @@ public function rewind()
172
191
$ this ->onIteration (false );
173
192
}
174
193
194
+ /**
195
+ * @see https://php.net/iteratoriterator.valid
196
+ * @return boolean
197
+ */
198
+ public function valid ()
199
+ {
200
+ return $ this ->isValid ;
201
+ }
202
+
175
203
/**
176
204
* Extracts the resume token (i.e. "_id" field) from a change document.
177
205
*
@@ -195,10 +223,12 @@ private function extractResumeToken($document)
195
223
: (isset ($ document ->_id ) ? $ document ->_id : null );
196
224
197
225
if (! isset ($ resumeToken )) {
226
+ $ this ->isValid = false ;
198
227
throw ResumeTokenException::notFound ();
199
228
}
200
229
201
230
if (! is_array ($ resumeToken ) && ! is_object ($ resumeToken )) {
231
+ $ this ->isValid = false ;
202
232
throw ResumeTokenException::invalidType ($ resumeToken );
203
233
}
204
234
@@ -223,16 +253,16 @@ private function isAtEndOfBatch()
223
253
*/
224
254
private function onIteration ($ incrementBatchPosition )
225
255
{
226
- $ isValid = $ this -> valid ();
256
+ $ this -> isValid = parent :: valid ();
227
257
228
258
/* Disable rewind()'s NOP behavior once we advance to a valid position.
229
259
* This will allow the driver to throw a LogicException if rewind() is
230
260
* called after the cursor has advanced past its first element. */
231
- if ($ this ->isRewindNop && $ isValid ) {
261
+ if ($ this ->isRewindNop && $ this -> isValid ) {
232
262
$ this ->isRewindNop = false ;
233
263
}
234
264
235
- if ($ incrementBatchPosition && $ isValid ) {
265
+ if ($ incrementBatchPosition && $ this -> isValid ) {
236
266
$ this ->batchPosition ++;
237
267
}
238
268
@@ -244,7 +274,7 @@ private function onIteration($incrementBatchPosition)
244
274
* from the current document if possible. */
245
275
if ($ this ->isAtEndOfBatch () && $ this ->postBatchResumeToken !== null ) {
246
276
$ this ->resumeToken = $ this ->postBatchResumeToken ;
247
- } elseif ($ isValid ) {
277
+ } elseif ($ this -> isValid ) {
248
278
$ this ->resumeToken = $ this ->extractResumeToken ($ this ->current ());
249
279
}
250
280
}
0 commit comments