@@ -79,6 +79,9 @@ static final class GroupBySubscriber<K, T, R> extends Subscriber<T> {
79
79
final Func1 <? super T , ? extends R > elementSelector ;
80
80
final Subscriber <? super GroupedObservable <K , R >> child ;
81
81
82
+ // We should not call `unsubscribe()` until `groups.isEmpty() && child.isUnsubscribed()` is true.
83
+ // Use `WIP_FOR_UNSUBSCRIBE_UPDATER` to monitor these statuses and call `unsubscribe()` properly.
84
+ // Should check both when `child.unsubscribe` is called and any group is removed.
82
85
@ SuppressWarnings ("rawtypes" )
83
86
static final AtomicIntegerFieldUpdater <GroupBySubscriber > WIP_FOR_UNSUBSCRIBE_UPDATER = AtomicIntegerFieldUpdater .newUpdater (GroupBySubscriber .class , "wipForUnsubscribe" );
84
87
volatile int wipForUnsubscribe = 1 ;
@@ -124,7 +127,13 @@ public Observer<T> getObserver() {
124
127
private static final NotificationLite <Object > nl = NotificationLite .instance ();
125
128
126
129
volatile int completionEmitted ;
127
- volatile int terminated ;
130
+
131
+ private static final int UNTERMINATED = 0 ;
132
+ private static final int TERMINATED_WITH_COMPLETED = 1 ;
133
+ private static final int TERMINATED_WITH_ERROR = 2 ;
134
+
135
+ // Must be one of `UNTERMINATED`, `TERMINATED_WITH_COMPLETED`, `TERMINATED_WITH_ERROR`
136
+ volatile int terminated = UNTERMINATED ;
128
137
129
138
@ SuppressWarnings ("rawtypes" )
130
139
static final AtomicIntegerFieldUpdater <GroupBySubscriber > COMPLETION_EMITTED_UPDATER = AtomicIntegerFieldUpdater .newUpdater (GroupBySubscriber .class , "completionEmitted" );
@@ -139,8 +148,6 @@ public Observer<T> getObserver() {
139
148
@ SuppressWarnings ("rawtypes" )
140
149
static final AtomicLongFieldUpdater <GroupBySubscriber > BUFFERED_COUNT = AtomicLongFieldUpdater .newUpdater (GroupBySubscriber .class , "bufferedCount" );
141
150
142
- volatile boolean errorEmitted = false ;
143
-
144
151
@ Override
145
152
public void onStart () {
146
153
REQUESTED .set (this , MAX_QUEUE_SIZE );
@@ -149,7 +156,7 @@ public void onStart() {
149
156
150
157
@ Override
151
158
public void onCompleted () {
152
- if (TERMINATED_UPDATER .compareAndSet (this , 0 , 1 )) {
159
+ if (TERMINATED_UPDATER .compareAndSet (this , UNTERMINATED , TERMINATED_WITH_COMPLETED )) {
153
160
// if we receive onCompleted from our parent we onComplete children
154
161
// for each group check if it is ready to accept more events if so pass the oncomplete through else buffer it.
155
162
for (GroupState <K , T > group : groups .values ()) {
@@ -168,9 +175,7 @@ public void onCompleted() {
168
175
169
176
@ Override
170
177
public void onError (Throwable e ) {
171
- if (TERMINATED_UPDATER .compareAndSet (this , 0 , 1 )) {
172
- errorEmitted = true ;
173
-
178
+ if (TERMINATED_UPDATER .compareAndSet (this , UNTERMINATED , TERMINATED_WITH_ERROR )) {
174
179
// It's safe to access all groups and emit the error.
175
180
// onNext and onError are in sequence so no group will be created in the loop.
176
181
for (GroupState <K , T > group : groups .values ()) {
@@ -390,18 +395,16 @@ private void drainIfPossible(GroupState<K, T> groupState) {
390
395
}
391
396
392
397
private void completeInner () {
398
+ // A group is removed, so check if we need to call `unsubscribe`
393
399
if (WIP_FOR_UNSUBSCRIBE_UPDATER .decrementAndGet (this ) == 0 ) {
400
+ // It means `groups.isEmpty() && child.isUnsubscribed()` is true
394
401
unsubscribe ();
395
402
}
396
- // if we have no outstanding groups (all completed or unsubscribe) and terminated/unsubscribed on outer
397
- if (groups .isEmpty () && ( terminated == 1 || child . isUnsubscribed ()) ) {
403
+ // if we have no outstanding groups (all completed or unsubscribe) and terminated on outer
404
+ if (groups .isEmpty () && terminated == TERMINATED_WITH_COMPLETED ) {
398
405
// completionEmitted ensures we only emit onCompleted once
399
406
if (COMPLETION_EMITTED_UPDATER .compareAndSet (this , 0 , 1 )) {
400
-
401
- if (child .isUnsubscribed ()) {
402
- // if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up.
403
- unsubscribe ();
404
- } else if (!errorEmitted ) {
407
+ if (!child .isUnsubscribed ()) {
405
408
child .onCompleted ();
406
409
}
407
410
}
0 commit comments