Skip to content

Commit ef6a222

Browse files
committed
fix Amb backpressure problems
1 parent 1a85656 commit ef6a222

File tree

2 files changed

+76
-12
lines changed

2 files changed

+76
-12
lines changed

src/main/java/rx/internal/operators/OnSubscribeAmb.java

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Collection;
2020
import java.util.List;
2121
import java.util.concurrent.ConcurrentLinkedQueue;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223
import java.util.concurrent.atomic.AtomicReference;
2324

2425
import rx.Observable;
@@ -371,6 +372,21 @@ public void call() {
371372
}
372373

373374
}));
375+
//need to subscribe to all the sources
376+
for (Observable<? extends T> source : sources) {
377+
if (subscriber.isUnsubscribed()) {
378+
return;
379+
}
380+
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(0, subscriber, selection);
381+
selection.ambSubscribers.add(ambSubscriber);
382+
// check again if choice has been made so can stop subscribing
383+
if (selection.choice.get() != null) {
384+
// Already chose one, the rest can be skipped and we can clean up
385+
selection.unsubscribeOthers(selection.choice.get());
386+
return;
387+
}
388+
source.unsafeSubscribe(ambSubscriber);
389+
}
374390
subscriber.setProducer(new Producer() {
375391

376392
@Override
@@ -379,19 +395,21 @@ public void request(long n) {
379395
// propagate the request to that single Subscriber that won
380396
selection.choice.get().requestMore(n);
381397
} else {
382-
for (Observable<? extends T> source : sources) {
383-
if (subscriber.isUnsubscribed()) {
384-
break;
385-
}
386-
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(n, subscriber, selection);
387-
selection.ambSubscribers.add(ambSubscriber);
388-
// possible race condition in previous lines ... a choice may have been made so double check (instead of synchronizing)
389-
if (selection.choice.get() != null) {
390-
// Already chose one, the rest can be skipped and we can clean up
391-
selection.unsubscribeOthers(selection.choice.get());
392-
break;
398+
//subscriptions already happened so propagate the request to all the
399+
//amb subscribers
400+
for (AmbSubscriber<T> ambSubscriber: selection.ambSubscribers) {
401+
if (!ambSubscriber.isUnsubscribed()) {
402+
// make a best endeavours check to not waste requests
403+
// if first emission has already occurred
404+
if (selection.choice.get() == ambSubscriber) {
405+
ambSubscriber.requestMore(n);
406+
//don't need to request from other subscribers because choice has been made
407+
//and request has gone to choice
408+
return;
409+
} else {
410+
ambSubscriber.requestMore(n);
411+
}
393412
}
394-
source.unsafeSubscribe(ambSubscriber);
395413
}
396414
}
397415
}

src/test/java/rx/internal/operators/OnSubscribeAmbTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static rx.internal.operators.OnSubscribeAmb.amb;
2323

2424
import java.io.IOException;
25+
import java.util.Arrays;
2526
import java.util.concurrent.TimeUnit;
2627
import java.util.concurrent.atomic.AtomicLong;
2728

@@ -219,4 +220,49 @@ public void testBackpressure() {
219220
ts.assertNoErrors();
220221
assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size());
221222
}
223+
224+
225+
@Test
226+
public void testSubscriptionOnlyHappensOnce() throws InterruptedException {
227+
final AtomicLong count = new AtomicLong();
228+
Action0 incrementer = new Action0() {
229+
@Override
230+
public void call() {
231+
count.incrementAndGet();
232+
}
233+
};
234+
//this aync stream should emit first
235+
Observable<Integer> o1 = Observable.just(1).doOnSubscribe(incrementer)
236+
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
237+
//this stream emits second
238+
Observable<Integer> o2 = Observable.just(1).doOnSubscribe(incrementer)
239+
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
240+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
241+
Observable.amb(o1, o2).subscribe(ts);
242+
ts.requestMore(1);
243+
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
244+
ts.assertNoErrors();
245+
assertEquals(2, count.get());
246+
}
247+
248+
@Test
249+
public void testSecondaryRequestsPropagatedToChildren() throws InterruptedException {
250+
//this aync stream should emit first
251+
Observable<Integer> o1 = Observable.from(Arrays.asList(1, 2, 3))
252+
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
253+
//this stream emits second
254+
Observable<Integer> o2 = Observable.from(Arrays.asList(4, 5, 6))
255+
.delay(200, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
256+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
257+
//before subscription request 1
258+
ts.requestMore(1);
259+
Observable.amb(o1, o2).subscribe(ts);
260+
// before first emission request 20 more
261+
// this request should suffice to emit all
262+
ts.requestMore(20);
263+
//ensure stream does not hang
264+
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
265+
ts.assertNoErrors();
266+
}
267+
222268
}

0 commit comments

Comments
 (0)