|
15 | 15 | */
|
16 | 16 | package rx.internal.operators;
|
17 | 17 |
|
18 |
| -import static org.junit.Assert.assertEquals; |
19 |
| -import static org.junit.Assert.fail; |
| 18 | +import static org.junit.Assert.*; |
20 | 19 | import static org.mockito.Matchers.any;
|
21 |
| -import static org.mockito.Mockito.doThrow; |
22 |
| -import static org.mockito.Mockito.inOrder; |
23 |
| -import static org.mockito.Mockito.mock; |
24 |
| -import static org.mockito.Mockito.never; |
25 |
| -import static org.mockito.Mockito.times; |
| 20 | +import static org.mockito.Mockito.*; |
| 21 | + |
| 22 | +import java.util.concurrent.*; |
| 23 | +import java.util.concurrent.atomic.*; |
26 | 24 |
|
27 | 25 | import org.junit.Test;
|
28 |
| -import org.mockito.InOrder; |
29 |
| -import org.mockito.Mockito; |
| 26 | +import org.mockito.*; |
30 | 27 |
|
31 |
| -import rx.Observable; |
| 28 | +import rx.*; |
32 | 29 | import rx.Observable.OnSubscribe;
|
33 |
| -import rx.Observer; |
34 |
| -import rx.Subscriber; |
35 |
| -import rx.Subscription; |
36 |
| -import rx.functions.Action0; |
37 |
| -import rx.functions.Action1; |
38 |
| -import rx.functions.Func1; |
39 |
| -import rx.functions.Func2; |
| 30 | +import rx.functions.*; |
40 | 31 | import rx.internal.util.RxRingBuffer;
|
41 | 32 | import rx.observables.GroupedObservable;
|
42 | 33 | import rx.observers.TestSubscriber;
|
43 | 34 | import rx.schedulers.Schedulers;
|
44 | 35 | import rx.subjects.PublishSubject;
|
45 | 36 | import rx.subscriptions.Subscriptions;
|
46 | 37 |
|
47 |
| -import java.util.concurrent.CountDownLatch; |
48 |
| -import java.util.concurrent.TimeUnit; |
49 |
| -import java.util.concurrent.atomic.AtomicBoolean; |
50 |
| -import java.util.concurrent.atomic.AtomicInteger; |
51 |
| - |
52 | 38 | public class OperatorRetryTest {
|
53 | 39 |
|
54 | 40 | @Test
|
@@ -403,18 +389,38 @@ public static class FuncWithErrors implements Observable.OnSubscribe<String> {
|
403 | 389 | }
|
404 | 390 |
|
405 | 391 | @Override
|
406 |
| - public void call(Subscriber<? super String> o) { |
407 |
| - o.onNext("beginningEveryTime"); |
408 |
| - if (count.getAndIncrement() < numFailures) { |
409 |
| - System.out.println("FuncWithErrors @ " + count.get()); |
410 |
| - o.onError(new RuntimeException("forced failure: " + count.get())); |
411 |
| - } else { |
412 |
| - System.out.println("FuncWithErrors @ onSuccessOnly"); |
413 |
| - o.onNext("onSuccessOnly"); |
414 |
| - System.out.println("FuncWithErrors @ onCompleted"); |
415 |
| - o.onCompleted(); |
416 |
| - System.out.println("FuncWithErrors !"); |
417 |
| - } |
| 392 | + public void call(final Subscriber<? super String> o) { |
| 393 | + o.setProducer(new Producer() { |
| 394 | + final AtomicLong req = new AtomicLong(); |
| 395 | + @Override |
| 396 | + public void request(long n) { |
| 397 | + if (n == Long.MAX_VALUE) { |
| 398 | + o.onNext("beginningEveryTime"); |
| 399 | + if (count.getAndIncrement() < numFailures) { |
| 400 | + o.onError(new RuntimeException("forced failure: " + count.get())); |
| 401 | + } else { |
| 402 | + o.onNext("onSuccessOnly"); |
| 403 | + o.onCompleted(); |
| 404 | + } |
| 405 | + return; |
| 406 | + } |
| 407 | + if (n > 0 && req.getAndAdd(1) == 0) { |
| 408 | + int i = count.getAndIncrement(); |
| 409 | + if (i < numFailures) { |
| 410 | + o.onNext("beginningEveryTime"); |
| 411 | + o.onError(new RuntimeException("forced failure: " + count.get())); |
| 412 | + } else |
| 413 | + if (i == numFailures) { |
| 414 | + o.onNext("beginningEveryTime"); |
| 415 | + } else |
| 416 | + if (i > numFailures) { |
| 417 | + o.onNext("onSuccessOnly"); |
| 418 | + o.onCompleted(); |
| 419 | + } |
| 420 | + req.decrementAndGet(); |
| 421 | + } |
| 422 | + } |
| 423 | + }); |
418 | 424 | }
|
419 | 425 | }
|
420 | 426 |
|
@@ -668,26 +674,28 @@ public void testTimeoutWithRetry() {
|
668 | 674 | assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
|
669 | 675 | }
|
670 | 676 |
|
671 |
| - @Test(timeout = 3000) |
| 677 | + @Test(timeout = 10000) |
672 | 678 | public void testRetryWithBackpressure() {
|
673 |
| - @SuppressWarnings("unchecked") |
674 |
| - Observer<String> observer = mock(Observer.class); |
675 |
| - int NUM_RETRIES = RxRingBuffer.SIZE * 2; |
676 |
| - Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES)); |
677 |
| - TestSubscriber<String> ts = new TestSubscriber<String>(observer); |
678 |
| - origin.retry().observeOn(Schedulers.computation()).unsafeSubscribe(ts); |
679 |
| - ts.awaitTerminalEvent(); |
680 |
| - |
681 |
| - InOrder inOrder = inOrder(observer); |
682 |
| - // should show 3 attempts |
683 |
| - inOrder.verify(observer, times(NUM_RETRIES + 1)).onNext("beginningEveryTime"); |
684 |
| - // should have no errors |
685 |
| - inOrder.verify(observer, never()).onError(any(Throwable.class)); |
686 |
| - // should have a single success |
687 |
| - inOrder.verify(observer, times(1)).onNext("onSuccessOnly"); |
688 |
| - // should have a single successful onCompleted |
689 |
| - inOrder.verify(observer, times(1)).onCompleted(); |
690 |
| - inOrder.verifyNoMoreInteractions(); |
| 679 | + for (int i = 0; i < 200; i++) { |
| 680 | + @SuppressWarnings("unchecked") |
| 681 | + Observer<String> observer = mock(Observer.class); |
| 682 | + int NUM_RETRIES = RxRingBuffer.SIZE * 2; |
| 683 | + Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES)); |
| 684 | + TestSubscriber<String> ts = new TestSubscriber<String>(observer); |
| 685 | + origin.retry().observeOn(Schedulers.computation()).unsafeSubscribe(ts); |
| 686 | + ts.awaitTerminalEvent(); |
| 687 | + |
| 688 | + InOrder inOrder = inOrder(observer); |
| 689 | + // should have no errors |
| 690 | + verify(observer, never()).onError(any(Throwable.class)); |
| 691 | + // should show NUM_RETRIES attempts |
| 692 | + inOrder.verify(observer, times(NUM_RETRIES + 1)).onNext("beginningEveryTime"); |
| 693 | + // should have a single success |
| 694 | + inOrder.verify(observer, times(1)).onNext("onSuccessOnly"); |
| 695 | + // should have a single successful onCompleted |
| 696 | + inOrder.verify(observer, times(1)).onCompleted(); |
| 697 | + inOrder.verifyNoMoreInteractions(); |
| 698 | + } |
691 | 699 | }
|
692 | 700 | @Test(timeout = 3000)
|
693 | 701 | public void testIssue1900() throws InterruptedException {
|
|
0 commit comments