Skip to content

Commit 5153f1a

Browse files
committed
use new pollQueue from @akarnokd
1 parent c627e6c commit 5153f1a

File tree

1 file changed

+20
-40
lines changed

1 file changed

+20
-40
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 20 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -182,62 +182,42 @@ protected void schedule() {
182182
void pollQueue() {
183183
int emitted = 0;
184184
do {
185-
/*
186-
* Set to 1 otherwise it could have grown very large while in the last poll loop
187-
* and then we can end up looping all those times again here before exiting even once we've drained
188-
*/
189185
counter = 1;
190-
191-
while (!scheduledUnsubscribe.isUnsubscribed()) {
186+
long produced = 0;
187+
long r = requested;
188+
while (!child.isUnsubscribed()) {
189+
Throwable error;
192190
if (finished) {
193-
// only read volatile error once
194-
Throwable err = error;
195-
if (err != null) {
196-
// clear the queue to enable gc
191+
if ((error = this.error) != null) {
192+
// errors shortcut the queue so
193+
// release the elements in the queue for gc
197194
queue.clear();
198-
// even if there are onNext in the queue we eagerly notify of error
199-
child.onError(err);
195+
child.onError(error);
200196
return;
201-
} else if (queue.isEmpty()) {
197+
} else
198+
if (queue.isEmpty()) {
202199
child.onCompleted();
203200
return;
204201
}
205202
}
206-
if (REQUESTED.getAndDecrement(this) != 0) {
203+
if (r > 0) {
207204
Object o = queue.poll();
208-
if (o == null) {
209-
// nothing in queue (but be careful, something could be added concurrently right now)
210-
if (finished) {
211-
// only read volatile error once
212-
Throwable err = error;
213-
if (err != null) {
214-
// clear the queue to enable gc
215-
queue.clear();
216-
// even if there are onNext in the queue we eagerly notify of error
217-
child.onError(err);
218-
return;
219-
} else if (queue.isEmpty()) {
220-
child.onCompleted();
221-
return;
222-
}
223-
}
224-
BackpressureUtils.getAndAddRequest(REQUESTED, this, 1);
225-
break;
205+
if (o != null) {
206+
child.onNext(on.getValue(o));
207+
r--;
208+
emitted++;
209+
produced++;
226210
} else {
227-
if (!on.accept(child, o)) {
228-
// non-terminal event so let's increment count
229-
emitted++;
230-
}
211+
break;
231212
}
232213
} else {
233-
// we hit the end ... so increment back to 0 again
234-
BackpressureUtils.getAndAddRequest(REQUESTED, this, 1);
235214
break;
236215
}
237216
}
217+
if (produced > 0) {
218+
REQUESTED.addAndGet(this, -produced);
219+
}
238220
} while (COUNTER_UPDATER.decrementAndGet(this) > 0);
239-
240-
// request the number of items that we emitted in this poll loop
241221
if (emitted > 0) {
242222
request(emitted);
243223
}

0 commit comments

Comments
 (0)