Skip to content

Commit b2a4bdd

Browse files
committed
fix Amb backpressure problems
1 parent eccc8c4 commit b2a4bdd

File tree

2 files changed

+143
-29
lines changed

2 files changed

+143
-29
lines changed

src/main/java/rx/internal/operators/OnSubscribeAmb.java

Lines changed: 73 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ private static final class AmbSubscriber<T> extends Subscriber<T> {
269269

270270
private final Subscriber<? super T> subscriber;
271271
private final Selection<T> selection;
272+
private boolean chosen;
272273

273274
private AmbSubscriber(long requested, Subscriber<? super T> subscriber, Selection<T> selection) {
274275
this.subscriber = subscriber;
@@ -282,11 +283,11 @@ private final void requestMore(long n) {
282283
}
283284

284285
@Override
285-
public void onNext(T args) {
286+
public void onNext(T t) {
286287
if (!isSelected()) {
287288
return;
288289
}
289-
subscriber.onNext(args);
290+
subscriber.onNext(t);
290291
}
291292

292293
@Override
@@ -306,12 +307,17 @@ public void onError(Throwable e) {
306307
}
307308

308309
private boolean isSelected() {
310+
if (chosen) {
311+
return true;
312+
}
309313
if (selection.choice.get() == this) {
310314
// fast-path
315+
chosen = true;
311316
return true;
312317
} else {
313318
if (selection.choice.compareAndSet(null, this)) {
314319
selection.unsubscribeOthers(this);
320+
chosen = true;
315321
return true;
316322
} else {
317323
// we lost so unsubscribe ... and force cleanup again due to possible race conditions
@@ -343,59 +349,97 @@ public void unsubscribeOthers(AmbSubscriber<T> notThis) {
343349
}
344350

345351
}
346-
347-
private final Iterable<? extends Observable<? extends T>> sources;
348-
private final Selection<T> selection = new Selection<T>();
349-
352+
353+
//give default access instead of private as a micro-optimization
354+
//for access from anonymous classes below
355+
final Iterable<? extends Observable<? extends T>> sources;
356+
final Selection<T> selection = new Selection<T>();
357+
final AtomicReference<AmbSubscriber<T>> choice = selection.choice;
358+
350359
private OnSubscribeAmb(Iterable<? extends Observable<? extends T>> sources) {
351360
this.sources = sources;
352361
}
353362

354363
@Override
355364
public void call(final Subscriber<? super T> subscriber) {
365+
366+
//setup unsubscription of all the subscribers to the sources
356367
subscriber.add(Subscriptions.create(new Action0() {
357368

358369
@Override
359370
public void call() {
360-
if (selection.choice.get() != null) {
371+
AmbSubscriber<T> c;
372+
if ((c = choice.get()) != null) {
361373
// there is a single winner so we unsubscribe it
362-
selection.choice.get().unsubscribe();
374+
c.unsubscribe();
363375
}
364376
// if we are racing with others still existing, we'll also unsubscribe them
365-
if(!selection.ambSubscribers.isEmpty()) {
366-
for (AmbSubscriber<T> other : selection.ambSubscribers) {
367-
other.unsubscribe();
368-
}
369-
selection.ambSubscribers.clear();
370-
}
377+
// if subscriptions are occurring as this is happening then this call may not
378+
// unsubscribe everything. We protect ourselves though by doing another unsubscribe check
379+
// after the subscription loop below
380+
unsubscribeAmbSubscribers(selection.ambSubscribers);
371381
}
372-
382+
373383
}));
384+
385+
//need to subscribe to all the sources
386+
for (Observable<? extends T> source : sources) {
387+
if (subscriber.isUnsubscribed()) {
388+
break;
389+
}
390+
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(0, subscriber, selection);
391+
selection.ambSubscribers.add(ambSubscriber);
392+
// check again if choice has been made so can stop subscribing
393+
// if all sources were backpressure aware then this check
394+
// would be pointless given that 0 was requested above from each ambSubscriber
395+
AmbSubscriber<T> c;
396+
if ((c = choice.get()) != null) {
397+
// Already chose one, the rest can be skipped and we can clean up
398+
selection.unsubscribeOthers(c);
399+
return;
400+
}
401+
source.unsafeSubscribe(ambSubscriber);
402+
}
403+
// while subscribing unsubscription may have occurred so we clean up after
404+
if (subscriber.isUnsubscribed()) {
405+
unsubscribeAmbSubscribers(selection.ambSubscribers);
406+
}
407+
374408
subscriber.setProducer(new Producer() {
375409

376410
@Override
377411
public void request(long n) {
378-
if (selection.choice.get() != null) {
412+
final AmbSubscriber<T> c;
413+
if ((c = choice.get()) != null) {
379414
// propagate the request to that single Subscriber that won
380-
selection.choice.get().requestMore(n);
415+
c.requestMore(n);
381416
} else {
382-
for (Observable<? extends T> source : sources) {
383-
if (subscriber.isUnsubscribed()) {
384-
break;
385-
}
386-
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(n, subscriber, selection);
387-
selection.ambSubscribers.add(ambSubscriber);
388-
// possible race condition in previous lines ... a choice may have been made so double check (instead of synchronizing)
389-
if (selection.choice.get() != null) {
390-
// Already chose one, the rest can be skipped and we can clean up
391-
selection.unsubscribeOthers(selection.choice.get());
392-
break;
417+
//propagate the request to all the amb subscribers
418+
for (AmbSubscriber<T> ambSubscriber: selection.ambSubscribers) {
419+
if (!ambSubscriber.isUnsubscribed()) {
420+
// make a best endeavours check to not waste requests
421+
// if first emission has already occurred
422+
if (choice.get() == ambSubscriber) {
423+
ambSubscriber.requestMore(n);
424+
// don't need to request from other subscribers because choice has been made
425+
// and request has gone to choice
426+
return;
427+
} else {
428+
ambSubscriber.requestMore(n);
429+
}
393430
}
394-
source.unsafeSubscribe(ambSubscriber);
395431
}
396432
}
397433
}
398434
});
399435
}
400436

437+
private static <T> void unsubscribeAmbSubscribers(Collection<AmbSubscriber<T>> ambSubscribers) {
438+
if(!ambSubscribers.isEmpty()) {
439+
for (AmbSubscriber<T> other : ambSubscribers) {
440+
other.unsubscribe();
441+
}
442+
ambSubscribers.clear();
443+
}
444+
}
401445
}

src/test/java/rx/internal/operators/OnSubscribeAmbTest.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static rx.internal.operators.OnSubscribeAmb.amb;
2323

2424
import java.io.IOException;
25+
import java.util.Arrays;
2526
import java.util.concurrent.TimeUnit;
2627
import java.util.concurrent.atomic.AtomicLong;
2728

@@ -36,6 +37,7 @@
3637
import rx.Scheduler;
3738
import rx.Subscriber;
3839
import rx.functions.Action0;
40+
import rx.functions.Action1;
3941
import rx.internal.util.RxRingBuffer;
4042
import rx.observers.TestSubscriber;
4143
import rx.schedulers.Schedulers;
@@ -219,4 +221,72 @@ public void testBackpressure() {
219221
ts.assertNoErrors();
220222
assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size());
221223
}
224+
225+
226+
@Test
227+
public void testSubscriptionOnlyHappensOnce() throws InterruptedException {
228+
final AtomicLong count = new AtomicLong();
229+
Action0 incrementer = new Action0() {
230+
@Override
231+
public void call() {
232+
count.incrementAndGet();
233+
}
234+
};
235+
//this aync stream should emit first
236+
Observable<Integer> o1 = Observable.just(1).doOnSubscribe(incrementer)
237+
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
238+
//this stream emits second
239+
Observable<Integer> o2 = Observable.just(1).doOnSubscribe(incrementer)
240+
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
241+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
242+
Observable.amb(o1, o2).subscribe(ts);
243+
ts.requestMore(1);
244+
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
245+
ts.assertNoErrors();
246+
assertEquals(2, count.get());
247+
}
248+
249+
@Test
250+
public void testSecondaryRequestsPropagatedToChildren() throws InterruptedException {
251+
//this aync stream should emit first
252+
Observable<Integer> o1 = Observable.from(Arrays.asList(1, 2, 3))
253+
.delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
254+
//this stream emits second
255+
Observable<Integer> o2 = Observable.from(Arrays.asList(4, 5, 6))
256+
.delay(200, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
257+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
258+
@Override
259+
public void onStart() {
260+
request(1);
261+
}};
262+
Observable.amb(o1, o2).subscribe(ts);
263+
// before first emission request 20 more
264+
// this request should suffice to emit all
265+
ts.requestMore(20);
266+
//ensure stream does not hang
267+
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
268+
ts.assertNoErrors();
269+
}
270+
271+
@Test
272+
public void testSynchronousSources() {
273+
// under async subscription the second observable would complete before
274+
// the first but because this is a synchronous subscription to sources
275+
// then second observable does not get subscribed to before first
276+
// subscription completes hence first observable emits result through
277+
// amb
278+
int result = Observable.just(1).doOnNext(new Action1<Object>() {
279+
280+
@Override
281+
public void call(Object t) {
282+
try {
283+
Thread.sleep(100);
284+
} catch (InterruptedException e) {
285+
//
286+
}
287+
}
288+
}).ambWith(Observable.just(2)).toBlocking().single();
289+
assertEquals(1, result);
290+
}
291+
222292
}

0 commit comments

Comments
 (0)