19
19
import java .util .Collection ;
20
20
import java .util .List ;
21
21
import java .util .concurrent .ConcurrentLinkedQueue ;
22
+ import java .util .concurrent .atomic .AtomicBoolean ;
22
23
import java .util .concurrent .atomic .AtomicReference ;
23
24
24
25
import rx .Observable ;
@@ -343,30 +344,54 @@ public void unsubscribeOthers(AmbSubscriber<T> notThis) {
343
344
}
344
345
345
346
}
346
-
347
- private final Iterable <? extends Observable <? extends T >> sources ;
348
- private final Selection <T > selection = new Selection <T >();
349
-
347
+
348
+ //give default access instead of private as a micro-optimization
349
+ //for access from anonymous classes below
350
+ final Iterable <? extends Observable <? extends T >> sources ;
351
+ final Selection <T > selection = new Selection <T >();
352
+ final AtomicReference <AmbSubscriber <T >> choice = selection .choice ;
353
+
350
354
private OnSubscribeAmb (Iterable <? extends Observable <? extends T >> sources ) {
351
355
this .sources = sources ;
352
356
}
353
357
354
358
@ Override
355
359
public void call (final Subscriber <? super T > subscriber ) {
360
+
361
+ //need to subscribe to all the sources
362
+ for (Observable <? extends T > source : sources ) {
363
+ if (subscriber .isUnsubscribed ()) {
364
+ return ;
365
+ }
366
+ AmbSubscriber <T > ambSubscriber = new AmbSubscriber <T >(0 , subscriber , selection );
367
+ selection .ambSubscribers .add (ambSubscriber );
368
+ // check again if choice has been made so can stop subscribing
369
+ // if all sources were backpressure aware then this check
370
+ // would be pointless given that 0 was requested above from each ambSubscriber
371
+ AmbSubscriber <T > c ;
372
+ if ((c = choice .get ()) != null ) {
373
+ // Already chose one, the rest can be skipped and we can clean up
374
+ selection .unsubscribeOthers (c );
375
+ return ;
376
+ }
377
+ source .unsafeSubscribe (ambSubscriber );
378
+ }
356
379
subscriber .add (Subscriptions .create (new Action0 () {
357
380
358
381
@ Override
359
382
public void call () {
360
- if (selection .choice .get () != null ) {
383
+ AmbSubscriber <T > c ;
384
+ if ((c = choice .get ()) != null ) {
361
385
// there is a single winner so we unsubscribe it
362
- selection . choice . get () .unsubscribe ();
386
+ c .unsubscribe ();
363
387
}
364
388
// if we are racing with others still existing, we'll also unsubscribe them
365
- if (!selection .ambSubscribers .isEmpty ()) {
366
- for (AmbSubscriber <T > other : selection .ambSubscribers ) {
389
+ Collection <AmbSubscriber <T >> ambSubs = selection .ambSubscribers ;
390
+ if (!ambSubs .isEmpty ()) {
391
+ for (AmbSubscriber <T > other : ambSubs ) {
367
392
other .unsubscribe ();
368
393
}
369
- selection . ambSubscribers .clear ();
394
+ ambSubs .clear ();
370
395
}
371
396
}
372
397
@@ -375,23 +400,25 @@ public void call() {
375
400
376
401
@ Override
377
402
public void request (long n ) {
378
- if (selection .choice .get () != null ) {
403
+ final AmbSubscriber <T > c ;
404
+ if ((c = choice .get ()) != null ) {
379
405
// propagate the request to that single Subscriber that won
380
- selection . choice . get () .requestMore (n );
406
+ c .requestMore (n );
381
407
} else {
382
- for (Observable <? extends T > source : sources ) {
383
- if (subscriber .isUnsubscribed ()) {
384
- break ;
385
- }
386
- AmbSubscriber <T > ambSubscriber = new AmbSubscriber <T >(n , subscriber , selection );
387
- selection .ambSubscribers .add (ambSubscriber );
388
- // possible race condition in previous lines ... a choice may have been made so double check (instead of synchronizing)
389
- if (selection .choice .get () != null ) {
390
- // Already chose one, the rest can be skipped and we can clean up
391
- selection .unsubscribeOthers (selection .choice .get ());
392
- break ;
408
+ //propagate the request to all the amb subscribers
409
+ for (AmbSubscriber <T > ambSubscriber : selection .ambSubscribers ) {
410
+ if (!ambSubscriber .isUnsubscribed ()) {
411
+ // make a best endeavours check to not waste requests
412
+ // if first emission has already occurred
413
+ if (choice .get () == ambSubscriber ) {
414
+ ambSubscriber .requestMore (n );
415
+ // don't need to request from other subscribers because choice has been made
416
+ // and request has gone to choice
417
+ return ;
418
+ } else {
419
+ ambSubscriber .requestMore (n );
420
+ }
393
421
}
394
- source .unsafeSubscribe (ambSubscriber );
395
422
}
396
423
}
397
424
}
0 commit comments