Skip to content

Commit 1d89bc5

Browse files
committed
collect - prevent multiple terminal emissions, add done to DeferredScalarSubscriber
1 parent b72beff commit 1d89bc5

File tree

10 files changed

+693
-201
lines changed

10 files changed

+693
-201
lines changed

src/main/java/rx/internal/operators/DeferredScalarSubscriber.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.atomic.AtomicInteger;
2020

2121
import rx.*;
22+
import rx.plugins.RxJavaHooks;
2223

2324
/**
2425
* Base class for Subscribers that consume the entire upstream and signal
@@ -56,19 +57,31 @@ public abstract class DeferredScalarSubscriber<T, R> extends Subscriber<T> {
5657
/** Value will be emitted. */
5758
static final int HAS_REQUEST_HAS_VALUE = 3;
5859

60+
/** prevents multiple terminal emissions. */
61+
protected boolean done;
62+
5963
public DeferredScalarSubscriber(Subscriber<? super R> actual) {
6064
this.actual = actual;
6165
this.state = new AtomicInteger();
6266
}
6367

6468
@Override
6569
public void onError(Throwable ex) {
66-
value = null;
67-
actual.onError(ex);
70+
if (!done) {
71+
done = true;
72+
value = null;
73+
actual.onError(ex);
74+
} else {
75+
RxJavaHooks.onError(ex);
76+
}
6877
}
6978

7079
@Override
7180
public void onCompleted() {
81+
if (done) {
82+
return;
83+
}
84+
done = true;
7285
if (hasValue) {
7386
complete(value);
7487
} else {

src/main/java/rx/internal/operators/OnSubscribeCollect.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,15 @@ public CollectSubscriber(Subscriber<? super R> actual, R initialValue, Action2<R
6363

6464
@Override
6565
public void onNext(T t) {
66+
if (done) {
67+
return;
68+
}
6669
try {
6770
collector.call(value, t);
6871
} catch (Throwable ex) {
6972
Exceptions.throwIfFatal(ex);
7073
unsubscribe();
71-
actual.onError(ex);
74+
onError(ex);
7275
}
7376
}
7477

src/main/java/rx/internal/operators/OnSubscribeReduce.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import rx.Observable.OnSubscribe;
2323
import rx.exceptions.Exceptions;
2424
import rx.functions.Func2;
25+
import rx.plugins.RxJavaHooks;
2526

2627
public final class OnSubscribeReduce<T> implements OnSubscribe<T> {
2728

@@ -57,6 +58,8 @@ static final class ReduceSubscriber<T> extends Subscriber<T> {
5758

5859
static final Object EMPTY = new Object();
5960

61+
boolean done;
62+
6063
@SuppressWarnings("unchecked")
6164
public ReduceSubscriber(Subscriber<? super T> actual, Func2<T, T, T> reducer) {
6265
this.actual = actual;
@@ -68,6 +71,9 @@ public ReduceSubscriber(Subscriber<? super T> actual, Func2<T, T, T> reducer) {
6871
@SuppressWarnings("unchecked")
6972
@Override
7073
public void onNext(T t) {
74+
if (done) {
75+
return;
76+
}
7177
Object o = value;
7278
if (o == EMPTY) {
7379
value = t;
@@ -77,19 +83,28 @@ public void onNext(T t) {
7783
} catch (Throwable ex) {
7884
Exceptions.throwIfFatal(ex);
7985
unsubscribe();
80-
actual.onError(ex);
86+
onError(ex);
8187
}
8288
}
8389
}
8490

8591
@Override
8692
public void onError(Throwable e) {
87-
actual.onError(e);
93+
if (!done) {
94+
done = true;
95+
actual.onError(e);
96+
} else {
97+
RxJavaHooks.onError(e);
98+
}
8899
}
89100

90101
@SuppressWarnings("unchecked")
91102
@Override
92103
public void onCompleted() {
104+
if (done) {
105+
return;
106+
}
107+
done = true;
93108
Object o = value;
94109
if (o != EMPTY) {
95110
actual.onNext((T)o);

src/main/java/rx/internal/operators/OperatorAny.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import rx.exceptions.Exceptions;
2222
import rx.functions.Func1;
2323
import rx.internal.producers.SingleDelayedProducer;
24+
import rx.plugins.RxJavaHooks;
2425

2526
/**
2627
* Returns an {@link Observable} that emits <code>true</code> if any element of
@@ -45,6 +46,9 @@ public Subscriber<? super T> call(final Subscriber<? super Boolean> child) {
4546

4647
@Override
4748
public void onNext(T t) {
49+
if (done) {
50+
return;
51+
}
4852
hasElements = true;
4953
boolean result;
5054
try {
@@ -53,7 +57,7 @@ public void onNext(T t) {
5357
Exceptions.throwOrReport(e, this, t);
5458
return;
5559
}
56-
if (result && !done) {
60+
if (result) {
5761
done = true;
5862
producer.setValue(!returnOnEmpty);
5963
unsubscribe();
@@ -64,7 +68,12 @@ public void onNext(T t) {
6468

6569
@Override
6670
public void onError(Throwable e) {
67-
child.onError(e);
71+
if (!done) {
72+
done = true;
73+
child.onError(e);
74+
} else {
75+
RxJavaHooks.onError(e);
76+
}
6877
}
6978

7079
@Override

src/test/java/rx/ObservableTests.java

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -939,62 +939,7 @@ public void testRangeWithScheduler() {
939939
inOrder.verifyNoMoreInteractions();
940940
}
941941

942-
@Test
943-
public void testCollectToList() {
944-
Observable<List<Integer>> o = Observable.just(1, 2, 3).collect(new Func0<List<Integer>>() {
945-
946-
@Override
947-
public List<Integer> call() {
948-
return new ArrayList<Integer>();
949-
}
950-
951-
}, new Action2<List<Integer>, Integer>() {
952-
953-
@Override
954-
public void call(List<Integer> list, Integer v) {
955-
list.add(v);
956-
}
957-
});
958942

959-
List<Integer> list = o.toBlocking().last();
960-
961-
assertEquals(3, list.size());
962-
assertEquals(1, list.get(0).intValue());
963-
assertEquals(2, list.get(1).intValue());
964-
assertEquals(3, list.get(2).intValue());
965-
966-
// test multiple subscribe
967-
List<Integer> list2 = o.toBlocking().last();
968-
969-
assertEquals(3, list2.size());
970-
assertEquals(1, list2.get(0).intValue());
971-
assertEquals(2, list2.get(1).intValue());
972-
assertEquals(3, list2.get(2).intValue());
973-
}
974-
975-
@Test
976-
public void testCollectToString() {
977-
String value = Observable.just(1, 2, 3).collect(new Func0<StringBuilder>() {
978-
979-
@Override
980-
public StringBuilder call() {
981-
return new StringBuilder();
982-
}
983-
984-
}, new Action2<StringBuilder, Integer>() {
985-
986-
@Override
987-
public void call(StringBuilder sb, Integer v) {
988-
if (sb.length() > 0) {
989-
sb.append("-");
990-
}
991-
sb.append(v);
992-
}
993-
}).toBlocking().last().toString();
994-
995-
assertEquals("1-2-3", value);
996-
}
997-
998943
@Test
999944
public void testMergeWith() {
1000945
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

0 commit comments

Comments
 (0)