22
22
import rx .Observable .Operator ;
23
23
import rx .Producer ;
24
24
import rx .Subscriber ;
25
+ import rx .internal .producers .ProducerArbiter ;
25
26
import rx .observers .SerializedSubscriber ;
26
27
import rx .subscriptions .SerialSubscription ;
27
28
@@ -46,7 +47,9 @@ private static final class Holder {
46
47
public static <T > OperatorSwitch <T > instance () {
47
48
return (OperatorSwitch <T >)Holder .INSTANCE ;
48
49
}
50
+
49
51
private OperatorSwitch () { }
52
+
50
53
@ Override
51
54
public Subscriber <? super Observable <? extends T >> call (final Subscriber <? super T > child ) {
52
55
SwitchSubscriber <T > sws = new SwitchSubscriber <T >(child );
@@ -55,10 +58,12 @@ public Subscriber<? super Observable<? extends T>> call(final Subscriber<? super
55
58
}
56
59
57
60
private static final class SwitchSubscriber <T > extends Subscriber <Observable <? extends T >> {
58
- final SerializedSubscriber <T > s ;
61
+ final SerializedSubscriber <T > serializedChild ;
59
62
final SerialSubscription ssub ;
60
63
final Object guard = new Object ();
61
64
final NotificationLite <?> nl = NotificationLite .instance ();
65
+ final ProducerArbiter arbiter ;
66
+
62
67
/** Guarded by guard. */
63
68
int index ;
64
69
/** Guarded by guard. */
@@ -70,50 +75,19 @@ private static final class SwitchSubscriber<T> extends Subscriber<Observable<? e
70
75
/** Guarded by guard. */
71
76
boolean emitting ;
72
77
/** Guarded by guard. */
73
- InnerSubscriber currentSubscriber ;
74
- /** Guarded by guard. */
75
- long initialRequested ;
76
-
77
- volatile boolean infinite = false ;
78
+ InnerSubscriber <T > currentSubscriber ;
78
79
79
- public SwitchSubscriber (Subscriber <? super T > child ) {
80
- s = new SerializedSubscriber <T >(child );
80
+ SwitchSubscriber (Subscriber <? super T > child ) {
81
+ serializedChild = new SerializedSubscriber <T >(child );
82
+ arbiter = new ProducerArbiter ();
81
83
ssub = new SerialSubscription ();
82
84
child .add (ssub );
83
85
child .setProducer (new Producer (){
84
86
85
87
@ Override
86
88
public void request (long n ) {
87
- if (infinite ) {
88
- return ;
89
- }
90
- if (n == Long .MAX_VALUE ) {
91
- infinite = true ;
92
- }
93
- InnerSubscriber localSubscriber ;
94
- synchronized (guard ) {
95
- localSubscriber = currentSubscriber ;
96
- if (currentSubscriber == null ) {
97
- long r = initialRequested + n ;
98
- if (r < 0 ) {
99
- infinite = true ;
100
- } else {
101
- initialRequested = r ;
102
- }
103
- } else {
104
- long r = currentSubscriber .requested + n ;
105
- if (r < 0 ) {
106
- infinite = true ;
107
- } else {
108
- currentSubscriber .requested = r ;
109
- }
110
- }
111
- }
112
- if (localSubscriber != null ) {
113
- if (infinite )
114
- localSubscriber .requestMore (Long .MAX_VALUE );
115
- else
116
- localSubscriber .requestMore (n );
89
+ if (n > 0 ) {
90
+ arbiter .request (n );
117
91
}
118
92
}
119
93
});
@@ -122,26 +96,18 @@ public void request(long n) {
122
96
@ Override
123
97
public void onNext (Observable <? extends T > t ) {
124
98
final int id ;
125
- long remainingRequest ;
126
99
synchronized (guard ) {
127
100
id = ++index ;
128
101
active = true ;
129
- if (infinite ) {
130
- remainingRequest = Long .MAX_VALUE ;
131
- } else {
132
- remainingRequest = currentSubscriber == null ? initialRequested : currentSubscriber .requested ;
133
- }
134
- currentSubscriber = new InnerSubscriber (id , remainingRequest );
135
- currentSubscriber .requested = remainingRequest ;
102
+ currentSubscriber = new InnerSubscriber <T >(id , arbiter , this );
136
103
}
137
104
ssub .set (currentSubscriber );
138
-
139
105
t .unsafeSubscribe (currentSubscriber );
140
106
}
141
107
142
108
@ Override
143
109
public void onError (Throwable e ) {
144
- s .onError (e );
110
+ serializedChild .onError (e );
145
111
unsubscribe ();
146
112
}
147
113
@@ -165,10 +131,10 @@ public void onCompleted() {
165
131
emitting = true ;
166
132
}
167
133
drain (localQueue );
168
- s .onCompleted ();
134
+ serializedChild .onCompleted ();
169
135
unsubscribe ();
170
136
}
171
- void emit (T value , int id , InnerSubscriber innerSubscriber ) {
137
+ void emit (T value , int id , InnerSubscriber < T > innerSubscriber ) {
172
138
List <Object > localQueue ;
173
139
synchronized (guard ) {
174
140
if (id != index ) {
@@ -178,8 +144,6 @@ void emit(T value, int id, InnerSubscriber innerSubscriber) {
178
144
if (queue == null ) {
179
145
queue = new ArrayList <Object >();
180
146
}
181
- if (innerSubscriber .requested != Long .MAX_VALUE )
182
- innerSubscriber .requested --;
183
147
queue .add (value );
184
148
return ;
185
149
}
@@ -194,11 +158,8 @@ void emit(T value, int id, InnerSubscriber innerSubscriber) {
194
158
drain (localQueue );
195
159
if (once ) {
196
160
once = false ;
197
- synchronized (guard ) {
198
- if (innerSubscriber .requested != Long .MAX_VALUE )
199
- innerSubscriber .requested --;
200
- }
201
- s .onNext (value );
161
+ serializedChild .onNext (value );
162
+ arbiter .produced (1 );
202
163
}
203
164
synchronized (guard ) {
204
165
localQueue = queue ;
@@ -209,7 +170,7 @@ void emit(T value, int id, InnerSubscriber innerSubscriber) {
209
170
break ;
210
171
}
211
172
}
212
- } while (!s .isUnsubscribed ());
173
+ } while (!serializedChild .isUnsubscribed ());
213
174
} finally {
214
175
if (!skipFinal ) {
215
176
synchronized (guard ) {
@@ -224,16 +185,17 @@ void drain(List<Object> localQueue) {
224
185
}
225
186
for (Object o : localQueue ) {
226
187
if (nl .isCompleted (o )) {
227
- s .onCompleted ();
188
+ serializedChild .onCompleted ();
228
189
break ;
229
190
} else
230
191
if (nl .isError (o )) {
231
- s .onError (nl .getError (o ));
192
+ serializedChild .onError (nl .getError (o ));
232
193
break ;
233
194
} else {
234
195
@ SuppressWarnings ("unchecked" )
235
196
T t = (T )o ;
236
- s .onNext (t );
197
+ serializedChild .onNext (t );
198
+ arbiter .produced (1 );
237
199
}
238
200
}
239
201
}
@@ -258,7 +220,7 @@ void error(Throwable e, int id) {
258
220
}
259
221
260
222
drain (localQueue );
261
- s .onError (e );
223
+ serializedChild .onError (e );
262
224
unsubscribe ();
263
225
}
264
226
void complete (int id ) {
@@ -285,51 +247,45 @@ void complete(int id) {
285
247
}
286
248
287
249
drain (localQueue );
288
- s .onCompleted ();
250
+ serializedChild .onCompleted ();
289
251
unsubscribe ();
290
252
}
291
253
292
- final class InnerSubscriber extends Subscriber <T > {
293
-
294
- /**
295
- * The number of request that is not acknowledged.
296
- *
297
- * Guarded by guard.
298
- */
299
- private long requested = 0 ;
300
-
301
- private final int id ;
254
+ }
255
+
256
+ private static final class InnerSubscriber <T > extends Subscriber <T > {
302
257
303
- private final long initialRequested ;
258
+ private final int id ;
304
259
305
- public InnerSubscriber (int id , long initialRequested ) {
306
- this .id = id ;
307
- this .initialRequested = initialRequested ;
308
- }
260
+ private final ProducerArbiter arbiter ;
309
261
310
- @ Override
311
- public void onStart () {
312
- requestMore (initialRequested );
313
- }
262
+ private final SwitchSubscriber <T > parent ;
314
263
315
- public void requestMore (long n ) {
316
- request (n );
317
- }
264
+ InnerSubscriber (int id , ProducerArbiter arbiter , SwitchSubscriber <T > parent ) {
265
+ this .id = id ;
266
+ this .arbiter = arbiter ;
267
+ this .parent = parent ;
268
+ }
269
+
270
+ @ Override
271
+ public void setProducer (Producer p ) {
272
+ arbiter .setProducer (p );
273
+ }
318
274
319
- @ Override
320
- public void onNext (T t ) {
321
- emit (t , id , this );
322
- }
275
+ @ Override
276
+ public void onNext (T t ) {
277
+ parent . emit (t , id , this );
278
+ }
323
279
324
- @ Override
325
- public void onError (Throwable e ) {
326
- error (e , id );
327
- }
280
+ @ Override
281
+ public void onError (Throwable e ) {
282
+ parent . error (e , id );
283
+ }
328
284
329
- @ Override
330
- public void onCompleted () {
331
- complete (id );
332
- }
285
+ @ Override
286
+ public void onCompleted () {
287
+ parent .complete (id );
333
288
}
334
289
}
290
+
335
291
}
0 commit comments