15
15
*/
16
16
package rx .internal .operators ;
17
17
18
- import java .util .LinkedList ;
19
- import java .util .Queue ;
20
- import java . util . concurrent . atomic . AtomicIntegerFieldUpdater ;
21
- import rx .Observable ;
18
+ import java .util .* ;
19
+ import java .util .concurrent . atomic .* ;
20
+
21
+ import rx .* ;
22
22
import rx .Observable .Operator ;
23
- import rx .Subscriber ;
23
+ import rx .Observable ;
24
+ import rx .exceptions .MissingBackpressureException ;
25
+ import rx .internal .util .RxRingBuffer ;
24
26
import rx .observers .SerializedSubscriber ;
25
27
import rx .subscriptions .CompositeSubscription ;
26
28
@@ -47,34 +49,75 @@ public Subscriber<? super Observable<? extends T>> call(Subscriber<? super T> ch
47
49
final CompositeSubscription csub = new CompositeSubscription ();
48
50
child .add (csub );
49
51
50
- return new SourceSubscriber <T >(maxConcurrency , s , csub );
52
+ SourceSubscriber <T > ssub = new SourceSubscriber <T >(maxConcurrency , s , csub );
53
+ child .setProducer (new MergeMaxConcurrentProducer <T >(ssub ));
54
+
55
+ return ssub ;
56
+ }
57
+ /** Routes the requests from downstream to the sourcesubscriber. */
58
+ static final class MergeMaxConcurrentProducer <T > implements Producer {
59
+ final SourceSubscriber <T > ssub ;
60
+ public MergeMaxConcurrentProducer (SourceSubscriber <T > ssub ) {
61
+ this .ssub = ssub ;
62
+ }
63
+ @ Override
64
+ public void request (long n ) {
65
+ ssub .downstreamRequest (n );
66
+ }
51
67
}
52
68
static final class SourceSubscriber <T > extends Subscriber <Observable <? extends T >> {
69
+ final NotificationLite <T > nl = NotificationLite .instance ();
53
70
final int maxConcurrency ;
54
71
final Subscriber <T > s ;
55
72
final CompositeSubscription csub ;
56
73
final Object guard ;
57
74
58
75
volatile int wip ;
59
76
@ SuppressWarnings ("rawtypes" )
60
- static final AtomicIntegerFieldUpdater <SourceSubscriber > WIP_UPDATER
77
+ static final AtomicIntegerFieldUpdater <SourceSubscriber > WIP
61
78
= AtomicIntegerFieldUpdater .newUpdater (SourceSubscriber .class , "wip" );
79
+ volatile int sourceIndex ;
80
+ @ SuppressWarnings ("rawtypes" )
81
+ static final AtomicIntegerFieldUpdater <SourceSubscriber > SOURCE_INDEX
82
+ = AtomicIntegerFieldUpdater .newUpdater (SourceSubscriber .class , "sourceIndex" );
62
83
63
84
/** Guarded by guard. */
64
85
int active ;
65
86
/** Guarded by guard. */
66
87
final Queue <Observable <? extends T >> queue ;
67
88
89
+ /** Indicates the emitting phase. Guarded by this. */
90
+ boolean emitting ;
91
+ /** Counts the missed emitting calls. Guarded by this. */
92
+ int missedEmitting ;
93
+ /** The last buffer index in the round-robin drain scheme. Accessed while emitting == true. */
94
+ int lastIndex ;
95
+
96
+ /** Guarded by itself. */
97
+ final List <MergeItemSubscriber > subscribers ;
98
+
99
+ volatile long requested ;
100
+ @ SuppressWarnings ("rawtypes" )
101
+ static final AtomicLongFieldUpdater <SourceSubscriber > REQUESTED
102
+ = AtomicLongFieldUpdater .newUpdater (SourceSubscriber .class , "requested" );
103
+
104
+
68
105
public SourceSubscriber (int maxConcurrency , Subscriber <T > s , CompositeSubscription csub ) {
69
106
super (s );
70
107
this .maxConcurrency = maxConcurrency ;
71
108
this .s = s ;
72
109
this .csub = csub ;
73
110
this .guard = new Object ();
74
- this .queue = new LinkedList <Observable <? extends T >>();
111
+ this .queue = new ArrayDeque <Observable <? extends T >>(maxConcurrency );
112
+ this .subscribers = Collections .synchronizedList (new ArrayList <MergeItemSubscriber >());
75
113
this .wip = 1 ;
76
114
}
77
115
116
+ @ Override
117
+ public void onStart () {
118
+ request (maxConcurrency );
119
+ }
120
+
78
121
@ Override
79
122
public void onNext (Observable <? extends T > t ) {
80
123
synchronized (guard ) {
@@ -94,50 +137,213 @@ void subscribeNext() {
94
137
queue .poll ();
95
138
}
96
139
97
- Subscriber <T > itemSub = new Subscriber <T >() {
98
- boolean once = true ;
99
- @ Override
100
- public void onNext (T t ) {
101
- s .onNext (t );
102
- }
103
-
104
- @ Override
105
- public void onError (Throwable e ) {
106
- SourceSubscriber .this .onError (e );
107
- }
108
-
109
- @ Override
110
- public void onCompleted () {
111
- if (once ) {
112
- once = false ;
113
- synchronized (guard ) {
114
- active --;
115
- }
116
- csub .remove (this );
117
-
118
- subscribeNext ();
119
-
120
- SourceSubscriber .this .onCompleted ();
121
- }
122
- }
123
-
124
- };
140
+ MergeItemSubscriber itemSub = new MergeItemSubscriber (SOURCE_INDEX .getAndIncrement (this ));
141
+ subscribers .add (itemSub );
142
+
125
143
csub .add (itemSub );
126
- WIP_UPDATER .incrementAndGet (this );
144
+
145
+ WIP .incrementAndGet (this );
127
146
128
147
t .unsafeSubscribe (itemSub );
148
+
149
+ request (1 );
129
150
}
130
151
131
152
@ Override
132
153
public void onError (Throwable e ) {
133
- s .onError (e );
134
- unsubscribe ();
154
+ Object [] active ;
155
+ synchronized (subscribers ) {
156
+ active = subscribers .toArray ();
157
+ subscribers .clear ();
158
+ }
159
+
160
+ try {
161
+ s .onError (e );
162
+
163
+ unsubscribe ();
164
+ } finally {
165
+ for (Object o : active ) {
166
+ @ SuppressWarnings ("unchecked" )
167
+ MergeItemSubscriber a = (MergeItemSubscriber )o ;
168
+ a .release ();
169
+ }
170
+ }
171
+
135
172
}
136
173
137
174
@ Override
138
175
public void onCompleted () {
139
- if (WIP_UPDATER .decrementAndGet (this ) == 0 ) {
140
- s .onCompleted ();
176
+ WIP .decrementAndGet (this );
177
+ drain ();
178
+ }
179
+
180
+ protected void downstreamRequest (long n ) {
181
+ for (;;) {
182
+ long r = requested ;
183
+ long u ;
184
+ if (r != Long .MAX_VALUE && n == Long .MAX_VALUE ) {
185
+ u = Long .MAX_VALUE ;
186
+ } else
187
+ if (r + n < 0 ) {
188
+ u = Long .MAX_VALUE ;
189
+ } else {
190
+ u = r + n ;
191
+ }
192
+ if (REQUESTED .compareAndSet (this , r , u )) {
193
+ break ;
194
+ }
195
+ }
196
+ drain ();
197
+ }
198
+
199
+ protected void drain () {
200
+ synchronized (this ) {
201
+ if (emitting ) {
202
+ missedEmitting ++;
203
+ return ;
204
+ }
205
+ emitting = true ;
206
+ missedEmitting = 0 ;
207
+ }
208
+ final List <SourceSubscriber <T >.MergeItemSubscriber > subs = subscribers ;
209
+ final Subscriber <T > child = s ;
210
+ Object [] active = new Object [subs .size ()];
211
+ do {
212
+ long r ;
213
+
214
+ outer :
215
+ while ((r = requested ) > 0 ) {
216
+ int idx = lastIndex ;
217
+ synchronized (subs ) {
218
+ if (subs .size () == active .length ) {
219
+ active = subs .toArray (active );
220
+ } else {
221
+ active = subs .toArray ();
222
+ }
223
+ }
224
+
225
+ int resumeIndex = 0 ;
226
+ int j = 0 ;
227
+ for (Object o : active ) {
228
+ @ SuppressWarnings ("unchecked" )
229
+ MergeItemSubscriber e = (MergeItemSubscriber )o ;
230
+ if (e .index == idx ) {
231
+ resumeIndex = j ;
232
+ break ;
233
+ }
234
+ j ++;
235
+ }
236
+ int sumConsumed = 0 ;
237
+ for (int i = 0 ; i < active .length ; i ++) {
238
+ j = (i + resumeIndex ) % active .length ;
239
+
240
+ @ SuppressWarnings ("unchecked" )
241
+ final MergeItemSubscriber e = (MergeItemSubscriber )active [j ];
242
+ final RxRingBuffer b = e .buffer ;
243
+ lastIndex = e .index ;
244
+
245
+ if (!e .once && b .peek () == null ) {
246
+ subs .remove (e );
247
+
248
+ synchronized (guard ) {
249
+ this .active --;
250
+ }
251
+ csub .remove (e );
252
+
253
+ e .release ();
254
+
255
+ subscribeNext ();
256
+
257
+ WIP .decrementAndGet (this );
258
+
259
+ continue outer ;
260
+ }
261
+
262
+ int consumed = 0 ;
263
+ Object v ;
264
+ while (r > 0 && (v = b .poll ()) != null ) {
265
+ nl .accept (child , v );
266
+ if (child .isUnsubscribed ()) {
267
+ return ;
268
+ }
269
+ r --;
270
+ consumed ++;
271
+ }
272
+ if (consumed > 0 ) {
273
+ sumConsumed += consumed ;
274
+ REQUESTED .addAndGet (this , -consumed );
275
+ e .requestMore (consumed );
276
+ }
277
+ if (r == 0 ) {
278
+ break outer ;
279
+ }
280
+ }
281
+ if (sumConsumed == 0 ) {
282
+ break ;
283
+ }
284
+ }
285
+
286
+ if (active .length == 0 ) {
287
+ if (wip == 0 ) {
288
+ child .onCompleted ();
289
+ return ;
290
+ }
291
+ }
292
+ synchronized (this ) {
293
+ if (missedEmitting == 0 ) {
294
+ emitting = false ;
295
+ break ;
296
+ }
297
+ missedEmitting = 0 ;
298
+ }
299
+ } while (true );
300
+ }
301
+ final class MergeItemSubscriber extends Subscriber <T > {
302
+ volatile boolean once = true ;
303
+ final int index ;
304
+ final RxRingBuffer buffer ;
305
+
306
+ public MergeItemSubscriber (int index ) {
307
+ buffer = RxRingBuffer .getSpmcInstance ();
308
+ this .index = index ;
309
+ }
310
+
311
+ @ Override
312
+ public void onStart () {
313
+ request (RxRingBuffer .SIZE );
314
+ }
315
+
316
+ @ Override
317
+ public void onNext (T t ) {
318
+ try {
319
+ buffer .onNext (t );
320
+ } catch (MissingBackpressureException ex ) {
321
+ onError (ex );
322
+ return ;
323
+ }
324
+
325
+ drain ();
326
+ }
327
+
328
+ @ Override
329
+ public void onError (Throwable e ) {
330
+ SourceSubscriber .this .onError (e );
331
+ }
332
+
333
+ @ Override
334
+ public void onCompleted () {
335
+ if (once ) {
336
+ once = false ;
337
+ drain ();
338
+ }
339
+ }
340
+ /** Request more from upstream. */
341
+ void requestMore (long n ) {
342
+ request (n );
343
+ }
344
+ void release () {
345
+ // NO-OP for now
346
+ buffer .release ();
141
347
}
142
348
}
143
349
}
0 commit comments