Skip to content

Commit cbec342

Browse files
davidmotenakarnokd
authored andcommitted
save allocation in OnSubscribeAmb (#4232)
1 parent 2284d4f commit cbec342

File tree

1 file changed

+9
-10
lines changed

1 file changed

+9
-10
lines changed

src/main/java/rx/internal/operators/OnSubscribeAmb.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -320,12 +320,12 @@ private boolean isSelected() {
320320
if (chosen) {
321321
return true;
322322
}
323-
if (selection.choice.get() == this) {
323+
if (selection.get() == this) {
324324
// fast-path
325325
chosen = true;
326326
return true;
327327
} else {
328-
if (selection.choice.compareAndSet(null, this)) {
328+
if (selection.compareAndSet(null, this)) {
329329
selection.unsubscribeOthers(this);
330330
chosen = true;
331331
return true;
@@ -338,12 +338,12 @@ private boolean isSelected() {
338338
}
339339
}
340340

341-
static final class Selection<T> {
342-
final AtomicReference<AmbSubscriber<T>> choice = new AtomicReference<AmbSubscriber<T>>();
341+
@SuppressWarnings("serial")
342+
static final class Selection<T> extends AtomicReference<AmbSubscriber<T>> {
343343
final Collection<AmbSubscriber<T>> ambSubscribers = new ConcurrentLinkedQueue<AmbSubscriber<T>>();
344344

345345
public void unsubscribeLosers() {
346-
AmbSubscriber<T> winner = choice.get();
346+
AmbSubscriber<T> winner = get();
347347
if(winner != null) {
348348
unsubscribeOthers(winner);
349349
}
@@ -367,15 +367,14 @@ private OnSubscribeAmb(Iterable<? extends Observable<? extends T>> sources) {
367367
@Override
368368
public void call(final Subscriber<? super T> subscriber) {
369369
final Selection<T> selection = new Selection<T>();
370-
final AtomicReference<AmbSubscriber<T>> choice = selection.choice;
371370

372371
//setup unsubscription of all the subscribers to the sources
373372
subscriber.add(Subscriptions.create(new Action0() {
374373

375374
@Override
376375
public void call() {
377376
AmbSubscriber<T> c;
378-
if ((c = choice.get()) != null) {
377+
if ((c = selection.get()) != null) {
379378
// there is a single winner so we unsubscribe it
380379
c.unsubscribe();
381380
}
@@ -399,7 +398,7 @@ public void call() {
399398
// if all sources were backpressure aware then this check
400399
// would be pointless given that 0 was requested above from each ambSubscriber
401400
AmbSubscriber<T> c;
402-
if ((c = choice.get()) != null) {
401+
if ((c = selection.get()) != null) {
403402
// Already chose one, the rest can be skipped and we can clean up
404403
selection.unsubscribeOthers(c);
405404
return;
@@ -416,7 +415,7 @@ public void call() {
416415
@Override
417416
public void request(long n) {
418417
AmbSubscriber<T> c;
419-
if ((c = choice.get()) != null) {
418+
if ((c = selection.get()) != null) {
420419
// propagate the request to that single Subscriber that won
421420
c.requestMore(n);
422421
} else {
@@ -425,7 +424,7 @@ public void request(long n) {
425424
if (!ambSubscriber.isUnsubscribed()) {
426425
// make a best endeavours check to not waste requests
427426
// if first emission has already occurred
428-
if (choice.get() == ambSubscriber) {
427+
if (selection.get() == ambSubscriber) {
429428
ambSubscriber.requestMore(n);
430429
// don't need to request from other subscribers because choice has been made
431430
// and request has gone to choice

0 commit comments

Comments
 (0)