Skip to content

Commit c4fcd91

Browse files
committed
fix request processing in OperatorSwitchIfNext
1 parent 4b58a87 commit c4fcd91

File tree

2 files changed

+101
-53
lines changed

2 files changed

+101
-53
lines changed

src/main/java/rx/internal/operators/OperatorSwitchIfEmpty.java

Lines changed: 49 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.concurrent.atomic.AtomicLong;
1918

2019
import rx.*;
20+
import rx.internal.producers.ProducerArbiter;
2121
import rx.subscriptions.SerialSubscription;
2222

2323
/**
@@ -35,36 +35,32 @@ public OperatorSwitchIfEmpty(Observable<? extends T> alternate) {
3535
@Override
3636
public Subscriber<? super T> call(Subscriber<? super T> child) {
3737
final SerialSubscription ssub = new SerialSubscription();
38-
final SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child, ssub);
38+
ProducerArbiter arbiter = new ProducerArbiter();
39+
final ParentSubscriber<T> parent = new ParentSubscriber<T>(child, ssub, arbiter, alternate);
3940
ssub.set(parent);
4041
child.add(ssub);
42+
child.setProducer(arbiter);
4143
return parent;
4244
}
4345

44-
private class SwitchIfEmptySubscriber extends Subscriber<T> {
45-
46-
boolean empty = true;
47-
final AtomicLong consumerCapacity = new AtomicLong(0l);
46+
private static final class ParentSubscriber<T> extends Subscriber<T> {
4847

48+
private boolean empty = true;
4949
private final Subscriber<? super T> child;
50-
final SerialSubscription ssub;
50+
private final SerialSubscription ssub;
51+
private final ProducerArbiter arbiter;
52+
private final Observable<? extends T> alternate;
5153

52-
public SwitchIfEmptySubscriber(Subscriber<? super T> child, final SerialSubscription ssub) {
54+
ParentSubscriber(Subscriber<? super T> child, final SerialSubscription ssub, ProducerArbiter arbiter, Observable<? extends T> alternate) {
5355
this.child = child;
5456
this.ssub = ssub;
57+
this.arbiter = arbiter;
58+
this.alternate = alternate;
5559
}
5660

5761
@Override
5862
public void setProducer(final Producer producer) {
59-
super.setProducer(new Producer() {
60-
@Override
61-
public void request(long n) {
62-
if (empty) {
63-
consumerCapacity.set(n);
64-
}
65-
producer.request(n);
66-
}
67-
});
63+
arbiter.setProducer(producer);
6864
}
6965

7066
@Override
@@ -77,41 +73,9 @@ public void onCompleted() {
7773
}
7874

7975
private void subscribeToAlternate() {
80-
ssub.set(alternate.unsafeSubscribe(new Subscriber<T>() {
81-
82-
@Override
83-
public void setProducer(final Producer producer) {
84-
child.setProducer(new Producer() {
85-
@Override
86-
public void request(long n) {
87-
producer.request(n);
88-
}
89-
});
90-
}
91-
92-
@Override
93-
public void onStart() {
94-
final long capacity = consumerCapacity.get();
95-
if (capacity > 0) {
96-
request(capacity);
97-
}
98-
}
99-
100-
@Override
101-
public void onCompleted() {
102-
child.onCompleted();
103-
}
104-
105-
@Override
106-
public void onError(Throwable e) {
107-
child.onError(e);
108-
}
109-
110-
@Override
111-
public void onNext(T t) {
112-
child.onNext(t);
113-
}
114-
}));
76+
AlternateSubscriber<T> as = new AlternateSubscriber<T>(child, arbiter);
77+
ssub.set(as);
78+
alternate.unsafeSubscribe(as);
11579
}
11680

11781
@Override
@@ -123,6 +87,39 @@ public void onError(Throwable e) {
12387
public void onNext(T t) {
12488
empty = false;
12589
child.onNext(t);
90+
arbiter.produced(1);
91+
}
92+
}
93+
94+
private static final class AlternateSubscriber<T> extends Subscriber<T> {
95+
96+
private final ProducerArbiter arbiter;
97+
private final Subscriber<? super T> child;
98+
99+
AlternateSubscriber(Subscriber<? super T> child, ProducerArbiter arbiter) {
100+
this.child = child;
101+
this.arbiter = arbiter;
102+
}
103+
104+
@Override
105+
public void setProducer(final Producer producer) {
106+
arbiter.setProducer(producer);
107+
}
108+
109+
@Override
110+
public void onCompleted() {
111+
child.onCompleted();
126112
}
113+
114+
@Override
115+
public void onError(Throwable e) {
116+
child.onError(e);
117+
}
118+
119+
@Override
120+
public void onNext(T t) {
121+
child.onNext(t);
122+
arbiter.produced(1);
123+
}
127124
}
128125
}

src/test/java/rx/internal/operators/OperatorSwitchIfEmptyTest.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818
import static org.junit.Assert.*;
1919

2020
import java.util.*;
21+
import java.util.concurrent.TimeUnit;
2122
import java.util.concurrent.atomic.AtomicBoolean;
2223

2324
import org.junit.Test;
2425

2526
import rx.*;
2627
import rx.Observable;
28+
import rx.Observable.OnSubscribe;
2729
import rx.functions.Action0;
30+
import rx.functions.Action1;
2831
import rx.observers.TestSubscriber;
32+
import rx.schedulers.Schedulers;
2933
import rx.subscriptions.Subscriptions;
3034

3135
public class OperatorSwitchIfEmptyTest {
@@ -142,6 +146,10 @@ public void onStart() {
142146

143147
assertEquals(Arrays.asList(1), ts.getOnNextEvents());
144148
ts.assertNoErrors();
149+
ts.requestMore(1);
150+
ts.assertValueCount(2);
151+
ts.requestMore(1);
152+
ts.assertValueCount(3);
145153
}
146154
@Test
147155
public void testBackpressureNoRequest() {
@@ -153,8 +161,51 @@ public void onStart() {
153161
}
154162
};
155163
Observable.<Integer>empty().switchIfEmpty(Observable.just(1, 2, 3)).subscribe(ts);
156-
157164
assertTrue(ts.getOnNextEvents().isEmpty());
158165
ts.assertNoErrors();
159166
}
167+
168+
@Test
169+
public void testBackpressureOnFirstObservable() {
170+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>(0);
171+
Observable.just(1,2,3).switchIfEmpty(Observable.just(4, 5, 6)).subscribe(ts);
172+
ts.assertNotCompleted();
173+
ts.assertNoErrors();
174+
ts.assertNoValues();
175+
}
176+
177+
@Test(timeout = 10000)
178+
public void testRequestsNotLost() throws InterruptedException {
179+
final TestSubscriber<Long> ts = new TestSubscriber<Long>(0);
180+
Observable.create(new OnSubscribe<Long>() {
181+
182+
@Override
183+
public void call(final Subscriber<? super Long> subscriber) {
184+
subscriber.setProducer(new Producer() {
185+
final AtomicBoolean completed = new AtomicBoolean(false);
186+
@Override
187+
public void request(long n) {
188+
if (n > 0 && completed.compareAndSet(false, true)) {
189+
Schedulers.io().createWorker().schedule(new Action0() {
190+
@Override
191+
public void call() {
192+
subscriber.onCompleted();
193+
}}, 100, TimeUnit.MILLISECONDS);
194+
}
195+
}});
196+
}})
197+
.switchIfEmpty(Observable.from(Arrays.asList(1L, 2L, 3L)))
198+
.subscribeOn(Schedulers.computation())
199+
.subscribe(ts);
200+
ts.requestMore(0);
201+
Thread.sleep(50);
202+
//request while first observable is still finishing (as empty)
203+
ts.requestMore(1);
204+
ts.requestMore(1);
205+
Thread.sleep(500);
206+
ts.assertNotCompleted();
207+
ts.assertNoErrors();
208+
ts.assertValueCount(2);
209+
ts.unsubscribe();
210+
}
160211
}

0 commit comments

Comments
 (0)