Skip to content

Commit 099da59

Browse files
committed
fix OperatorObserveOn race condition where onComplete could be emitted despite onError being called
1 parent 2532484 commit 099da59

File tree

2 files changed

+68
-40
lines changed

2 files changed

+68
-40
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 47 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,19 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
7575
final NotificationLite<T> on = NotificationLite.instance();
7676

7777
final Queue<Object> queue;
78-
volatile boolean completed = false;
79-
volatile boolean failure = false;
78+
79+
// the status of the current stream
80+
volatile boolean finished = false;
8081

82+
@SuppressWarnings("unused")
8183
volatile long requested = 0;
84+
8285
@SuppressWarnings("rawtypes")
8386
static final AtomicLongFieldUpdater<ObserveOnSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "requested");
8487

8588
@SuppressWarnings("unused")
8689
volatile long counter;
90+
8791
@SuppressWarnings("rawtypes")
8892
static final AtomicLongFieldUpdater<ObserveOnSubscriber> COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter");
8993

@@ -127,7 +131,7 @@ public void onStart() {
127131

128132
@Override
129133
public void onNext(final T t) {
130-
if (isUnsubscribed() || completed) {
134+
if (isUnsubscribed()) {
131135
return;
132136
}
133137
if (!queue.offer(on.next(t))) {
@@ -139,30 +143,23 @@ public void onNext(final T t) {
139143

140144
@Override
141145
public void onCompleted() {
142-
if (isUnsubscribed() || completed) {
146+
if (isUnsubscribed() || finished) {
143147
return;
144148
}
145-
if (error != null) {
146-
return;
147-
}
148-
completed = true;
149+
finished = true;
149150
schedule();
150151
}
151152

152153
@Override
153154
public void onError(final Throwable e) {
154-
if (isUnsubscribed() || completed) {
155-
return;
156-
}
157-
if (error != null) {
155+
if (isUnsubscribed() || finished) {
158156
return;
159157
}
160158
error = e;
161159
// unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event
162160
unsubscribe();
163-
// mark failure so the polling thread will skip onNext still in the queue
164-
completed = true;
165-
failure = true;
161+
finished = true;
162+
// polling thread should skip any onNext still in the queue
166163
schedule();
167164
}
168165

@@ -191,41 +188,51 @@ void pollQueue() {
191188
*/
192189
counter = 1;
193190

194-
// middle:
195191
while (!scheduledUnsubscribe.isUnsubscribed()) {
196-
if (failure) {
197-
child.onError(error);
198-
return;
199-
} else {
200-
if (requested == 0 && completed && queue.isEmpty()) {
192+
if (finished) {
193+
// only read volatile error once
194+
Throwable err = error;
195+
if (err != null) {
196+
// clear the queue to enable gc
197+
queue.clear();
198+
// even if there are onNext in the queue we eagerly notify of error
199+
child.onError(err);
200+
return;
201+
} else if (queue.isEmpty()) {
201202
child.onCompleted();
202203
return;
203204
}
204-
if (REQUESTED.getAndDecrement(this) != 0) {
205-
Object o = queue.poll();
206-
if (o == null) {
207-
if (completed) {
208-
if (failure) {
209-
child.onError(error);
210-
} else {
211-
child.onCompleted();
212-
}
205+
}
206+
if (REQUESTED.getAndDecrement(this) != 0) {
207+
Object o = queue.poll();
208+
if (o == null) {
209+
// nothing in queue (but be careful, something could be added concurrently right now)
210+
if (finished) {
211+
// only read volatile error once
212+
Throwable err = error;
213+
if (err != null) {
214+
// clear the queue to enable gc
215+
queue.clear();
216+
// even if there are onNext in the queue we eagerly notify of error
217+
child.onError(err);
218+
return;
219+
} else if (queue.isEmpty()) {
220+
child.onCompleted();
213221
return;
214-
}
215-
// nothing in queue
216-
REQUESTED.incrementAndGet(this);
217-
break;
218-
} else {
219-
if (!on.accept(child, o)) {
220-
// non-terminal event so let's increment count
221-
emitted++;
222222
}
223223
}
224-
} else {
225-
// we hit the end ... so increment back to 0 again
226224
REQUESTED.incrementAndGet(this);
227225
break;
226+
} else {
227+
if (!on.accept(child, o)) {
228+
// non-terminal event so let's increment count
229+
emitted++;
230+
}
228231
}
232+
} else {
233+
// we hit the end ... so increment back to 0 again
234+
REQUESTED.incrementAndGet(this);
235+
break;
229236
}
230237
}
231238
} while (COUNTER_UPDATER.decrementAndGet(this) > 0);

src/perf/java/rx/operators/OperatorObserveOnPerf.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,5 +66,26 @@ public void observeOnImmediate(Input input) throws InterruptedException {
6666
input.observable.observeOn(Schedulers.immediate()).subscribe(o);
6767
o.latch.await();
6868
}
69+
70+
@Benchmark
71+
public void observeOnComputationSubscribedOnComputation(Input input) throws InterruptedException {
72+
LatchedObserver<Integer> o = input.newLatchedObserver();
73+
input.observable.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation()).subscribe(o);
74+
o.latch.await();
75+
}
76+
77+
@Benchmark
78+
public void observeOnNewThreadSubscribedOnComputation(Input input) throws InterruptedException {
79+
LatchedObserver<Integer> o = input.newLatchedObserver();
80+
input.observable.subscribeOn(Schedulers.computation()).observeOn(Schedulers.newThread()).subscribe(o);
81+
o.latch.await();
82+
}
83+
84+
@Benchmark
85+
public void observeOnImmediateSubscribedOnComputation(Input input) throws InterruptedException {
86+
LatchedObserver<Integer> o = input.newLatchedObserver();
87+
input.observable.subscribeOn(Schedulers.computation()).observeOn(Schedulers.immediate()).subscribe(o);
88+
o.latch.await();
89+
}
6990

7091
}

0 commit comments

Comments
 (0)