Skip to content

Commit f7321d2

Browse files
committed
Merge pull request #3768 from akarnokd/ObserveOnRequestBoundaryCompletionFix
1.x: observeOn - fix in-sequence termination/unsubscription
2 parents d36b626 + e98b20d commit f7321d2

File tree

2 files changed

+34
-11
lines changed

2 files changed

+34
-11
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -183,15 +183,10 @@ public void call() {
183183
// less frequently (usually after each RxRingBuffer.SIZE elements)
184184

185185
for (;;) {
186-
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
187-
return;
188-
}
189-
190186
long requestAmount = requested.get();
191-
boolean unbounded = requestAmount == Long.MAX_VALUE;
192187
long currentEmission = 0L;
193188

194-
while (requestAmount != 0L) {
189+
while (requestAmount != currentEmission) {
195190
boolean done = finished;
196191
Object v = q.poll();
197192
boolean empty = v == null;
@@ -205,14 +200,19 @@ public void call() {
205200
}
206201

207202
localChild.onNext(localOn.getValue(v));
208-
209-
requestAmount--;
210-
currentEmission--;
203+
204+
currentEmission++;
211205
emitted++;
212206
}
213207

214-
if (currentEmission != 0L && !unbounded) {
215-
requested.addAndGet(currentEmission);
208+
if (requestAmount == currentEmission) {
209+
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
210+
return;
211+
}
212+
}
213+
214+
if (currentEmission != 0L) {
215+
BackpressureUtils.produced(requested, currentEmission);
216216
}
217217

218218
missed = counter.addAndGet(-missed);

src/test/java/rx/internal/operators/OperatorObserveOnTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -834,4 +834,27 @@ public void testErrorDelayedAsync() {
834834
ts.assertError(TestException.class);
835835
ts.assertNotCompleted();
836836
}
837+
838+
@Test
839+
public void requestExactCompletesImmediately() {
840+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
841+
842+
TestScheduler test = Schedulers.test();
843+
844+
Observable.range(1, 10).observeOn(test).subscribe(ts);
845+
846+
test.advanceTimeBy(1, TimeUnit.SECONDS);
847+
848+
ts.assertNoValues();
849+
ts.assertNoErrors();
850+
ts.assertNotCompleted();
851+
852+
ts.requestMore(10);
853+
854+
test.advanceTimeBy(1, TimeUnit.SECONDS);
855+
856+
ts.assertValueCount(10);
857+
ts.assertNoErrors();
858+
ts.assertCompleted();
859+
}
837860
}

0 commit comments

Comments
 (0)