Skip to content

Commit bb659b6

Browse files
committed
fix Amb backpressure problems
1 parent 1a85656 commit bb659b6

File tree

2 files changed

+104
-23
lines changed

2 files changed

+104
-23
lines changed

src/main/java/rx/internal/operators/OnSubscribeAmb.java

Lines changed: 56 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Collection;
2020
import java.util.List;
2121
import java.util.concurrent.ConcurrentLinkedQueue;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223
import java.util.concurrent.atomic.AtomicReference;
2324

2425
import rx.Observable;
@@ -269,6 +270,7 @@ private static final class AmbSubscriber<T> extends Subscriber<T> {
269270

270271
private final Subscriber<? super T> subscriber;
271272
private final Selection<T> selection;
273+
private boolean chosen;
272274

273275
private AmbSubscriber(long requested, Subscriber<? super T> subscriber, Selection<T> selection) {
274276
this.subscriber = subscriber;
@@ -306,12 +308,17 @@ public void onError(Throwable e) {
306308
}
307309

308310
private boolean isSelected() {
311+
if (chosen) {
312+
return true;
313+
}
309314
if (selection.choice.get() == this) {
310315
// fast-path
316+
chosen = true;
311317
return true;
312318
} else {
313319
if (selection.choice.compareAndSet(null, this)) {
314320
selection.unsubscribeOthers(this);
321+
chosen = true;
315322
return true;
316323
} else {
317324
// we lost so unsubscribe ... and force cleanup again due to possible race conditions
@@ -343,30 +350,54 @@ public void unsubscribeOthers(AmbSubscriber<T> notThis) {
343350
}
344351

345352
}
346-
347-
private final Iterable<? extends Observable<? extends T>> sources;
348-
private final Selection<T> selection = new Selection<T>();
349-
353+
354+
//give default access instead of private as a micro-optimization
355+
//for access from anonymous classes below
356+
final Iterable<? extends Observable<? extends T>> sources;
357+
final Selection<T> selection = new Selection<T>();
358+
final AtomicReference<AmbSubscriber<T>> choice = selection.choice;
359+
350360
private OnSubscribeAmb(Iterable<? extends Observable<? extends T>> sources) {
351361
this.sources = sources;
352362
}
353363

354364
@Override
355365
public void call(final Subscriber<? super T> subscriber) {
366+
367+
//need to subscribe to all the sources
368+
for (Observable<? extends T> source : sources) {
369+
if (subscriber.isUnsubscribed()) {
370+
return;
371+
}
372+
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(0, subscriber, selection);
373+
selection.ambSubscribers.add(ambSubscriber);
374+
// check again if choice has been made so can stop subscribing
375+
// if all sources were backpressure aware then this check
376+
// would be pointless given that 0 was requested above from each ambSubscriber
377+
AmbSubscriber<T> c;
378+
if ((c = choice.get()) != null) {
379+
// Already chose one, the rest can be skipped and we can clean up
380+
selection.unsubscribeOthers(c);
381+
return;
382+
}
383+
source.unsafeSubscribe(ambSubscriber);
384+
}
356385
subscriber.add(Subscriptions.create(new Action0() {
357386

358387
@Override
359388
public void call() {
360-
if (selection.choice.get() != null) {
389+
AmbSubscriber<T> c;
390+
if ((c = choice.get()) != null) {
361391
// there is a single winner so we unsubscribe it
362-
selection.choice.get().unsubscribe();
392+
c.unsubscribe();
363393
}
364394
// if we are racing with others still existing, we'll also unsubscribe them
365-
if(!selection.ambSubscribers.isEmpty()) {
366-
for (AmbSubscriber<T> other : selection.ambSubscribers) {
395+
Collection<AmbSubscriber<T>> ambSubs = selection.ambSubscribers;
396+
if(!ambSubs.isEmpty()) {
397+
for (AmbSubscriber<T> other : ambSubs) {
367398
other.unsubscribe();
368399
}
369-
selection.ambSubscribers.clear();
400+
ambSubs.clear();
370401
}
371402
}
372403

@@ -375,23 +406,25 @@ public void call() {
375406

376407
@Override
377408
public void request(long n) {
378-
if (selection.choice.get() != null) {
409+
final AmbSubscriber<T> c;
410+
if ((c = choice.get()) != null) {
379411
// propagate the request to that single Subscriber that won
380-
selection.choice.get().requestMore(n);
412+
c.requestMore(n);
381413
} else {
382-
for (Observable<? extends T> source : sources) {
383-
if (subscriber.isUnsubscribed()) {
384-
break;
385-
}
386-
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(n, subscriber, selection);
387-
selection.ambSubscribers.add(ambSubscriber);
388-
// possible race condition in previous lines ... a choice may have been made so double check (instead of synchronizing)
389-
if (selection.choice.get() != null) {
390-
// Already chose one, the rest can be skipped and we can clean up
391-
selection.unsubscribeOthers(selection.choice.get());
392-
break;
414+
//propagate the request to all the amb subscribers
415+
for (AmbSubscriber<T> ambSubscriber: selection.ambSubscribers) {
416+
if (!ambSubscriber.isUnsubscribed()) {
417+
// make a best endeavours check to not waste requests
418+
// if first emission has already occurred
419+
if (choice.get() == ambSubscriber) {
420+
ambSubscriber.requestMore(n);
421+
// don't need to request from other subscribers because choice has been made
422+
// and request has gone to choice
423+
return;
424+
} else {
425+
ambSubscriber.requestMore(n);
426+
}
393427
}
394-
source.unsafeSubscribe(ambSubscriber);
395428
}
396429
}
397430
}

src/test/java/rx/internal/operators/OnSubscribeAmbTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static rx.internal.operators.OnSubscribeAmb.amb;
2323

2424
import java.io.IOException;
25+
import java.util.Arrays;
2526
import java.util.concurrent.TimeUnit;
2627
import java.util.concurrent.atomic.AtomicLong;
2728

@@ -219,4 +220,51 @@ public void testBackpressure() {
219220
ts.assertNoErrors();
220221
assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size());
221222
}
223+
224+
225+
@Test
226+
public void testSubscriptionOnlyHappensOnce() throws InterruptedException {
227+
final AtomicLong count = new AtomicLong();
228+
Action0 incrementer = new Action0() {
229+
@Override
230+
public void call() {
231+
count.incrementAndGet();
232+
}
233+
};
234+
//this aync stream should emit first
235+
Observable<Integer> o1 = Observable.just(1).doOnSubscribe(incrementer)
236+
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
237+
//this stream emits second
238+
Observable<Integer> o2 = Observable.just(1).doOnSubscribe(incrementer)
239+
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
240+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
241+
Observable.amb(o1, o2).subscribe(ts);
242+
ts.requestMore(1);
243+
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
244+
ts.assertNoErrors();
245+
assertEquals(2, count.get());
246+
}
247+
248+
@Test
249+
public void testSecondaryRequestsPropagatedToChildren() throws InterruptedException {
250+
//this aync stream should emit first
251+
Observable<Integer> o1 = Observable.from(Arrays.asList(1, 2, 3))
252+
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
253+
//this stream emits second
254+
Observable<Integer> o2 = Observable.from(Arrays.asList(4, 5, 6))
255+
.delay(200, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
256+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
257+
@Override
258+
public void onStart() {
259+
request(1);
260+
}};
261+
Observable.amb(o1, o2).subscribe(ts);
262+
// before first emission request 20 more
263+
// this request should suffice to emit all
264+
ts.requestMore(20);
265+
//ensure stream does not hang
266+
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
267+
ts.assertNoErrors();
268+
}
269+
222270
}

0 commit comments

Comments
 (0)