16
16
package rx .observers ;
17
17
18
18
import rx .Observer ;
19
- import rx .exceptions .Exceptions ;
19
+ import rx .exceptions .*;
20
+ import rx .internal .operators .NotificationLite ;
20
21
21
22
/**
22
23
* Enforces single-threaded, serialized, ordered execution of {@link #onNext}, {@link #onCompleted}, and
35
36
public class SerializedObserver <T > implements Observer <T > {
36
37
private final Observer <? super T > actual ;
37
38
38
- private boolean emitting = false ;
39
- private boolean terminated = false ;
39
+ private boolean emitting ;
40
+ /** Set to true if a terminal event was received. */
41
+ private volatile boolean terminated ;
42
+ /** If not null, it indicates more work. */
40
43
private FastList queue ;
44
+ private final NotificationLite <T > nl = NotificationLite .instance ();
41
45
42
- private static final int MAX_DRAIN_ITERATION = Integer .MAX_VALUE ;
43
- private static final Object NULL_SENTINEL = new Object ();
44
- private static final Object COMPLETE_SENTINEL = new Object ();
46
+ /** Number of iterations without additional safepoint poll in the drain loop. */
47
+ private static final int MAX_DRAIN_ITERATION = 1024 ;
45
48
46
49
static final class FastList {
47
50
Object [] array ;
@@ -64,150 +67,119 @@ public void add(Object o) {
64
67
}
65
68
}
66
69
67
- private static final class ErrorSentinel {
68
- final Throwable e ;
69
-
70
- ErrorSentinel (Throwable e ) {
71
- this .e = e ;
72
- }
73
- }
74
-
75
70
public SerializedObserver (Observer <? super T > s ) {
76
71
this .actual = s ;
77
72
}
78
73
79
74
@ Override
80
- public void onCompleted () {
81
- FastList list ;
75
+ public void onNext (T t ) {
76
+ if (terminated ) {
77
+ return ;
78
+ }
82
79
synchronized (this ) {
83
80
if (terminated ) {
84
81
return ;
85
82
}
86
- terminated = true ;
87
83
if (emitting ) {
88
- if (queue == null ) {
89
- queue = new FastList ();
84
+ FastList list = queue ;
85
+ if (list == null ) {
86
+ list = new FastList ();
87
+ queue = list ;
90
88
}
91
- queue .add (COMPLETE_SENTINEL );
89
+ list .add (nl . next ( t ) );
92
90
return ;
93
91
}
94
92
emitting = true ;
95
- list = queue ;
96
- queue = null ;
97
93
}
98
- drainQueue (list );
99
- actual .onCompleted ();
94
+ try {
95
+ actual .onNext (t );
96
+ } catch (Throwable e ) {
97
+ terminated = true ;
98
+ Exceptions .throwIfFatal (e );
99
+ actual .onError (OnErrorThrowable .addValueAsLastCause (e , t ));
100
+ return ;
101
+ }
102
+ for (;;) {
103
+ for (int i = 0 ; i < MAX_DRAIN_ITERATION ; i ++) {
104
+ FastList list ;
105
+ synchronized (this ) {
106
+ list = queue ;
107
+ if (list == null ) {
108
+ emitting = false ;
109
+ return ;
110
+ }
111
+ queue = null ;
112
+ }
113
+ for (Object o : list .array ) {
114
+ if (o == null ) {
115
+ break ;
116
+ }
117
+ try {
118
+ if (nl .accept (actual , o )) {
119
+ terminated = true ;
120
+ return ;
121
+ }
122
+ } catch (Throwable e ) {
123
+ terminated = true ;
124
+ Exceptions .throwIfFatal (e );
125
+ actual .onError (OnErrorThrowable .addValueAsLastCause (e , t ));
126
+ return ;
127
+ }
128
+ }
129
+ }
130
+ }
100
131
}
101
-
132
+
102
133
@ Override
103
134
public void onError (final Throwable e ) {
104
135
Exceptions .throwIfFatal (e );
105
- FastList list ;
136
+ if (terminated ) {
137
+ return ;
138
+ }
106
139
synchronized (this ) {
107
140
if (terminated ) {
108
141
return ;
109
142
}
143
+ terminated = true ;
110
144
if (emitting ) {
111
- if (queue == null ) {
112
- queue = new FastList ();
145
+ /*
146
+ * FIXME: generally, errors jump the queue but this wasn't true
147
+ * for SerializedObserver and may break existing expectations.
148
+ */
149
+ FastList list = queue ;
150
+ if (list == null ) {
151
+ list = new FastList ();
152
+ queue = list ;
113
153
}
114
- queue .add (new ErrorSentinel (e ));
154
+ list .add (nl . error (e ));
115
155
return ;
116
156
}
117
157
emitting = true ;
118
- list = queue ;
119
- queue = null ;
120
158
}
121
- drainQueue (list );
122
159
actual .onError (e );
123
- synchronized (this ) {
124
- emitting = false ;
125
- }
126
160
}
127
161
128
162
@ Override
129
- public void onNext (T t ) {
130
- FastList list ;
131
-
163
+ public void onCompleted () {
164
+ if (terminated ) {
165
+ return ;
166
+ }
132
167
synchronized (this ) {
133
168
if (terminated ) {
134
169
return ;
135
170
}
171
+ terminated = true ;
136
172
if (emitting ) {
137
- if (queue == null ) {
138
- queue = new FastList ();
173
+ FastList list = queue ;
174
+ if (list == null ) {
175
+ list = new FastList ();
176
+ queue = list ;
139
177
}
140
- queue .add (t != null ? t : NULL_SENTINEL );
141
- // another thread is emitting so we add to the queue and return
178
+ list .add (nl .completed ());
142
179
return ;
143
180
}
144
- // we can emit
145
181
emitting = true ;
146
- // reference to the list to drain before emitting our value
147
- list = queue ;
148
- queue = null ;
149
- }
150
-
151
- // we only get here if we won the right to emit, otherwise we returned in the if(emitting) block above
152
- boolean skipFinal = false ;
153
- try {
154
- int iter = MAX_DRAIN_ITERATION ;
155
- do {
156
- drainQueue (list );
157
- if (iter == MAX_DRAIN_ITERATION ) {
158
- // after the first draining we emit our own value
159
- actual .onNext (t );
160
- }
161
- --iter ;
162
- if (iter > 0 ) {
163
- synchronized (this ) {
164
- list = queue ;
165
- queue = null ;
166
- if (list == null ) {
167
- emitting = false ;
168
- skipFinal = true ;
169
- return ;
170
- }
171
- }
172
- }
173
- } while (iter > 0 );
174
- } finally {
175
- if (!skipFinal ) {
176
- synchronized (this ) {
177
- if (terminated ) {
178
- list = queue ;
179
- queue = null ;
180
- } else {
181
- emitting = false ;
182
- list = null ;
183
- }
184
- }
185
- }
186
- }
187
-
188
- // this will only drain if terminated (done here outside of synchronized block)
189
- drainQueue (list );
190
- }
191
-
192
- void drainQueue (FastList list ) {
193
- if (list == null || list .size == 0 ) {
194
- return ;
195
- }
196
- for (Object v : list .array ) {
197
- if (v == null ) {
198
- break ;
199
- }
200
- if (v == NULL_SENTINEL ) {
201
- actual .onNext (null );
202
- } else if (v == COMPLETE_SENTINEL ) {
203
- actual .onCompleted ();
204
- } else if (v .getClass () == ErrorSentinel .class ) {
205
- actual .onError (((ErrorSentinel ) v ).e );
206
- } else {
207
- @ SuppressWarnings ("unchecked" )
208
- T t = (T )v ;
209
- actual .onNext (t );
210
- }
211
182
}
183
+ actual .onCompleted ();
212
184
}
213
185
}
0 commit comments