19
19
20
20
use MongoDB \BSON \Serializable ;
21
21
use MongoDB \Driver \Cursor ;
22
- use MongoDB \Driver \Exception \ConnectionTimeoutException ;
22
+ use MongoDB \Driver \Exception \ConnectionException ;
23
23
use MongoDB \Driver \Exception \RuntimeException ;
24
+ use MongoDB \Driver \Exception \ServerException ;
24
25
use MongoDB \Exception \InvalidArgumentException ;
25
26
use MongoDB \Exception \ResumeTokenException ;
26
27
use IteratorIterator ;
35
36
*/
36
37
class ChangeStream implements Iterator
37
38
{
39
+ /**
40
+ * @deprecated 1.4
41
+ * @todo Remove this in 2.0 (see: PHPLIB-360)
42
+ */
43
+ const CURSOR_NOT_FOUND = 43 ;
44
+
45
+ private static $ errorCodeCappedPositionLost = 136 ;
46
+ private static $ errorCodeInterrupted = 11601 ;
47
+ private static $ errorCodeCursorKilled = 237 ;
48
+
38
49
private $ resumeToken ;
39
50
private $ resumeCallable ;
40
51
private $ csIt ;
41
52
private $ key = 0 ;
42
53
private $ hasAdvanced = false ;
43
54
44
- const CURSOR_NOT_FOUND = 43 ;
45
-
46
55
/**
47
56
* Constructor.
48
57
*
@@ -91,7 +100,6 @@ public function key()
91
100
*/
92
101
public function next ()
93
102
{
94
- $ resumable = false ;
95
103
try {
96
104
$ this ->csIt ->next ();
97
105
if ($ this ->valid ()) {
@@ -111,18 +119,9 @@ public function next()
111
119
$ this ->resumeCallable = null ;
112
120
}
113
121
} catch (RuntimeException $ e ) {
114
- if (strpos ($ e ->getMessage (), "not master " ) !== false ) {
115
- $ resumable = true ;
116
- }
117
- if ($ e ->getCode () === self ::CURSOR_NOT_FOUND ) {
118
- $ resumable = true ;
122
+ if ($ this ->isResumableError ($ e )) {
123
+ $ this ->resume ();
119
124
}
120
- if ($ e instanceof ConnectionTimeoutException) {
121
- $ resumable = true ;
122
- }
123
- }
124
- if ($ resumable ) {
125
- $ this ->resume ();
126
125
}
127
126
}
128
127
@@ -144,18 +143,9 @@ public function rewind()
144
143
$ this ->resumeCallable = null ;
145
144
}
146
145
} catch (RuntimeException $ e ) {
147
- if (strpos ( $ e -> getMessage (), " not master " ) !== false ) {
148
- $ resumable = true ;
146
+ if ($ this -> isResumableError ( $ e ) ) {
147
+ $ this -> resume () ;
149
148
}
150
- if ($ e ->getCode () === self ::CURSOR_NOT_FOUND ) {
151
- $ resumable = true ;
152
- }
153
- if ($ e instanceof ConnectionTimeoutException) {
154
- $ resumable = true ;
155
- }
156
- }
157
- if ($ resumable ) {
158
- $ this ->resume ();
159
149
}
160
150
}
161
151
@@ -201,6 +191,30 @@ private function extractResumeToken($document)
201
191
return $ resumeToken ;
202
192
}
203
193
194
+ /**
195
+ * Determines if an exception is a resumable error.
196
+ *
197
+ * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resumable-error
198
+ * @param RuntimeException $exception
199
+ * @return boolean
200
+ */
201
+ private function isResumableError (RuntimeException $ exception )
202
+ {
203
+ if ($ exception instanceof ConnectionException) {
204
+ return true ;
205
+ }
206
+
207
+ if ( ! $ exception instanceof ServerException) {
208
+ return false ;
209
+ }
210
+
211
+ if (in_array ($ exception ->getCode (), [self ::$ errorCodeCappedPositionLost , self ::$ errorCodeCursorKilled , self ::$ errorCodeInterrupted ])) {
212
+ return false ;
213
+ }
214
+
215
+ return true ;
216
+ }
217
+
204
218
/**
205
219
* Creates a new changeStream after a resumable server error.
206
220
*
0 commit comments