Skip to content

Commit 4f7926e

Browse files
committed
fix request processing in OperatorSwitchIfNext
1 parent 4b58a87 commit 4f7926e

File tree

2 files changed

+95
-52
lines changed

2 files changed

+95
-52
lines changed

src/main/java/rx/internal/operators/OperatorSwitchIfEmpty.java

Lines changed: 43 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.concurrent.atomic.AtomicLong;
1918

2019
import rx.*;
20+
import rx.internal.producers.ProducerArbiter;
2121
import rx.subscriptions.SerialSubscription;
2222

2323
/**
@@ -35,36 +35,30 @@ public OperatorSwitchIfEmpty(Observable<? extends T> alternate) {
3535
@Override
3636
public Subscriber<? super T> call(Subscriber<? super T> child) {
3737
final SerialSubscription ssub = new SerialSubscription();
38-
final SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child, ssub);
38+
ProducerArbiter arbiter = new ProducerArbiter();
39+
final SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child, ssub, arbiter);
3940
ssub.set(parent);
4041
child.add(ssub);
42+
child.setProducer(arbiter);
4143
return parent;
4244
}
4345

4446
private class SwitchIfEmptySubscriber extends Subscriber<T> {
4547

46-
boolean empty = true;
47-
final AtomicLong consumerCapacity = new AtomicLong(0l);
48-
48+
private boolean empty = true;
4949
private final Subscriber<? super T> child;
50-
final SerialSubscription ssub;
50+
private final SerialSubscription ssub;
51+
private final ProducerArbiter arbiter;
5152

52-
public SwitchIfEmptySubscriber(Subscriber<? super T> child, final SerialSubscription ssub) {
53+
SwitchIfEmptySubscriber(Subscriber<? super T> child, final SerialSubscription ssub, ProducerArbiter arbiter) {
5354
this.child = child;
5455
this.ssub = ssub;
56+
this.arbiter = arbiter;
5557
}
5658

5759
@Override
5860
public void setProducer(final Producer producer) {
59-
super.setProducer(new Producer() {
60-
@Override
61-
public void request(long n) {
62-
if (empty) {
63-
consumerCapacity.set(n);
64-
}
65-
producer.request(n);
66-
}
67-
});
61+
arbiter.setProducer(producer);
6862
}
6963

7064
@Override
@@ -77,41 +71,7 @@ public void onCompleted() {
7771
}
7872

7973
private void subscribeToAlternate() {
80-
ssub.set(alternate.unsafeSubscribe(new Subscriber<T>() {
81-
82-
@Override
83-
public void setProducer(final Producer producer) {
84-
child.setProducer(new Producer() {
85-
@Override
86-
public void request(long n) {
87-
producer.request(n);
88-
}
89-
});
90-
}
91-
92-
@Override
93-
public void onStart() {
94-
final long capacity = consumerCapacity.get();
95-
if (capacity > 0) {
96-
request(capacity);
97-
}
98-
}
99-
100-
@Override
101-
public void onCompleted() {
102-
child.onCompleted();
103-
}
104-
105-
@Override
106-
public void onError(Throwable e) {
107-
child.onError(e);
108-
}
109-
110-
@Override
111-
public void onNext(T t) {
112-
child.onNext(t);
113-
}
114-
}));
74+
ssub.set(alternate.unsafeSubscribe(new AlternateSubscriber<T>(child, arbiter)));
11575
}
11676

11777
@Override
@@ -125,4 +85,36 @@ public void onNext(T t) {
12585
child.onNext(t);
12686
}
12787
}
88+
89+
private static final class AlternateSubscriber<T> extends Subscriber<T> {
90+
91+
private final ProducerArbiter arbiter;
92+
private final Subscriber<? super T> child;
93+
94+
AlternateSubscriber(Subscriber<? super T> child, ProducerArbiter arbiter) {
95+
this.child = child;
96+
this.arbiter = arbiter;
97+
}
98+
99+
@Override
100+
public void setProducer(final Producer producer) {
101+
arbiter.setProducer(producer);
102+
}
103+
104+
@Override
105+
public void onCompleted() {
106+
child.onCompleted();
107+
}
108+
109+
@Override
110+
public void onError(Throwable e) {
111+
child.onError(e);
112+
}
113+
114+
@Override
115+
public void onNext(T t) {
116+
child.onNext(t);
117+
arbiter.produced(1);
118+
}
119+
}
128120
}

src/test/java/rx/internal/operators/OperatorSwitchIfEmptyTest.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818
import static org.junit.Assert.*;
1919

2020
import java.util.*;
21+
import java.util.concurrent.TimeUnit;
2122
import java.util.concurrent.atomic.AtomicBoolean;
2223

2324
import org.junit.Test;
2425

2526
import rx.*;
2627
import rx.Observable;
28+
import rx.Observable.OnSubscribe;
2729
import rx.functions.Action0;
30+
import rx.functions.Action1;
2831
import rx.observers.TestSubscriber;
32+
import rx.schedulers.Schedulers;
2933
import rx.subscriptions.Subscriptions;
3034

3135
public class OperatorSwitchIfEmptyTest {
@@ -142,6 +146,10 @@ public void onStart() {
142146

143147
assertEquals(Arrays.asList(1), ts.getOnNextEvents());
144148
ts.assertNoErrors();
149+
ts.requestMore(1);
150+
ts.assertValueCount(2);
151+
ts.requestMore(1);
152+
ts.assertValueCount(3);
145153
}
146154
@Test
147155
public void testBackpressureNoRequest() {
@@ -153,8 +161,51 @@ public void onStart() {
153161
}
154162
};
155163
Observable.<Integer>empty().switchIfEmpty(Observable.just(1, 2, 3)).subscribe(ts);
156-
157164
assertTrue(ts.getOnNextEvents().isEmpty());
158165
ts.assertNoErrors();
159166
}
167+
168+
@Test
169+
public void testBackpressureOnFirstObservable() {
170+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>(0);
171+
Observable.just(1,2,3).switchIfEmpty(Observable.just(4, 5, 6)).subscribe(ts);
172+
ts.assertNotCompleted();
173+
ts.assertNoErrors();
174+
ts.assertNoValues();
175+
}
176+
177+
@Test(timeout = 10000)
178+
public void testRequestsNotLost() throws InterruptedException {
179+
final TestSubscriber<Long> ts = new TestSubscriber<Long>(0);
180+
Observable.create(new OnSubscribe<Long>() {
181+
182+
@Override
183+
public void call(final Subscriber<? super Long> subscriber) {
184+
subscriber.setProducer(new Producer() {
185+
final AtomicBoolean completed = new AtomicBoolean(false);
186+
@Override
187+
public void request(long n) {
188+
if (n > 0 && completed.compareAndSet(false, true)) {
189+
Schedulers.io().createWorker().schedule(new Action0() {
190+
@Override
191+
public void call() {
192+
subscriber.onCompleted();
193+
}}, 100, TimeUnit.MILLISECONDS);
194+
}
195+
}});
196+
}})
197+
.switchIfEmpty(Observable.from(Arrays.asList(1L, 2L, 3L)))
198+
.subscribeOn(Schedulers.computation())
199+
.subscribe(ts);
200+
ts.requestMore(0);
201+
Thread.sleep(50);
202+
//request while first observable is still finishing (as empty)
203+
ts.requestMore(1);
204+
ts.requestMore(1);
205+
Thread.sleep(500);
206+
ts.assertNotCompleted();
207+
ts.assertNoErrors();
208+
ts.assertValueCount(2);
209+
ts.unsubscribe();
210+
}
160211
}

0 commit comments

Comments
 (0)