Skip to content

Commit f46d7f9

Browse files
Merge pull request ReactiveX#1912 from akarnokd/RetryWithBackpressureFix
Fixed retry without backpressure & test function to support bp.
2 parents 43456c5 + 158d9d9 commit f46d7f9

File tree

2 files changed

+69
-56
lines changed

2 files changed

+69
-56
lines changed

src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,14 +225,19 @@ public void onError(Throwable e) {
225225

226226
@Override
227227
public void onNext(T v) {
228-
consumerCapacity.decrementAndGet();
228+
if (consumerCapacity.get() != Long.MAX_VALUE) {
229+
consumerCapacity.decrementAndGet();
230+
}
229231
child.onNext(v);
230232
}
231233

232234
@Override
233235
public void setProducer(Producer producer) {
234236
currentProducer.set(producer);
235-
producer.request(consumerCapacity.get());
237+
long c = consumerCapacity.get();
238+
if (c > 0) {
239+
producer.request(c);
240+
}
236241
}
237242
};
238243
// new subscription each time so if it unsubscribes itself it does not prevent retries
@@ -321,7 +326,7 @@ public void request(final long n) {
321326
long c = consumerCapacity.getAndAdd(n);
322327
Producer producer = currentProducer.get();
323328
if (producer != null) {
324-
producer.request(c + n);
329+
producer.request(n);
325330
} else
326331
if (c == 0 && resumeBoundary.compareAndSet(true, false)) {
327332
worker.schedule(subscribeToSource);

src/test/java/rx/internal/operators/OperatorRetryTest.java

Lines changed: 61 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -15,40 +15,26 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.junit.Assert.assertEquals;
19-
import static org.junit.Assert.fail;
18+
import static org.junit.Assert.*;
2019
import static org.mockito.Matchers.any;
21-
import static org.mockito.Mockito.doThrow;
22-
import static org.mockito.Mockito.inOrder;
23-
import static org.mockito.Mockito.mock;
24-
import static org.mockito.Mockito.never;
25-
import static org.mockito.Mockito.times;
20+
import static org.mockito.Mockito.*;
21+
22+
import java.util.concurrent.*;
23+
import java.util.concurrent.atomic.*;
2624

2725
import org.junit.Test;
28-
import org.mockito.InOrder;
29-
import org.mockito.Mockito;
26+
import org.mockito.*;
3027

31-
import rx.Observable;
28+
import rx.*;
3229
import rx.Observable.OnSubscribe;
33-
import rx.Observer;
34-
import rx.Subscriber;
35-
import rx.Subscription;
36-
import rx.functions.Action0;
37-
import rx.functions.Action1;
38-
import rx.functions.Func1;
39-
import rx.functions.Func2;
30+
import rx.functions.*;
4031
import rx.internal.util.RxRingBuffer;
4132
import rx.observables.GroupedObservable;
4233
import rx.observers.TestSubscriber;
4334
import rx.schedulers.Schedulers;
4435
import rx.subjects.PublishSubject;
4536
import rx.subscriptions.Subscriptions;
4637

47-
import java.util.concurrent.CountDownLatch;
48-
import java.util.concurrent.TimeUnit;
49-
import java.util.concurrent.atomic.AtomicBoolean;
50-
import java.util.concurrent.atomic.AtomicInteger;
51-
5238
public class OperatorRetryTest {
5339

5440
@Test
@@ -403,18 +389,38 @@ public static class FuncWithErrors implements Observable.OnSubscribe<String> {
403389
}
404390

405391
@Override
406-
public void call(Subscriber<? super String> o) {
407-
o.onNext("beginningEveryTime");
408-
if (count.getAndIncrement() < numFailures) {
409-
System.out.println("FuncWithErrors @ " + count.get());
410-
o.onError(new RuntimeException("forced failure: " + count.get()));
411-
} else {
412-
System.out.println("FuncWithErrors @ onSuccessOnly");
413-
o.onNext("onSuccessOnly");
414-
System.out.println("FuncWithErrors @ onCompleted");
415-
o.onCompleted();
416-
System.out.println("FuncWithErrors !");
417-
}
392+
public void call(final Subscriber<? super String> o) {
393+
o.setProducer(new Producer() {
394+
final AtomicLong req = new AtomicLong();
395+
@Override
396+
public void request(long n) {
397+
if (n == Long.MAX_VALUE) {
398+
o.onNext("beginningEveryTime");
399+
if (count.getAndIncrement() < numFailures) {
400+
o.onError(new RuntimeException("forced failure: " + count.get()));
401+
} else {
402+
o.onNext("onSuccessOnly");
403+
o.onCompleted();
404+
}
405+
return;
406+
}
407+
if (n > 0 && req.getAndAdd(1) == 0) {
408+
int i = count.getAndIncrement();
409+
if (i < numFailures) {
410+
o.onNext("beginningEveryTime");
411+
o.onError(new RuntimeException("forced failure: " + count.get()));
412+
} else
413+
if (i == numFailures) {
414+
o.onNext("beginningEveryTime");
415+
} else
416+
if (i > numFailures) {
417+
o.onNext("onSuccessOnly");
418+
o.onCompleted();
419+
}
420+
req.decrementAndGet();
421+
}
422+
}
423+
});
418424
}
419425
}
420426

@@ -668,26 +674,28 @@ public void testTimeoutWithRetry() {
668674
assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
669675
}
670676

671-
@Test(timeout = 3000)
677+
@Test(timeout = 10000)
672678
public void testRetryWithBackpressure() {
673-
@SuppressWarnings("unchecked")
674-
Observer<String> observer = mock(Observer.class);
675-
int NUM_RETRIES = RxRingBuffer.SIZE * 2;
676-
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES));
677-
TestSubscriber<String> ts = new TestSubscriber<String>(observer);
678-
origin.retry().observeOn(Schedulers.computation()).unsafeSubscribe(ts);
679-
ts.awaitTerminalEvent();
680-
681-
InOrder inOrder = inOrder(observer);
682-
// should show 3 attempts
683-
inOrder.verify(observer, times(NUM_RETRIES + 1)).onNext("beginningEveryTime");
684-
// should have no errors
685-
inOrder.verify(observer, never()).onError(any(Throwable.class));
686-
// should have a single success
687-
inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
688-
// should have a single successful onCompleted
689-
inOrder.verify(observer, times(1)).onCompleted();
690-
inOrder.verifyNoMoreInteractions();
679+
for (int i = 0; i < 200; i++) {
680+
@SuppressWarnings("unchecked")
681+
Observer<String> observer = mock(Observer.class);
682+
int NUM_RETRIES = RxRingBuffer.SIZE * 2;
683+
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES));
684+
TestSubscriber<String> ts = new TestSubscriber<String>(observer);
685+
origin.retry().observeOn(Schedulers.computation()).unsafeSubscribe(ts);
686+
ts.awaitTerminalEvent();
687+
688+
InOrder inOrder = inOrder(observer);
689+
// should have no errors
690+
verify(observer, never()).onError(any(Throwable.class));
691+
// should show NUM_RETRIES attempts
692+
inOrder.verify(observer, times(NUM_RETRIES + 1)).onNext("beginningEveryTime");
693+
// should have a single success
694+
inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
695+
// should have a single successful onCompleted
696+
inOrder.verify(observer, times(1)).onCompleted();
697+
inOrder.verifyNoMoreInteractions();
698+
}
691699
}
692700
@Test(timeout = 3000)
693701
public void testIssue1900() throws InterruptedException {

0 commit comments

Comments
 (0)