21
21
import org .reactivestreams .*;
22
22
23
23
import io .reactivex .Observable .Operator ;
24
- import io .reactivex .internal .queue .SpscLinkedArrayQueue ;
24
+ import io .reactivex .exceptions .MissingBackpressureException ;
25
+ import io .reactivex .internal .queue .*;
25
26
import io .reactivex .internal .subscriptions .*;
26
- import io .reactivex .internal .util .BackpressureHelper ;
27
+ import io .reactivex .internal .util .* ;
27
28
import io .reactivex .observables .GroupedObservable ;
29
+ import io .reactivex .plugins .RxJavaPlugins ;
28
30
29
31
public final class OperatorGroupBy <T , K , V > implements Operator <GroupedObservable <K , V >, T >{
30
32
final Function <? super T , ? extends K > keySelector ;
@@ -44,7 +46,9 @@ public Subscriber<? super T> apply(Subscriber<? super GroupedObservable<K, V>> t
44
46
return new GroupBySubscriber <>(t , keySelector , valueSelector , bufferSize , delayError );
45
47
}
46
48
47
- public static final class GroupBySubscriber <T , K , V > extends AtomicInteger implements Subscriber <T >, Subscription {
49
+ public static final class GroupBySubscriber <T , K , V >
50
+ extends AtomicInteger
51
+ implements Subscriber <T >, Subscription {
48
52
/** */
49
53
private static final long serialVersionUID = -3688291656102519502L ;
50
54
@@ -54,6 +58,7 @@ public static final class GroupBySubscriber<T, K, V> extends AtomicInteger imple
54
58
final int bufferSize ;
55
59
final boolean delayError ;
56
60
final Map <Object , GroupedUnicast <K , V >> groups ;
61
+ final Queue <T > queue ;
57
62
58
63
static final Object NULL_KEY = new Object ();
59
64
@@ -64,14 +69,28 @@ public static final class GroupBySubscriber<T, K, V> extends AtomicInteger imple
64
69
static final AtomicIntegerFieldUpdater <GroupBySubscriber > CANCELLED =
65
70
AtomicIntegerFieldUpdater .newUpdater (GroupBySubscriber .class , "cancelled" );
66
71
72
+ volatile long requested ;
73
+ @ SuppressWarnings ("rawtypes" )
74
+ static final AtomicLongFieldUpdater <GroupBySubscriber > REQUESTED =
75
+ AtomicLongFieldUpdater .newUpdater (GroupBySubscriber .class , "requested" );
76
+
77
+ volatile int groupCount ;
78
+ @ SuppressWarnings ("rawtypes" )
79
+ static final AtomicIntegerFieldUpdater <GroupBySubscriber > GROUP_COUNT =
80
+ AtomicIntegerFieldUpdater .newUpdater (GroupBySubscriber .class , "groupCount" );
81
+
82
+ Throwable error ;
83
+ volatile boolean done ;
84
+
67
85
public GroupBySubscriber (Subscriber <? super GroupedObservable <K , V >> actual , Function <? super T , ? extends K > keySelector , Function <? super T , ? extends V > valueSelector , int bufferSize , boolean delayError ) {
68
86
this .actual = actual ;
69
87
this .keySelector = keySelector ;
70
88
this .valueSelector = valueSelector ;
71
89
this .bufferSize = bufferSize ;
72
90
this .delayError = delayError ;
73
91
this .groups = new ConcurrentHashMap <>();
74
- this .lazySet (1 );
92
+ this .queue = new SpscLinkedArrayQueue <>(bufferSize );
93
+ GROUP_COUNT .lazySet (this , 1 );
75
94
}
76
95
77
96
@ Override
@@ -82,90 +101,60 @@ public void onSubscribe(Subscription s) {
82
101
83
102
this .s = s ;
84
103
actual .onSubscribe (this );
104
+ s .request (bufferSize );
85
105
}
86
106
87
107
@ Override
88
108
public void onNext (T t ) {
89
- K key ;
90
- try {
91
- key = keySelector .apply (t );
92
- } catch (Throwable e ) {
93
- s .cancel ();
94
- onError (e );
109
+ if (done ) {
95
110
return ;
96
111
}
97
-
98
- boolean notNew = true ;
99
- Object mapKey = key != null ? key : NULL_KEY ;
100
- GroupedUnicast <K , V > group = groups .get (mapKey );
101
- if (group == null ) {
102
- // if the main has been cancelled, stop creating groups
103
- // and skip this value
104
- if (cancelled != 0 ) {
105
- s .request (1 );
106
- return ;
107
- }
108
- notNew = true ;
109
-
110
- group = GroupedUnicast .createWith (key , bufferSize , this , delayError );
111
- groups .put (mapKey , group );
112
-
113
- getAndIncrement ();
114
-
115
- actual .onNext (group );
116
- }
117
-
118
- V v ;
119
- try {
120
- v = valueSelector .apply (t );
121
- } catch (Throwable e ) {
112
+ if (!queue .offer (t )) {
122
113
s .cancel ();
123
- onError (e );
114
+ onError (new MissingBackpressureException ( "Queue full?!" ) );
124
115
return ;
125
116
}
126
-
127
- group .onNext (v );
128
-
129
- if (notNew ) {
130
- s .request (1 ); // we spent this t on an existing group, request one more
131
- }
117
+ drain ();
132
118
}
133
119
134
120
@ Override
135
121
public void onError (Throwable t ) {
136
- List <GroupedUnicast <K , V >> list = new ArrayList <>(groups .values ());
137
- groups .clear ();
138
-
139
- for (GroupedUnicast <K , V > e : list ) {
140
- e .onError (t );
122
+ if (done ) {
123
+ RxJavaPlugins .onError (t );
124
+ return ;
141
125
}
142
-
143
- actual .onError (t );
126
+ error = t ;
127
+ done = true ;
128
+ GROUP_COUNT .decrementAndGet (this );
129
+ drain ();
144
130
}
145
131
146
132
@ Override
147
133
public void onComplete () {
148
- List <GroupedUnicast <K , V >> list = new ArrayList <>(groups .values ());
149
- groups .clear ();
150
-
151
- for (GroupedUnicast <K , V > e : list ) {
152
- e .onComplete ();
134
+ if (done ) {
135
+ return ;
153
136
}
154
-
155
- actual .onComplete ();
137
+ done = true ;
138
+ GROUP_COUNT .decrementAndGet (this );
139
+ drain ();
156
140
}
157
141
158
142
@ Override
159
143
public void request (long n ) {
160
- s .request (n );
144
+ if (SubscriptionHelper .validateRequest (n )) {
145
+ return ;
146
+ }
147
+
148
+ BackpressureHelper .add (REQUESTED , this , n );
149
+ drain ();
161
150
}
162
151
163
152
@ Override
164
153
public void cancel () {
165
154
// cancelling the main source means we don't want any more groups
166
155
// but running groups still require new values
167
156
if (CANCELLED .compareAndSet (this , 0 , 1 )) {
168
- if (decrementAndGet () == 0 ) {
157
+ if (GROUP_COUNT . decrementAndGet (this ) == 0 ) {
169
158
s .cancel ();
170
159
}
171
160
}
@@ -174,10 +163,142 @@ public void cancel() {
174
163
public void cancel (K key ) {
175
164
Object mapKey = key != null ? key : NULL_KEY ;
176
165
groups .remove (mapKey );
177
- if (decrementAndGet () == 0 ) {
166
+ if (GROUP_COUNT . decrementAndGet (this ) == 0 ) {
178
167
s .cancel ();
179
168
}
180
169
}
170
+
171
+ void drain () {
172
+ if (getAndIncrement () != 0 ) {
173
+ return ;
174
+ }
175
+
176
+ int missed = 1 ;
177
+
178
+ final Queue <T > q = this .queue ;
179
+ final Subscriber <? super GroupedObservable <K , V >> a = this .actual ;
180
+
181
+ for (;;) {
182
+
183
+ if (checkTerminated (done , q .isEmpty (), a , q )) {
184
+ return ;
185
+ }
186
+
187
+ long r = requested ;
188
+ boolean unbounded = r == Long .MAX_VALUE ;
189
+ long e = 0L ;
190
+
191
+ while (r != 0 ) {
192
+ boolean d = done ;
193
+
194
+ T t = q .poll ();
195
+
196
+ boolean empty = t == null ;
197
+
198
+ if (checkTerminated (d , empty , a , q )) {
199
+ return ;
200
+ }
201
+
202
+ if (empty ) {
203
+ break ;
204
+ }
205
+
206
+ K key ;
207
+ try {
208
+ key = keySelector .apply (t );
209
+ } catch (Throwable ex ) {
210
+ s .cancel ();
211
+ errorAll (a , q , ex );
212
+ return ;
213
+ }
214
+
215
+ boolean notNew = true ;
216
+ Object mapKey = key != null ? key : NULL_KEY ;
217
+ GroupedUnicast <K , V > group = groups .get (mapKey );
218
+ if (group == null ) {
219
+ // if the main has been cancelled, stop creating groups
220
+ // and skip this value
221
+ if (cancelled == 0 ) {
222
+ group = GroupedUnicast .createWith (key , bufferSize , this , delayError );
223
+ groups .put (mapKey , group );
224
+
225
+ GROUP_COUNT .getAndIncrement (this );
226
+
227
+ actual .onNext (group );
228
+
229
+ notNew = false ;
230
+ } else {
231
+ continue ;
232
+ }
233
+ }
234
+
235
+ V v ;
236
+ try {
237
+ v = valueSelector .apply (t );
238
+ } catch (Throwable ex ) {
239
+ s .cancel ();
240
+ errorAll (a , q , ex );
241
+ return ;
242
+ }
243
+
244
+ group .onNext (v );
245
+
246
+ if (notNew ) {
247
+ s .request (1 ); // we spent this t on an existing group, request one more
248
+ } else {
249
+ r --;
250
+ e --;
251
+ }
252
+ }
253
+
254
+ if (e != 0L ) {
255
+ if (!unbounded ) {
256
+ REQUESTED .addAndGet (this , e );
257
+ }
258
+ s .request (-e );
259
+ }
260
+
261
+ missed = addAndGet (-missed );
262
+ if (missed == 0 ) {
263
+ break ;
264
+ }
265
+ }
266
+ }
267
+
268
+ void errorAll (Subscriber <? super GroupedObservable <K , V >> a , Queue <T > q , Throwable ex ) {
269
+ q .clear ();
270
+ List <GroupedUnicast <K , V >> list = new ArrayList <>(groups .values ());
271
+ groups .clear ();
272
+
273
+ for (GroupedUnicast <K , V > e : list ) {
274
+ e .onError (ex );
275
+ }
276
+
277
+ a .onError (ex );
278
+ }
279
+
280
+ boolean checkTerminated (boolean d , boolean empty ,
281
+ Subscriber <? super GroupedObservable <K , V >> a , Queue <T > q ) {
282
+ if (d ) {
283
+ Throwable err = error ;
284
+ if (err != null ) {
285
+ errorAll (a , q , err );
286
+ return true ;
287
+ } else
288
+ if (empty ) {
289
+ List <GroupedUnicast <K , V >> list = new ArrayList <>(groups .values ());
290
+ groups .clear ();
291
+
292
+ for (GroupedUnicast <K , V > e : list ) {
293
+ e .onComplete ();
294
+ }
295
+
296
+ actual .onComplete ();
297
+ return true ;
298
+ }
299
+ }
300
+ return false ;
301
+ }
181
302
}
182
303
183
304
static final class GroupedUnicast <K , T > extends GroupedObservable <K , T > {
@@ -247,7 +368,6 @@ public void request(long n) {
247
368
return ;
248
369
}
249
370
BackpressureHelper .add (REQUESTED , this , n );
250
- parent .request (n );
251
371
drain ();
252
372
}
253
373
0 commit comments