Skip to content

Commit 0de98b2

Browse files
committed
fix OperatorObserveOn race condition where onComplete could be emitted despite onError being called
1 parent 2532484 commit 0de98b2

File tree

1 file changed

+37
-34
lines changed

1 file changed

+37
-34
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,15 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
7575
final NotificationLite<T> on = NotificationLite.instance();
7676

7777
final Queue<Object> queue;
78-
volatile boolean completed = false;
79-
volatile boolean failure = false;
78+
79+
// These are the states for the status field. The transitions are
80+
// ACTIVE -> COMPLETED and ACTIVE -> ERRORED
81+
private static final byte ACTIVE = 0;
82+
private static final byte COMPLETED = 1;
83+
private static final byte ERRORED = 2;
84+
85+
// the current status of the incoming stream, possible values listed above
86+
volatile byte status = ACTIVE;
8087

8188
volatile long requested = 0;
8289
@SuppressWarnings("rawtypes")
@@ -127,7 +134,7 @@ public void onStart() {
127134

128135
@Override
129136
public void onNext(final T t) {
130-
if (isUnsubscribed() || completed) {
137+
if (isUnsubscribed() || status !=ACTIVE) {
131138
return;
132139
}
133140
if (!queue.offer(on.next(t))) {
@@ -139,19 +146,19 @@ public void onNext(final T t) {
139146

140147
@Override
141148
public void onCompleted() {
142-
if (isUnsubscribed() || completed) {
149+
if (isUnsubscribed() || status != ACTIVE) {
143150
return;
144151
}
145152
if (error != null) {
146153
return;
147154
}
148-
completed = true;
155+
status = COMPLETED;
149156
schedule();
150157
}
151158

152159
@Override
153160
public void onError(final Throwable e) {
154-
if (isUnsubscribed() || completed) {
161+
if (isUnsubscribed() || status != ACTIVE) {
155162
return;
156163
}
157164
if (error != null) {
@@ -161,8 +168,7 @@ public void onError(final Throwable e) {
161168
// unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event
162169
unsubscribe();
163170
// mark failure so the polling thread will skip onNext still in the queue
164-
completed = true;
165-
failure = true;
171+
status = ERRORED;
166172
schedule();
167173
}
168174

@@ -193,39 +199,36 @@ void pollQueue() {
193199

194200
// middle:
195201
while (!scheduledUnsubscribe.isUnsubscribed()) {
196-
if (failure) {
202+
if (status == ERRORED) {
197203
child.onError(error);
198204
return;
199-
} else {
200-
if (requested == 0 && completed && queue.isEmpty()) {
201-
child.onCompleted();
202-
return;
203-
}
204-
if (REQUESTED.getAndDecrement(this) != 0) {
205-
Object o = queue.poll();
206-
if (o == null) {
207-
if (completed) {
208-
if (failure) {
209-
child.onError(error);
210-
} else {
211-
child.onCompleted();
212-
}
213-
return;
214-
}
215-
// nothing in queue
205+
} else if (requested == 0 && status == COMPLETED && queue.isEmpty()) {
206+
child.onCompleted();
207+
return;
208+
} else if (REQUESTED.getAndDecrement(this) != 0) {
209+
Object o = queue.poll();
210+
if (o == null) {
211+
// nothing in queue
212+
if (status == ERRORED) {
213+
child.onError(error);
214+
return;
215+
} else if (status == COMPLETED) {
216+
child.onCompleted();
217+
return;
218+
} else {
216219
REQUESTED.incrementAndGet(this);
217220
break;
218-
} else {
219-
if (!on.accept(child, o)) {
220-
// non-terminal event so let's increment count
221-
emitted++;
222-
}
223221
}
224222
} else {
225-
// we hit the end ... so increment back to 0 again
226-
REQUESTED.incrementAndGet(this);
227-
break;
223+
if (!on.accept(child, o)) {
224+
// non-terminal event so let's increment count
225+
emitted++;
226+
}
228227
}
228+
} else {
229+
// we hit the end ... so increment back to 0 again
230+
REQUESTED.incrementAndGet(this);
231+
break;
229232
}
230233
}
231234
} while (COUNTER_UPDATER.decrementAndGet(this) > 0);

0 commit comments

Comments
 (0)