Skip to content

Commit 1210182

Browse files
committed
Merge pull request #3093 from akarnokd/MergeAsyncTest
Fix request != 0 checking in the scalar paths of merge()
2 parents 14f82de + 0a24f8d commit 1210182

File tree

2 files changed

+28
-2
lines changed

2 files changed

+28
-2
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,8 @@ void tryEmit(InnerSubscriber<T> subscriber, T value) {
315315
if (r != 0L) {
316316
synchronized (this) {
317317
// if nobody is emitting and child has available requests
318-
if (!emitting) {
318+
r = producer.get();
319+
if (!emitting && r != 0L) {
319320
emitting = true;
320321
success = true;
321322
}
@@ -422,7 +423,8 @@ void tryEmit(T value) {
422423
if (r != 0L) {
423424
synchronized (this) {
424425
// if nobody is emitting and child has available requests
425-
if (!emitting) {
426+
r = producer.get();
427+
if (!emitting && r != 0L) {
426428
emitting = true;
427429
success = true;
428430
}

src/test/java/rx/BackpressureTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,30 @@ public void testMergeAsync() {
123123
assertTrue(c2.get() < RxRingBuffer.SIZE * 5);
124124
}
125125

126+
@Test
127+
public void testMergeAsyncThenObserveOnLoop() {
128+
for (int i = 0; i < 500; i++) {
129+
if (i % 10 == 0) {
130+
System.out.println("testMergeAsyncThenObserveOnLoop >> " + i);
131+
}
132+
// Verify there is no MissingBackpressureException
133+
int NUM = (int) (RxRingBuffer.SIZE * 4.1);
134+
AtomicInteger c1 = new AtomicInteger();
135+
AtomicInteger c2 = new AtomicInteger();
136+
137+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
138+
Observable<Integer> merged = Observable.merge(
139+
incrementingIntegers(c1).subscribeOn(Schedulers.computation()),
140+
incrementingIntegers(c2).subscribeOn(Schedulers.computation()));
141+
142+
merged.observeOn(Schedulers.io()).take(NUM).subscribe(ts);
143+
ts.awaitTerminalEvent();
144+
ts.assertNoErrors();
145+
System.out.println("testMergeAsyncThenObserveOn => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c1.get() + " / " + c2.get());
146+
assertEquals(NUM, ts.getOnNextEvents().size());
147+
}
148+
}
149+
126150
@Test
127151
public void testMergeAsyncThenObserveOn() {
128152
int NUM = (int) (RxRingBuffer.SIZE * 4.1);

0 commit comments

Comments
 (0)