Skip to content

Commit bb170b5

Browse files
committed
Merge pull request #2960 from davidmoten/concat-race
fix OperatorConcat race condition where request lost
2 parents 4329fed + 5db7b94 commit bb170b5

File tree

3 files changed

+61
-32
lines changed

3 files changed

+61
-32
lines changed

src/main/java/rx/internal/operators/OperatorConcat.java

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import rx.Producer;
2525
import rx.Subscriber;
2626
import rx.functions.Action0;
27+
import rx.internal.producers.ProducerArbiter;
2728
import rx.observers.SerializedSubscriber;
2829
import rx.subscriptions.SerialSubscription;
2930
import rx.subscriptions.Subscriptions;
@@ -85,17 +86,19 @@ static final class ConcatSubscriber<T> extends Subscriber<Observable<? extends T
8586

8687
volatile int wip;
8788
@SuppressWarnings("rawtypes")
88-
static final AtomicIntegerFieldUpdater<ConcatSubscriber> WIP_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ConcatSubscriber.class, "wip");
89+
static final AtomicIntegerFieldUpdater<ConcatSubscriber> WIP = AtomicIntegerFieldUpdater.newUpdater(ConcatSubscriber.class, "wip");
8990

90-
// accessed by REQUESTED_UPDATER
91+
// accessed by REQUESTED
9192
private volatile long requested;
9293
@SuppressWarnings("rawtypes")
93-
private static final AtomicLongFieldUpdater<ConcatSubscriber> REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcatSubscriber.class, "requested");
94+
private static final AtomicLongFieldUpdater<ConcatSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(ConcatSubscriber.class, "requested");
95+
private final ProducerArbiter arbiter;
9496

9597
public ConcatSubscriber(Subscriber<T> s, SerialSubscription current) {
9698
super(s);
9799
this.child = s;
98100
this.current = current;
101+
this.arbiter = new ProducerArbiter();
99102
this.queue = new ConcurrentLinkedQueue<Object>();
100103
add(Subscriptions.create(new Action0() {
101104
@Override
@@ -113,32 +116,27 @@ public void onStart() {
113116
}
114117

115118
private void requestFromChild(long n) {
119+
if (n <=0) return;
116120
// we track 'requested' so we know whether we should subscribe the next or not
117-
ConcatInnerSubscriber<T> actualSubscriber = currentSubscriber;
118-
if (n > 0 && BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER, this, n) == 0) {
119-
if (actualSubscriber == null && wip > 0) {
121+
long previous = BackpressureUtils.getAndAddRequest(REQUESTED, this, n);
122+
arbiter.request(n);
123+
if (previous == 0) {
124+
if (currentSubscriber == null && wip > 0) {
120125
// this means we may be moving from one subscriber to another after having stopped processing
121126
// so need to kick off the subscribe via this request notification
122127
subscribeNext();
123-
// return here as we don't want to do the requestMore logic below (which would double request)
124-
return;
125128
}
126129
}
127-
128-
if (actualSubscriber != null) {
129-
// otherwise we are just passing it through to the currentSubscriber
130-
actualSubscriber.requestMore(n);
131-
}
132130
}
133131

134132
private void decrementRequested() {
135-
REQUESTED_UPDATER.decrementAndGet(this);
133+
REQUESTED.decrementAndGet(this);
136134
}
137135

138136
@Override
139137
public void onNext(Observable<? extends T> t) {
140138
queue.add(nl.next(t));
141-
if (WIP_UPDATER.getAndIncrement(this) == 0) {
139+
if (WIP.getAndIncrement(this) == 0) {
142140
subscribeNext();
143141
}
144142
}
@@ -152,14 +150,15 @@ public void onError(Throwable e) {
152150
@Override
153151
public void onCompleted() {
154152
queue.add(nl.completed());
155-
if (WIP_UPDATER.getAndIncrement(this) == 0) {
153+
if (WIP.getAndIncrement(this) == 0) {
156154
subscribeNext();
157155
}
158156
}
157+
159158

160159
void completeInner() {
161160
currentSubscriber = null;
162-
if (WIP_UPDATER.decrementAndGet(this) > 0) {
161+
if (WIP.decrementAndGet(this) > 0) {
163162
subscribeNext();
164163
}
165164
request(1);
@@ -172,7 +171,7 @@ void subscribeNext() {
172171
child.onCompleted();
173172
} else if (o != null) {
174173
Observable<? extends T> obs = nl.getValue(o);
175-
currentSubscriber = new ConcatInnerSubscriber<T>(this, child, requested);
174+
currentSubscriber = new ConcatInnerSubscriber<T>(this, child, arbiter);
176175
current.set(currentSubscriber);
177176
obs.unsafeSubscribe(currentSubscriber);
178177
}
@@ -193,39 +192,42 @@ static class ConcatInnerSubscriber<T> extends Subscriber<T> {
193192
@SuppressWarnings("unused")
194193
private volatile int once = 0;
195194
@SuppressWarnings("rawtypes")
196-
private final static AtomicIntegerFieldUpdater<ConcatInnerSubscriber> ONCE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ConcatInnerSubscriber.class, "once");
195+
private final static AtomicIntegerFieldUpdater<ConcatInnerSubscriber> ONCE = AtomicIntegerFieldUpdater.newUpdater(ConcatInnerSubscriber.class, "once");
196+
private final ProducerArbiter arbiter;
197197

198-
public ConcatInnerSubscriber(ConcatSubscriber<T> parent, Subscriber<T> child, long initialRequest) {
198+
public ConcatInnerSubscriber(ConcatSubscriber<T> parent, Subscriber<T> child, ProducerArbiter arbiter) {
199199
this.parent = parent;
200200
this.child = child;
201-
request(initialRequest);
202-
}
203-
204-
void requestMore(long n) {
205-
request(n);
201+
this.arbiter = arbiter;
206202
}
207-
203+
208204
@Override
209205
public void onNext(T t) {
210-
parent.decrementRequested();
211206
child.onNext(t);
207+
parent.decrementRequested();
208+
arbiter.produced(1);
212209
}
213210

214211
@Override
215212
public void onError(Throwable e) {
216-
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
213+
if (ONCE.compareAndSet(this, 0, 1)) {
217214
// terminal error through parent so everything gets cleaned up, including this inner
218215
parent.onError(e);
219216
}
220217
}
221218

222219
@Override
223220
public void onCompleted() {
224-
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
221+
if (ONCE.compareAndSet(this, 0, 1)) {
225222
// terminal completion to parent so it continues to the next
226223
parent.completeInner();
227224
}
228225
}
226+
227+
@Override
228+
public void setProducer(Producer producer) {
229+
arbiter.setProducer(producer);
230+
}
229231

230232
};
231233
}

src/main/java/rx/internal/producers/ProducerArbiter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public void produced(long n) {
9595
if (r != Long.MAX_VALUE) {
9696
long u = r - n;
9797
if (u < 0) {
98-
throw new IllegalStateException();
98+
throw new IllegalStateException("more items arrived than were requested");
9999
}
100100
requested = u;
101101
}

src/test/java/rx/internal/operators/OperatorConcatTest.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,7 @@
3636
import org.mockito.InOrder;
3737

3838
import rx.Observable.OnSubscribe;
39-
import rx.Scheduler.Worker;
4039
import rx.*;
41-
import rx.functions.Action0;
4240
import rx.functions.Func1;
4341
import rx.internal.util.RxRingBuffer;
4442
import rx.observers.TestSubscriber;
@@ -795,4 +793,33 @@ public void onNext(Integer t) {
795793
assertTrue(completed.get());
796794
}
797795

796+
@Test//(timeout = 100000)
797+
public void concatMapRangeAsyncLoopIssue2876() {
798+
final long durationSeconds = 2;
799+
final long startTime = System.currentTimeMillis();
800+
for (int i = 0;; i++) {
801+
//only run this for a max of ten seconds
802+
if (System.currentTimeMillis()-startTime > TimeUnit.SECONDS.toMillis(durationSeconds))
803+
return;
804+
if (i % 1000 == 0) {
805+
System.out.println("concatMapRangeAsyncLoop > " + i);
806+
}
807+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
808+
Observable.range(0, 1000)
809+
.concatMap(new Func1<Integer, Observable<Integer>>() {
810+
@Override
811+
public Observable<Integer> call(Integer t) {
812+
return Observable.from(Arrays.asList(t));
813+
}
814+
})
815+
.observeOn(Schedulers.computation()).subscribe(ts);
816+
817+
ts.awaitTerminalEvent(2500, TimeUnit.MILLISECONDS);
818+
ts.assertTerminalEvent();
819+
ts.assertNoErrors();
820+
assertEquals(1000, ts.getOnNextEvents().size());
821+
assertEquals((Integer)999, ts.getOnNextEvents().get(999));
822+
}
823+
}
824+
798825
}

0 commit comments

Comments
 (0)