Skip to content

Commit 9b40386

Browse files
committed
fix OperatorObserveOn race condition where onComplete could be emitted despite onError being called
1 parent 2532484 commit 9b40386

File tree

1 file changed

+37
-41
lines changed

1 file changed

+37
-41
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 37 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,15 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
7575
final NotificationLite<T> on = NotificationLite.instance();
7676

7777
final Queue<Object> queue;
78-
volatile boolean completed = false;
79-
volatile boolean failure = false;
78+
79+
// These are the states for the status field. The transitions are
80+
// ACTIVE -> COMPLETED and ACTIVE -> ERRORED
81+
private static final byte ACTIVE = 0;
82+
private static final byte COMPLETED = 1;
83+
private static final byte ERRORED = 2;
84+
85+
// the current status of the incoming stream, possible values listed above
86+
volatile byte status = ACTIVE;
8087

8188
volatile long requested = 0;
8289
@SuppressWarnings("rawtypes")
@@ -127,7 +134,7 @@ public void onStart() {
127134

128135
@Override
129136
public void onNext(final T t) {
130-
if (isUnsubscribed() || completed) {
137+
if (isUnsubscribed() || status !=ACTIVE) {
131138
return;
132139
}
133140
if (!queue.offer(on.next(t))) {
@@ -139,30 +146,23 @@ public void onNext(final T t) {
139146

140147
@Override
141148
public void onCompleted() {
142-
if (isUnsubscribed() || completed) {
143-
return;
144-
}
145-
if (error != null) {
149+
if (isUnsubscribed() || status != ACTIVE) {
146150
return;
147151
}
148-
completed = true;
152+
status = COMPLETED;
149153
schedule();
150154
}
151155

152156
@Override
153157
public void onError(final Throwable e) {
154-
if (isUnsubscribed() || completed) {
155-
return;
156-
}
157-
if (error != null) {
158+
if (isUnsubscribed() || status != ACTIVE) {
158159
return;
159160
}
160161
error = e;
161162
// unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event
162163
unsubscribe();
163164
// mark failure so the polling thread will skip onNext still in the queue
164-
completed = true;
165-
failure = true;
165+
status = ERRORED;
166166
schedule();
167167
}
168168

@@ -191,41 +191,37 @@ void pollQueue() {
191191
*/
192192
counter = 1;
193193

194-
// middle:
195194
while (!scheduledUnsubscribe.isUnsubscribed()) {
196-
if (failure) {
195+
if (status == ERRORED) {
197196
child.onError(error);
198197
return;
199-
} else {
200-
if (requested == 0 && completed && queue.isEmpty()) {
201-
child.onCompleted();
202-
return;
203-
}
204-
if (REQUESTED.getAndDecrement(this) != 0) {
205-
Object o = queue.poll();
206-
if (o == null) {
207-
if (completed) {
208-
if (failure) {
209-
child.onError(error);
210-
} else {
211-
child.onCompleted();
212-
}
213-
return;
214-
}
215-
// nothing in queue
198+
} else if (requested == 0 && status == COMPLETED && queue.isEmpty()) {
199+
child.onCompleted();
200+
return;
201+
} else if (REQUESTED.getAndDecrement(this) != 0) {
202+
Object o = queue.poll();
203+
if (o == null) {
204+
// nothing in queue
205+
if (status == ERRORED) {
206+
child.onError(error);
207+
return;
208+
} else if (status == COMPLETED) {
209+
child.onCompleted();
210+
return;
211+
} else {
216212
REQUESTED.incrementAndGet(this);
217213
break;
218-
} else {
219-
if (!on.accept(child, o)) {
220-
// non-terminal event so let's increment count
221-
emitted++;
222-
}
223214
}
224215
} else {
225-
// we hit the end ... so increment back to 0 again
226-
REQUESTED.incrementAndGet(this);
227-
break;
216+
if (!on.accept(child, o)) {
217+
// non-terminal event so let's increment count
218+
emitted++;
219+
}
228220
}
221+
} else {
222+
// we hit the end ... so increment back to 0 again
223+
REQUESTED.incrementAndGet(this);
224+
break;
229225
}
230226
}
231227
} while (COUNTER_UPDATER.decrementAndGet(this) > 0);

0 commit comments

Comments
 (0)