Skip to content

Commit 0202597

Browse files
committed
fix request processing in OperatorSwitchIfNext
1 parent 4b58a87 commit 0202597

File tree

2 files changed

+69
-27
lines changed

2 files changed

+69
-27
lines changed

src/main/java/rx/internal/operators/OperatorSwitchIfEmpty.java

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.concurrent.atomic.AtomicLong;
1918

2019
import rx.*;
20+
import rx.internal.producers.ProducerArbiter;
2121
import rx.subscriptions.SerialSubscription;
2222

2323
/**
@@ -38,33 +38,35 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
3838
final SwitchIfEmptySubscriber parent = new SwitchIfEmptySubscriber(child, ssub);
3939
ssub.set(parent);
4040
child.add(ssub);
41+
child.setProducer(new Producer() {
42+
@Override
43+
public void request(long n) {
44+
parent.requestMore(n);
45+
}});
4146
return parent;
4247
}
4348

4449
private class SwitchIfEmptySubscriber extends Subscriber<T> {
4550

4651
boolean empty = true;
47-
final AtomicLong consumerCapacity = new AtomicLong(0l);
4852

49-
private final Subscriber<? super T> child;
53+
final Subscriber<? super T> child;
5054
final SerialSubscription ssub;
55+
final ProducerArbiter arbiter;
5156

5257
public SwitchIfEmptySubscriber(Subscriber<? super T> child, final SerialSubscription ssub) {
5358
this.child = child;
5459
this.ssub = ssub;
60+
this.arbiter = new ProducerArbiter();
61+
}
62+
63+
public void requestMore(long n) {
64+
arbiter.request(n);
5565
}
5666

5767
@Override
5868
public void setProducer(final Producer producer) {
59-
super.setProducer(new Producer() {
60-
@Override
61-
public void request(long n) {
62-
if (empty) {
63-
consumerCapacity.set(n);
64-
}
65-
producer.request(n);
66-
}
67-
});
69+
arbiter.setProducer(producer);
6870
}
6971

7072
@Override
@@ -81,20 +83,7 @@ private void subscribeToAlternate() {
8183

8284
@Override
8385
public void setProducer(final Producer producer) {
84-
child.setProducer(new Producer() {
85-
@Override
86-
public void request(long n) {
87-
producer.request(n);
88-
}
89-
});
90-
}
91-
92-
@Override
93-
public void onStart() {
94-
final long capacity = consumerCapacity.get();
95-
if (capacity > 0) {
96-
request(capacity);
97-
}
86+
arbiter.setProducer(producer);
9887
}
9988

10089
@Override
@@ -110,6 +99,7 @@ public void onError(Throwable e) {
11099
@Override
111100
public void onNext(T t) {
112101
child.onNext(t);
102+
arbiter.produced(1);
113103
}
114104
}));
115105
}

src/test/java/rx/internal/operators/OperatorSwitchIfEmptyTest.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818
import static org.junit.Assert.*;
1919

2020
import java.util.*;
21+
import java.util.concurrent.TimeUnit;
2122
import java.util.concurrent.atomic.AtomicBoolean;
2223

2324
import org.junit.Test;
2425

2526
import rx.*;
2627
import rx.Observable;
28+
import rx.Observable.OnSubscribe;
2729
import rx.functions.Action0;
30+
import rx.functions.Action1;
2831
import rx.observers.TestSubscriber;
32+
import rx.schedulers.Schedulers;
2933
import rx.subscriptions.Subscriptions;
3034

3135
public class OperatorSwitchIfEmptyTest {
@@ -142,6 +146,11 @@ public void onStart() {
142146

143147
assertEquals(Arrays.asList(1), ts.getOnNextEvents());
144148
ts.assertNoErrors();
149+
ts.requestMore(1);
150+
ts.assertValueCount(2);
151+
ts.requestMore(1);
152+
ts.assertValueCount(3);
153+
System.out.println(ts.getOnNextEvents());
145154
}
146155
@Test
147156
public void testBackpressureNoRequest() {
@@ -153,8 +162,51 @@ public void onStart() {
153162
}
154163
};
155164
Observable.<Integer>empty().switchIfEmpty(Observable.just(1, 2, 3)).subscribe(ts);
156-
157165
assertTrue(ts.getOnNextEvents().isEmpty());
158166
ts.assertNoErrors();
159167
}
168+
169+
@Test
170+
public void testBackpressureOnFirstObservable() {
171+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>(0);
172+
Observable.just(1,2,3).switchIfEmpty(Observable.just(4, 5, 6)).subscribe(ts);
173+
ts.assertNotCompleted();
174+
ts.assertNoErrors();
175+
ts.assertNoValues();
176+
}
177+
178+
@Test(timeout = 10000)
179+
public void testRequestsNotLost() throws InterruptedException {
180+
final TestSubscriber<Long> ts = new TestSubscriber<Long>(0);
181+
Observable.create(new OnSubscribe<Long>() {
182+
183+
@Override
184+
public void call(final Subscriber<? super Long> subscriber) {
185+
subscriber.setProducer(new Producer() {
186+
final AtomicBoolean completed = new AtomicBoolean(false);
187+
@Override
188+
public void request(long n) {
189+
if (n > 0 && completed.compareAndSet(false, true)) {
190+
Schedulers.io().createWorker().schedule(new Action0() {
191+
@Override
192+
public void call() {
193+
subscriber.onCompleted();
194+
}}, 100, TimeUnit.MILLISECONDS);
195+
}
196+
}});
197+
}})
198+
.switchIfEmpty(Observable.from(Arrays.asList(1L, 2L, 3L)))
199+
.subscribeOn(Schedulers.computation())
200+
.subscribe(ts);
201+
ts.requestMore(0);
202+
Thread.sleep(50);
203+
//request while first observable is still finishing (as empty)
204+
ts.requestMore(1);
205+
ts.requestMore(1);
206+
Thread.sleep(500);
207+
ts.assertNotCompleted();
208+
ts.assertNoErrors();
209+
ts.assertValueCount(2);
210+
ts.unsubscribe();
211+
}
160212
}

0 commit comments

Comments
 (0)