19
19
import java .util .Collection ;
20
20
import java .util .List ;
21
21
import java .util .concurrent .ConcurrentLinkedQueue ;
22
+ import java .util .concurrent .atomic .AtomicBoolean ;
22
23
import java .util .concurrent .atomic .AtomicReference ;
23
24
24
25
import rx .Observable ;
@@ -343,10 +344,13 @@ public void unsubscribeOthers(AmbSubscriber<T> notThis) {
343
344
}
344
345
345
346
}
346
-
347
- private final Iterable <? extends Observable <? extends T >> sources ;
348
- private final Selection <T > selection = new Selection <T >();
349
-
347
+
348
+ //give default access instead of private as a micro-optimization
349
+ //for access from anonymous classes below
350
+ final Iterable <? extends Observable <? extends T >> sources ;
351
+ final Selection <T > selection = new Selection <T >();
352
+ final AtomicReference <AmbSubscriber <T >> choice = selection .choice ;
353
+
350
354
private OnSubscribeAmb (Iterable <? extends Observable <? extends T >> sources ) {
351
355
this .sources = sources ;
352
356
}
@@ -357,41 +361,63 @@ public void call(final Subscriber<? super T> subscriber) {
357
361
358
362
@ Override
359
363
public void call () {
360
- if (selection .choice .get () != null ) {
364
+ AmbSubscriber <T > c ;
365
+ if ((c = choice .get ()) != null ) {
361
366
// there is a single winner so we unsubscribe it
362
- selection . choice . get () .unsubscribe ();
367
+ c .unsubscribe ();
363
368
}
364
369
// if we are racing with others still existing, we'll also unsubscribe them
365
- if (!selection .ambSubscribers .isEmpty ()) {
366
- for (AmbSubscriber <T > other : selection .ambSubscribers ) {
370
+ Collection <AmbSubscriber <T >> ambSubs = selection .ambSubscribers ;
371
+ if (!ambSubs .isEmpty ()) {
372
+ for (AmbSubscriber <T > other : ambSubs ) {
367
373
other .unsubscribe ();
368
374
}
369
- selection . ambSubscribers .clear ();
375
+ ambSubs .clear ();
370
376
}
371
377
}
372
378
373
379
}));
380
+ //need to subscribe to all the sources
381
+ for (Observable <? extends T > source : sources ) {
382
+ if (subscriber .isUnsubscribed ()) {
383
+ return ;
384
+ }
385
+ AmbSubscriber <T > ambSubscriber = new AmbSubscriber <T >(0 , subscriber , selection );
386
+ selection .ambSubscribers .add (ambSubscriber );
387
+ // check again if choice has been made so can stop subscribing
388
+ // if all sources were backpressure aware then this check
389
+ // would be pointless given that 0 was requested above from each ambSubscriber
390
+ AmbSubscriber <T > c ;
391
+ if ((c = choice .get ()) != null ) {
392
+ // Already chose one, the rest can be skipped and we can clean up
393
+ selection .unsubscribeOthers (c );
394
+ return ;
395
+ }
396
+ source .unsafeSubscribe (ambSubscriber );
397
+ }
374
398
subscriber .setProducer (new Producer () {
375
399
376
400
@ Override
377
401
public void request (long n ) {
378
- if (selection .choice .get () != null ) {
402
+ final AmbSubscriber <T > c ;
403
+ if ((c = choice .get ()) != null ) {
379
404
// propagate the request to that single Subscriber that won
380
- selection . choice . get () .requestMore (n );
405
+ c .requestMore (n );
381
406
} else {
382
- for (Observable <? extends T > source : sources ) {
383
- if (subscriber .isUnsubscribed ()) {
384
- break ;
385
- }
386
- AmbSubscriber <T > ambSubscriber = new AmbSubscriber <T >(n , subscriber , selection );
387
- selection .ambSubscribers .add (ambSubscriber );
388
- // possible race condition in previous lines ... a choice may have been made so double check (instead of synchronizing)
389
- if (selection .choice .get () != null ) {
390
- // Already chose one, the rest can be skipped and we can clean up
391
- selection .unsubscribeOthers (selection .choice .get ());
392
- break ;
407
+ //propagate the request to all the amb subscribers
408
+ for (AmbSubscriber <T > ambSubscriber : selection .ambSubscribers ) {
409
+ if (!ambSubscriber .isUnsubscribed ()) {
410
+ // make a best endeavours check to not waste requests
411
+ // if first emission has already occurred
412
+ if (choice .get () == ambSubscriber ) {
413
+ ambSubscriber .requestMore (n );
414
+ // don't need to request from other subscribers because choice has been made
415
+ // and request has gone to choice
416
+ return ;
417
+ } else {
418
+ ambSubscriber .requestMore (n );
419
+ }
393
420
}
394
- source .unsafeSubscribe (ambSubscriber );
395
421
}
396
422
}
397
423
}
0 commit comments