Skip to content

Commit 24754cf

Browse files
committed
include observe on fix from ReactiveX#2929
1 parent a7a3dbc commit 24754cf

File tree

1 file changed

+47
-41
lines changed

1 file changed

+47
-41
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 47 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,19 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
7575
final NotificationLite<T> on = NotificationLite.instance();
7676

7777
final Queue<Object> queue;
78-
volatile boolean completed = false;
79-
volatile boolean failure = false;
78+
79+
// the status of the current stream
80+
volatile boolean finished = false;
8081

82+
@SuppressWarnings("unused")
8183
volatile long requested = 0;
84+
8285
@SuppressWarnings("rawtypes")
8386
static final AtomicLongFieldUpdater<ObserveOnSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "requested");
8487

8588
@SuppressWarnings("unused")
8689
volatile long counter;
90+
8791
@SuppressWarnings("rawtypes")
8892
static final AtomicLongFieldUpdater<ObserveOnSubscriber> COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter");
8993

@@ -127,7 +131,7 @@ public void onStart() {
127131

128132
@Override
129133
public void onNext(final T t) {
130-
if (isUnsubscribed() || completed) {
134+
if (isUnsubscribed()) {
131135
return;
132136
}
133137
if (!queue.offer(on.next(t))) {
@@ -139,30 +143,23 @@ public void onNext(final T t) {
139143

140144
@Override
141145
public void onCompleted() {
142-
if (isUnsubscribed() || completed) {
146+
if (isUnsubscribed() || finished) {
143147
return;
144148
}
145-
if (error != null) {
146-
return;
147-
}
148-
completed = true;
149+
finished = true;
149150
schedule();
150151
}
151152

152153
@Override
153154
public void onError(final Throwable e) {
154-
if (isUnsubscribed() || completed) {
155-
return;
156-
}
157-
if (error != null) {
155+
if (isUnsubscribed() || finished) {
158156
return;
159157
}
160158
error = e;
161159
// unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event
162160
unsubscribe();
163-
// mark failure so the polling thread will skip onNext still in the queue
164-
completed = true;
165-
failure = true;
161+
finished = true;
162+
// polling thread should skip any onNext still in the queue
166163
schedule();
167164
}
168165

@@ -191,42 +188,51 @@ void pollQueue() {
191188
*/
192189
counter = 1;
193190

194-
// middle:
195191
while (!scheduledUnsubscribe.isUnsubscribed()) {
196-
if (failure) {
197-
child.onError(error);
198-
return;
199-
} else {
200-
if (requested == 0 && completed && queue.isEmpty()) {
192+
if (finished) {
193+
// only read volatile error once
194+
Throwable err = error;
195+
if (err != null) {
196+
// clear the queue to enable gc
197+
queue.clear();
198+
// even if there are onNext in the queue we eagerly notify of error
199+
child.onError(err);
200+
return;
201+
} else if (queue.isEmpty()) {
201202
child.onCompleted();
202203
return;
203204
}
204-
if (REQUESTED.getAndDecrement(this) != 0) {
205-
boolean c = completed;
206-
Object o = queue.poll();
207-
if (o == null) {
208-
if (c) {
209-
if (failure) {
210-
child.onError(error);
211-
} else {
212-
child.onCompleted();
213-
}
205+
}
206+
if (REQUESTED.getAndDecrement(this) != 0) {
207+
Object o = queue.poll();
208+
if (o == null) {
209+
// nothing in queue (but be careful, something could be added concurrently right now)
210+
if (finished) {
211+
// only read volatile error once
212+
Throwable err = error;
213+
if (err != null) {
214+
// clear the queue to enable gc
215+
queue.clear();
216+
// even if there are onNext in the queue we eagerly notify of error
217+
child.onError(err);
218+
return;
219+
} else if (queue.isEmpty()) {
220+
child.onCompleted();
214221
return;
215-
}
216-
// nothing in queue
217-
REQUESTED.incrementAndGet(this);
218-
break;
219-
} else {
220-
if (!on.accept(child, o)) {
221-
// non-terminal event so let's increment count
222-
emitted++;
223222
}
224223
}
225-
} else {
226-
// we hit the end ... so increment back to 0 again
227224
REQUESTED.incrementAndGet(this);
228225
break;
226+
} else {
227+
if (!on.accept(child, o)) {
228+
// non-terminal event so let's increment count
229+
emitted++;
230+
}
229231
}
232+
} else {
233+
// we hit the end ... so increment back to 0 again
234+
REQUESTED.incrementAndGet(this);
235+
break;
230236
}
231237
}
232238
} while (COUNTER_UPDATER.decrementAndGet(this) > 0);

0 commit comments

Comments
 (0)