19
19
import java .util .Collection ;
20
20
import java .util .List ;
21
21
import java .util .concurrent .ConcurrentLinkedQueue ;
22
+ import java .util .concurrent .atomic .AtomicBoolean ;
22
23
import java .util .concurrent .atomic .AtomicReference ;
23
24
24
25
import rx .Observable ;
@@ -343,10 +344,14 @@ public void unsubscribeOthers(AmbSubscriber<T> notThis) {
343
344
}
344
345
345
346
}
346
-
347
- private final Iterable <? extends Observable <? extends T >> sources ;
348
- private final Selection <T > selection = new Selection <T >();
349
-
347
+
348
+ //give default access instead of private as a micro-optimization
349
+ //for access from anonymous classes below
350
+ final Iterable <? extends Observable <? extends T >> sources ;
351
+ final Selection <T > selection = new Selection <T >();
352
+ final AtomicReference <AmbSubscriber <T >> choice = selection .choice ;
353
+ final Collection <AmbSubscriber <T >> ambSubscribers = selection .ambSubscribers ;
354
+
350
355
private OnSubscribeAmb (Iterable <? extends Observable <? extends T >> sources ) {
351
356
this .sources = sources ;
352
357
}
@@ -357,41 +362,62 @@ public void call(final Subscriber<? super T> subscriber) {
357
362
358
363
@ Override
359
364
public void call () {
360
- if (selection .choice .get () != null ) {
365
+ AmbSubscriber <T > c ;
366
+ if ((c = choice .get ()) != null ) {
361
367
// there is a single winner so we unsubscribe it
362
- selection . choice . get () .unsubscribe ();
368
+ c .unsubscribe ();
363
369
}
364
370
// if we are racing with others still existing, we'll also unsubscribe them
365
- if (!selection . ambSubscribers .isEmpty ()) {
366
- for (AmbSubscriber <T > other : selection . ambSubscribers ) {
371
+ if (!ambSubscribers .isEmpty ()) {
372
+ for (AmbSubscriber <T > other : ambSubscribers ) {
367
373
other .unsubscribe ();
368
374
}
369
- selection . ambSubscribers .clear ();
375
+ ambSubscribers .clear ();
370
376
}
371
377
}
372
378
373
379
}));
380
+ //need to subscribe to all the sources
381
+ for (Observable <? extends T > source : sources ) {
382
+ if (subscriber .isUnsubscribed ()) {
383
+ return ;
384
+ }
385
+ AmbSubscriber <T > ambSubscriber = new AmbSubscriber <T >(0 , subscriber , selection );
386
+ ambSubscribers .add (ambSubscriber );
387
+ // check again if choice has been made so can stop subscribing
388
+ // if all sources were backpressure aware then this check
389
+ // would be pointless given that 0 was requested above from each ambSubscriber
390
+ AmbSubscriber <T > c ;
391
+ if ((c = choice .get ()) != null ) {
392
+ // Already chose one, the rest can be skipped and we can clean up
393
+ selection .unsubscribeOthers (c );
394
+ return ;
395
+ }
396
+ source .unsafeSubscribe (ambSubscriber );
397
+ }
374
398
subscriber .setProducer (new Producer () {
375
399
376
400
@ Override
377
401
public void request (long n ) {
378
- if (selection .choice .get () != null ) {
402
+ final AmbSubscriber <T > c ;
403
+ if ((c = choice .get ()) != null ) {
379
404
// propagate the request to that single Subscriber that won
380
- selection . choice . get () .requestMore (n );
405
+ c .requestMore (n );
381
406
} else {
382
- for (Observable <? extends T > source : sources ) {
383
- if (subscriber .isUnsubscribed ()) {
384
- break ;
385
- }
386
- AmbSubscriber <T > ambSubscriber = new AmbSubscriber <T >(n , subscriber , selection );
387
- selection .ambSubscribers .add (ambSubscriber );
388
- // possible race condition in previous lines ... a choice may have been made so double check (instead of synchronizing)
389
- if (selection .choice .get () != null ) {
390
- // Already chose one, the rest can be skipped and we can clean up
391
- selection .unsubscribeOthers (selection .choice .get ());
392
- break ;
407
+ //propagate the request to all the amb subscribers
408
+ for (AmbSubscriber <T > ambSubscriber : ambSubscribers ) {
409
+ if (!ambSubscriber .isUnsubscribed ()) {
410
+ // make a best endeavours check to not waste requests
411
+ // if first emission has already occurred
412
+ if (choice .get () == ambSubscriber ) {
413
+ ambSubscriber .requestMore (n );
414
+ // don't need to request from other subscribers because choice has been made
415
+ // and request has gone to choice
416
+ return ;
417
+ } else {
418
+ ambSubscriber .requestMore (n );
419
+ }
393
420
}
394
- source .unsafeSubscribe (ambSubscriber );
395
421
}
396
422
}
397
423
}
0 commit comments