Skip to content

Commit 272a752

Browse files
committed
Propagate onError to all groups
1 parent 8026b41 commit 272a752

File tree

2 files changed

+63
-1
lines changed

2 files changed

+63
-1
lines changed

src/main/java/rx/internal/operators/OperatorGroupBy.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import java.util.Map;
1819
import java.util.Queue;
1920
import java.util.concurrent.ConcurrentHashMap;
2021
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -138,6 +139,8 @@ public Observer<T> getObserver() {
138139
@SuppressWarnings("rawtypes")
139140
static final AtomicLongFieldUpdater<GroupBySubscriber> BUFFERED_COUNT = AtomicLongFieldUpdater.newUpdater(GroupBySubscriber.class, "bufferedCount");
140141

142+
volatile boolean errorEmitted = false;
143+
141144
@Override
142145
public void onStart() {
143146
REQUESTED.set(this, MAX_QUEUE_SIZE);
@@ -166,6 +169,13 @@ public void onCompleted() {
166169
@Override
167170
public void onError(Throwable e) {
168171
if (TERMINATED_UPDATER.compareAndSet(this, 0, 1)) {
172+
errorEmitted = true;
173+
174+
// It's safe to access all groups and emit the error.
175+
// onNext and onError are in sequence so no group will be created in the loop.
176+
for (GroupState<K, T> group : groups.values()) {
177+
emitItem(group, nl.error(e));
178+
}
169179
try {
170180
// we immediately tear everything down if we receive an error
171181
child.onError(e);
@@ -259,6 +269,11 @@ public void onCompleted() {
259269
@Override
260270
public void onError(Throwable e) {
261271
o.onError(e);
272+
// eagerly cleanup instead of waiting for unsubscribe
273+
if (once.compareAndSet(false, true)) {
274+
// done once per instance, either onComplete or onUnSubscribe
275+
cleanupGroup(key);
276+
}
262277
}
263278

264279
@Override
@@ -386,7 +401,7 @@ private void completeInner() {
386401
if (child.isUnsubscribed()) {
387402
// if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up.
388403
unsubscribe();
389-
} else {
404+
} else if (!errorEmitted) {
390405
child.onCompleted();
391406
}
392407
}

src/test/java/rx/internal/operators/OperatorGroupByTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1407,4 +1407,51 @@ public Integer call(Integer integer) {
14071407
}).subscribe().unsubscribe();
14081408
verify(s).unsubscribe();
14091409
}
1410+
1411+
@Test
1412+
public void testGroupByShouldPropagateError() {
1413+
final Throwable e = new RuntimeException("Oops");
1414+
final TestSubscriber<Integer> inner1 = new TestSubscriber<Integer>();
1415+
final TestSubscriber<Integer> inner2 = new TestSubscriber<Integer>();
1416+
1417+
final TestSubscriber<GroupedObservable<Integer, Integer>> outer
1418+
= new TestSubscriber<GroupedObservable<Integer, Integer>>(new Subscriber<GroupedObservable<Integer, Integer>>() {
1419+
1420+
@Override
1421+
public void onCompleted() {
1422+
}
1423+
1424+
@Override
1425+
public void onError(Throwable e) {
1426+
}
1427+
1428+
@Override
1429+
public void onNext(GroupedObservable<Integer, Integer> o) {
1430+
if (o.getKey() == 0) {
1431+
o.subscribe(inner1);
1432+
} else {
1433+
o.subscribe(inner2);
1434+
}
1435+
}
1436+
});
1437+
Observable.create(
1438+
new OnSubscribe<Integer>() {
1439+
@Override
1440+
public void call(Subscriber<? super Integer> subscriber) {
1441+
subscriber.onNext(0);
1442+
subscriber.onNext(1);
1443+
subscriber.onError(e);
1444+
}
1445+
}
1446+
).groupBy(new Func1<Integer, Integer>() {
1447+
1448+
@Override
1449+
public Integer call(Integer i) {
1450+
return i % 2;
1451+
}
1452+
}).subscribe(outer);
1453+
assertEquals(Arrays.asList(e), outer.getOnErrorEvents());
1454+
assertEquals(Arrays.asList(e), inner1.getOnErrorEvents());
1455+
assertEquals(Arrays.asList(e), inner2.getOnErrorEvents());
1456+
}
14101457
}

0 commit comments

Comments
 (0)