Skip to content

Commit c1d4ca7

Browse files
akarnokdakarnokd
akarnokd
authored and
akarnokd
committed
SwitchOnNext: fixed wrong producer
1 parent bc1ed77 commit c1d4ca7

File tree

3 files changed

+76
-24
lines changed

3 files changed

+76
-24
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ private void handleNewSource(Observable<? extends T> t) {
194194
}
195195
MergeProducer<T> producerIfNeeded = null;
196196
// if we have received a request then we need to respect it, otherwise we fast-path
197-
if (mergeProducer.requested != Long.MAX_VALUE) {
197+
if (mergeProducer.requested() != Long.MAX_VALUE) {
198198
/**
199199
* <pre> {@code
200200
* With this optimization:
@@ -237,7 +237,7 @@ private void handleScalarSynchronousObservable(ScalarSynchronousObservable<? ext
237237
* } </pre>
238238
*
239239
*/
240-
if (mergeProducer.requested == Long.MAX_VALUE) {
240+
if (mergeProducer.requested() == Long.MAX_VALUE) {
241241
handleScalarSynchronousObservableWithoutRequestLimits(t);
242242
} else {
243243
handleScalarSynchronousObservableWithRequestLimits(t);
@@ -274,11 +274,11 @@ private void handleScalarSynchronousObservableWithRequestLimits(ScalarSynchronou
274274
boolean moreToDrain;
275275
boolean isReturn = false;
276276
try {
277-
long r = mergeProducer.requested;
277+
long r = mergeProducer.requested();
278278
if (r > 0) {
279279
emitted = true;
280280
actual.onNext(t.get());
281-
MergeProducer.REQUESTED.decrementAndGet(mergeProducer);
281+
mergeProducer.getAndAdd(-1);
282282
// we handle this Observable without ever incrementing the wip or touching other machinery so just return here
283283
isReturn = true;
284284
}
@@ -376,7 +376,7 @@ private void drainChildrenQueues() {
376376
private int drainScalarValueQueue() {
377377
RxRingBuffer svq = scalarValueQueue;
378378
if (svq != null) {
379-
long r = mergeProducer.requested;
379+
long r = mergeProducer.requested();
380380
int emittedWhileDraining = 0;
381381
if (r < 0) {
382382
// drain it all
@@ -398,7 +398,7 @@ private int drainScalarValueQueue() {
398398
}
399399
}
400400
// decrement the number we emitted from outstanding requests
401-
MergeProducer.REQUESTED.getAndAdd(mergeProducer, -emittedWhileDraining);
401+
mergeProducer.getAndAdd(-emittedWhileDraining);
402402
}
403403
return emittedWhileDraining;
404404
}
@@ -410,7 +410,7 @@ private int drainScalarValueQueue() {
410410
@Override
411411
public Boolean call(InnerSubscriber<T> s) {
412412
if (s.q != null) {
413-
long r = mergeProducer.requested;
413+
long r = mergeProducer.requested();
414414
int emitted = s.drainQueue();
415415
if (emitted > 0) {
416416
s.requestMore(emitted);
@@ -533,19 +533,26 @@ public MergeProducer(MergeSubscriber<T> ms) {
533533
this.ms = ms;
534534
}
535535

536-
private volatile long requested = 0;
536+
private volatile long rq = 0;
537537
@SuppressWarnings("rawtypes")
538-
static final AtomicLongFieldUpdater<MergeProducer> REQUESTED = AtomicLongFieldUpdater.newUpdater(MergeProducer.class, "requested");
538+
static final AtomicLongFieldUpdater<MergeProducer> RQ = AtomicLongFieldUpdater.newUpdater(MergeProducer.class, "rq");
539539

540+
public long requested() {
541+
return rq;
542+
}
543+
public long getAndAdd(long n) {
544+
return RQ.getAndAdd(this, n);
545+
}
546+
540547
@Override
541548
public void request(long n) {
542-
if (requested == Long.MAX_VALUE) {
549+
if (rq == Long.MAX_VALUE) {
543550
return;
544551
}
545552
if (n == Long.MAX_VALUE) {
546-
requested = Long.MAX_VALUE;
553+
rq = Long.MAX_VALUE;
547554
} else {
548-
BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
555+
BackpressureUtils.getAndAddRequest(RQ, this, n);
549556
if (ms.drainQueuesIfNeeded()) {
550557
boolean sendComplete = false;
551558
synchronized (ms) {
@@ -668,7 +675,7 @@ private void emit(T t, boolean complete) {
668675
} else {
669676
// this needs to check q.count() as draining above may not have drained the full queue
670677
// perf tests show this to be okay, though different queue implementations could perform poorly with this
671-
if (producer.requested > 0 && q.count() == 0) {
678+
if (producer.requested() > 0 && q.count() == 0) {
672679
if (complete) {
673680
parentSubscriber.completeInner(this);
674681
} else {
@@ -679,7 +686,7 @@ private void emit(T t, boolean complete) {
679686
onError(OnErrorThrowable.addValueAsLastCause(e, t));
680687
}
681688
emitted++;
682-
MergeProducer.REQUESTED.decrementAndGet(producer);
689+
producer.getAndAdd(-1);
683690
}
684691
} else {
685692
// no requests available, so enqueue it
@@ -728,7 +735,7 @@ private void enqueue(T t, boolean complete) {
728735
private int drainRequested() {
729736
int emitted = 0;
730737
// drain what was requested
731-
long toEmit = producer.requested;
738+
long toEmit = producer.requested();
732739
Object o;
733740
for (int i = 0; i < toEmit; i++) {
734741
o = q.poll();
@@ -750,7 +757,7 @@ private int drainRequested() {
750757
}
751758

752759
// decrement the number we emitted from outstanding requests
753-
MergeProducer.REQUESTED.getAndAdd(producer, -emitted);
760+
producer.getAndAdd(-emitted);
754761
return emitted;
755762
}
756763

src/main/java/rx/internal/operators/OperatorSwitch.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ public static <T> OperatorSwitch<T> instance() {
4949
private OperatorSwitch() { }
5050
@Override
5151
public Subscriber<? super Observable<? extends T>> call(final Subscriber<? super T> child) {
52-
return new SwitchSubscriber<T>(child);
52+
SwitchSubscriber<T> sws = new SwitchSubscriber<T>(child);
53+
child.add(sws);
54+
return sws;
5355
}
5456

5557
private static final class SwitchSubscriber<T> extends Subscriber<Observable<? extends T>> {
@@ -75,7 +77,6 @@ private static final class SwitchSubscriber<T> extends Subscriber<Observable<? e
7577
volatile boolean infinite = false;
7678

7779
public SwitchSubscriber(Subscriber<? super T> child) {
78-
super(child);
7980
s = new SerializedSubscriber<T>(child);
8081
ssub = new SerialSubscription();
8182
child.add(ssub);

src/test/java/rx/internal/operators/OperatorSwitchTest.java

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,25 @@
1818
import static org.junit.Assert.assertTrue;
1919
import static org.mockito.Matchers.any;
2020
import static org.mockito.Matchers.anyString;
21-
import static org.mockito.Mockito.inOrder;
22-
import static org.mockito.Mockito.mock;
23-
import static org.mockito.Mockito.never;
24-
import static org.mockito.Mockito.times;
25-
import static org.mockito.Mockito.verify;
21+
import static org.mockito.Mockito.*;
2622

2723
import java.util.Arrays;
2824
import java.util.concurrent.TimeUnit;
2925
import java.util.concurrent.atomic.AtomicBoolean;
3026

27+
import org.junit.Assert;
3128
import org.junit.Before;
3229
import org.junit.Test;
3330
import org.mockito.InOrder;
3431

35-
import rx.*;
32+
import rx.Observable;
33+
import rx.Observer;
34+
import rx.Producer;
35+
import rx.Scheduler;
36+
import rx.Subscriber;
3637
import rx.exceptions.TestException;
3738
import rx.functions.Action0;
39+
import rx.functions.Func1;
3840
import rx.observers.TestSubscriber;
3941
import rx.schedulers.TestScheduler;
4042

@@ -530,4 +532,46 @@ public void call(final Subscriber<? super Observable<Integer>> subscriber) {
530532
).take(1).subscribe();
531533
assertTrue("Switch doesn't propagate 'unsubscribe'", isUnsubscribed.get());
532534
}
535+
/** The upstream producer hijacked the switch producer stopping the requests aimed at the inner observables. */
536+
@Test
537+
public void testIssue2654() {
538+
Observable<String> oneItem = Observable.just("Hello").mergeWith(Observable.<String>never());
539+
540+
Observable<String> src = oneItem.switchMap(new Func1<String, Observable<String>>() {
541+
@Override
542+
public Observable<String> call(final String s) {
543+
return Observable.just(s)
544+
.mergeWith(Observable.interval(10, TimeUnit.MILLISECONDS)
545+
.map(new Func1<Long, String>() {
546+
@Override
547+
public String call(Long i) {
548+
return s + " " + i;
549+
}
550+
})).take(250);
551+
}
552+
})
553+
.share()
554+
;
555+
556+
TestSubscriber<String> ts = new TestSubscriber<String>() {
557+
@Override
558+
public void onNext(String t) {
559+
super.onNext(t);
560+
if (getOnNextEvents().size() == 250) {
561+
onCompleted();
562+
unsubscribe();
563+
}
564+
}
565+
};
566+
src.subscribe(ts);
567+
568+
ts.awaitTerminalEvent(10, TimeUnit.SECONDS);
569+
570+
System.out.println("> testIssue2654: " + ts.getOnNextEvents().size());
571+
572+
ts.assertTerminalEvent();
573+
ts.assertNoErrors();
574+
575+
Assert.assertEquals(250, ts.getOnNextEvents().size());
576+
}
533577
}

0 commit comments

Comments
 (0)