Skip to content

Commit aa50d51

Browse files
committed
fix Amb backpressure problems
1 parent 1a85656 commit aa50d51

File tree

2 files changed

+73
-4
lines changed

2 files changed

+73
-4
lines changed

src/main/java/rx/internal/operators/OnSubscribeAmb.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Collection;
2020
import java.util.List;
2121
import java.util.concurrent.ConcurrentLinkedQueue;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223
import java.util.concurrent.atomic.AtomicReference;
2324

2425
import rx.Observable;
@@ -373,26 +374,46 @@ public void call() {
373374
}));
374375
subscriber.setProducer(new Producer() {
375376

377+
private final AtomicBoolean subscribedToSources = new AtomicBoolean(false);
378+
376379
@Override
377380
public void request(long n) {
378381
if (selection.choice.get() != null) {
379382
// propagate the request to that single Subscriber that won
380383
selection.choice.get().requestMore(n);
381-
} else {
384+
} else if (subscribedToSources.compareAndSet(false, true)){
385+
//need to subscribe to all the sources
382386
for (Observable<? extends T> source : sources) {
383387
if (subscriber.isUnsubscribed()) {
384-
break;
388+
return;
385389
}
386390
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(n, subscriber, selection);
387391
selection.ambSubscribers.add(ambSubscriber);
388-
// possible race condition in previous lines ... a choice may have been made so double check (instead of synchronizing)
392+
// check again if choice has been made so can stop subscribing
389393
if (selection.choice.get() != null) {
390394
// Already chose one, the rest can be skipped and we can clean up
391395
selection.unsubscribeOthers(selection.choice.get());
392-
break;
396+
return;
393397
}
394398
source.unsafeSubscribe(ambSubscriber);
395399
}
400+
} else {
401+
//subscriptions already happened so propagate the request to all the
402+
//amb subscribers
403+
for (AmbSubscriber<T> ambSubscriber: selection.ambSubscribers) {
404+
if (!ambSubscriber.isUnsubscribed()) {
405+
// make a best endeavours check to not waste requests
406+
// if first emission has already occurred
407+
if (selection.choice.get() == ambSubscriber) {
408+
ambSubscriber.requestMore(n);
409+
//don't need to request from other subscribers because choice has been made
410+
//and request has gone to choice
411+
return;
412+
} else {
413+
ambSubscriber.requestMore(n);
414+
}
415+
}
416+
}
396417
}
397418
}
398419
});

src/test/java/rx/internal/operators/OnSubscribeAmbTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@
1616
package rx.internal.operators;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertTrue;
1920
import static org.mockito.Mockito.inOrder;
2021
import static org.mockito.Mockito.mock;
2122
import static org.mockito.Mockito.times;
2223
import static rx.internal.operators.OnSubscribeAmb.amb;
2324

2425
import java.io.IOException;
26+
import java.util.Arrays;
27+
import java.util.concurrent.CountDownLatch;
2528
import java.util.concurrent.TimeUnit;
2629
import java.util.concurrent.atomic.AtomicLong;
2730

@@ -219,4 +222,49 @@ public void testBackpressure() {
219222
ts.assertNoErrors();
220223
assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size());
221224
}
225+
226+
227+
@Test
228+
public void testSubscriptionOnlyHappensOnce() throws InterruptedException {
229+
final AtomicLong count = new AtomicLong();
230+
Action0 incrementer = new Action0() {
231+
@Override
232+
public void call() {
233+
count.incrementAndGet();
234+
}
235+
};
236+
//this aync stream should emit first
237+
Observable<Integer> o1 = Observable.just(1).doOnSubscribe(incrementer)
238+
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
239+
//this stream emits second
240+
Observable<Integer> o2 = Observable.just(1).doOnSubscribe(incrementer)
241+
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
242+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
243+
Observable.amb(o1, o2).subscribe(ts);
244+
ts.requestMore(1);
245+
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
246+
ts.assertNoErrors();
247+
assertEquals(2, count.get());
248+
}
249+
250+
@Test
251+
public void testRequestsPropagatedToChildren() throws InterruptedException {
252+
//this aync stream should emit first
253+
Observable<Integer> o1 = Observable.from(Arrays.asList(1, 2, 3))
254+
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
255+
//this stream emits second
256+
Observable<Integer> o2 = Observable.from(Arrays.asList(4, 5, 6))
257+
.delay(200, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
258+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
259+
//before subscription request 1
260+
ts.requestMore(1);
261+
Observable.amb(o1, o2).subscribe(ts);
262+
// before first emission request 20 more
263+
// this request should suffice to emit all
264+
ts.requestMore(20);
265+
//ensure stream does not hang
266+
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
267+
ts.assertNoErrors();
268+
}
269+
222270
}

0 commit comments

Comments
 (0)