Skip to content

Commit 9a5ca7e

Browse files
committed
fix OperatorObserveOn race condition where onComplete could be emitted despite onError being called
1 parent 2532484 commit 9a5ca7e

File tree

2 files changed

+63
-40
lines changed

2 files changed

+63
-40
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 42 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,19 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
7575
final NotificationLite<T> on = NotificationLite.instance();
7676

7777
final Queue<Object> queue;
78-
volatile boolean completed = false;
79-
volatile boolean failure = false;
78+
79+
// the status of the current stream
80+
volatile boolean finished = false;
8081

82+
@SuppressWarnings("unused")
8183
volatile long requested = 0;
84+
8285
@SuppressWarnings("rawtypes")
8386
static final AtomicLongFieldUpdater<ObserveOnSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "requested");
8487

8588
@SuppressWarnings("unused")
8689
volatile long counter;
90+
8791
@SuppressWarnings("rawtypes")
8892
static final AtomicLongFieldUpdater<ObserveOnSubscriber> COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter");
8993

@@ -127,7 +131,7 @@ public void onStart() {
127131

128132
@Override
129133
public void onNext(final T t) {
130-
if (isUnsubscribed() || completed) {
134+
if (isUnsubscribed()) {
131135
return;
132136
}
133137
if (!queue.offer(on.next(t))) {
@@ -139,30 +143,23 @@ public void onNext(final T t) {
139143

140144
@Override
141145
public void onCompleted() {
142-
if (isUnsubscribed() || completed) {
146+
if (isUnsubscribed() || finished) {
143147
return;
144148
}
145-
if (error != null) {
146-
return;
147-
}
148-
completed = true;
149+
finished = true;
149150
schedule();
150151
}
151152

152153
@Override
153154
public void onError(final Throwable e) {
154-
if (isUnsubscribed() || completed) {
155-
return;
156-
}
157-
if (error != null) {
155+
if (isUnsubscribed() || finished) {
158156
return;
159157
}
160158
error = e;
161159
// unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event
162160
unsubscribe();
163-
// mark failure so the polling thread will skip onNext still in the queue
164-
completed = true;
165-
failure = true;
161+
finished = true;
162+
// polling thread should skip any onNext still in the queue
166163
schedule();
167164
}
168165

@@ -191,41 +188,46 @@ void pollQueue() {
191188
*/
192189
counter = 1;
193190

194-
// middle:
195191
while (!scheduledUnsubscribe.isUnsubscribed()) {
196-
if (failure) {
197-
child.onError(error);
198-
return;
199-
} else {
200-
if (requested == 0 && completed && queue.isEmpty()) {
192+
if (finished) {
193+
// only read volatile error once
194+
Throwable err = error;
195+
if (err != null) {
196+
//even if there are onNext in the queue we eagerly notify of error
197+
child.onError(err);
198+
return;
199+
} else if (queue.isEmpty()) {
201200
child.onCompleted();
202201
return;
203202
}
204-
if (REQUESTED.getAndDecrement(this) != 0) {
205-
Object o = queue.poll();
206-
if (o == null) {
207-
if (completed) {
208-
if (failure) {
209-
child.onError(error);
210-
} else {
211-
child.onCompleted();
212-
}
203+
}
204+
if (REQUESTED.getAndDecrement(this) != 0) {
205+
Object o = queue.poll();
206+
if (o == null) {
207+
// nothing in queue (but be careful, something could be added concurrently right now)
208+
if (finished) {
209+
// only read volatile error once
210+
Throwable err = error;
211+
if (err != null) {
212+
child.onError(err);
213+
return;
214+
} else if (queue.isEmpty()) {
215+
child.onCompleted();
213216
return;
214-
}
215-
// nothing in queue
216-
REQUESTED.incrementAndGet(this);
217-
break;
218-
} else {
219-
if (!on.accept(child, o)) {
220-
// non-terminal event so let's increment count
221-
emitted++;
222217
}
223218
}
224-
} else {
225-
// we hit the end ... so increment back to 0 again
226219
REQUESTED.incrementAndGet(this);
227220
break;
221+
} else {
222+
if (!on.accept(child, o)) {
223+
// non-terminal event so let's increment count
224+
emitted++;
225+
}
228226
}
227+
} else {
228+
// we hit the end ... so increment back to 0 again
229+
REQUESTED.incrementAndGet(this);
230+
break;
229231
}
230232
}
231233
} while (COUNTER_UPDATER.decrementAndGet(this) > 0);

src/perf/java/rx/operators/OperatorObserveOnPerf.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,5 +66,26 @@ public void observeOnImmediate(Input input) throws InterruptedException {
6666
input.observable.observeOn(Schedulers.immediate()).subscribe(o);
6767
o.latch.await();
6868
}
69+
70+
@Benchmark
71+
public void observeOnComputationSubscribedOnComputation(Input input) throws InterruptedException {
72+
LatchedObserver<Integer> o = input.newLatchedObserver();
73+
input.observable.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation()).subscribe(o);
74+
o.latch.await();
75+
}
76+
77+
@Benchmark
78+
public void observeOnNewThreadSubscribedOnComputation(Input input) throws InterruptedException {
79+
LatchedObserver<Integer> o = input.newLatchedObserver();
80+
input.observable.subscribeOn(Schedulers.computation()).observeOn(Schedulers.newThread()).subscribe(o);
81+
o.latch.await();
82+
}
83+
84+
@Benchmark
85+
public void observeOnImmediateSubscribedOnComputation(Input input) throws InterruptedException {
86+
LatchedObserver<Integer> o = input.newLatchedObserver();
87+
input.observable.subscribeOn(Schedulers.computation()).observeOn(Schedulers.immediate()).subscribe(o);
88+
o.latch.await();
89+
}
6990

7091
}

0 commit comments

Comments
 (0)