19
19
import static org .junit .Assert .assertEquals ;
20
20
21
21
import java .io .IOException ;
22
- import java .net .SocketException ;
23
22
import java .time .Duration ;
24
23
import java .util .ArrayList ;
25
24
import java .util .Iterator ;
26
25
import java .util .List ;
27
26
import java .util .Queue ;
28
27
import java .util .concurrent .CancellationException ;
29
28
import java .util .concurrent .ConcurrentLinkedQueue ;
30
- import java .util .concurrent .Semaphore ;
29
+ import java .util .concurrent .TimeoutException ;
31
30
import java .util .function .BiConsumer ;
32
31
import java .util .function .Consumer ;
33
32
import java .util .stream .Collectors ;
36
35
import org .mockito .Mockito ;
37
36
import org .reactivestreams .Subscription ;
38
37
import reactor .core .CoreSubscriber ;
38
+ import reactor .core .Exceptions ;
39
39
import reactor .core .Scannable ;
40
40
import reactor .core .publisher .Hooks ;
41
41
import reactor .core .publisher .Mono ;
42
42
import reactor .core .publisher .MonoProcessor ;
43
43
import reactor .core .scheduler .Schedulers ;
44
- import reactor .retry .Retry ;
45
- import reactor .retry .RetryContext ;
46
- import reactor .retry .RetryExhaustedException ;
47
44
import reactor .test .StepVerifier ;
48
45
import reactor .test .publisher .TestPublisher ;
49
46
import reactor .test .util .RaceTestUtils ;
50
47
import reactor .util .function .Tuple2 ;
51
48
import reactor .util .function .Tuples ;
49
+ import reactor .util .retry .Retry ;
52
50
53
51
public class ReconnectMonoTests {
54
52
55
- private Queue <RetryContext <?>> retries = new ConcurrentLinkedQueue <>();
56
- private Queue <Tuple2 <Object , ReconnectMono .Invalidatable >> received =
57
- new ConcurrentLinkedQueue <>();
53
+ private Queue <Retry .RetrySignal > retries = new ConcurrentLinkedQueue <>();
54
+ private Queue <Tuple2 <Object , Invalidatable >> received = new ConcurrentLinkedQueue <>();
58
55
private Queue <Object > expired = new ConcurrentLinkedQueue <>();
59
56
60
57
@ Test
@@ -407,11 +404,7 @@ public void shouldExpireValueOnRacingDisposeAndErrorWithNoBackoff() {
407
404
408
405
final ReconnectMono <String > reconnectMono =
409
406
cold .mono ()
410
- .retryWhen (
411
- Retry .anyOf (Exception .class )
412
- .retryMax (1 )
413
- .withBackoffScheduler (Schedulers .immediate ())
414
- .noBackoff ())
407
+ .retryWhen (Retry .max (1 ).filter (t -> t instanceof Exception ))
415
408
.as (source -> new ReconnectMono <>(source , onExpire (), onValue ()));
416
409
417
410
final MonoProcessor <String > processor = reconnectMono .subscribeWith (MonoProcessor .create ());
@@ -433,7 +426,7 @@ public void shouldExpireValueOnRacingDisposeAndErrorWithNoBackoff() {
433
426
.hasMessage ("ReconnectMono has already been disposed" );
434
427
} else {
435
428
Assertions .assertThat (processor .getError ())
436
- .isInstanceOf ( RetryExhaustedException . class )
429
+ .matches ( t -> Exceptions . isRetryExhausted ( t ) )
437
430
.hasCause (runtimeException );
438
431
}
439
432
@@ -772,15 +765,15 @@ public void shouldTimeoutRetryWithVirtualTime() {
772
765
() ->
773
766
Mono .<String >error (new RuntimeException ("Something went wrong" ))
774
767
.retryWhen (
775
- Retry .anyOf ( Exception . class )
776
- .exponentialBackoffWithJitter (
777
- Duration . ofSeconds ( minBackoff ), Duration .ofSeconds (maxBackoff ))
778
- .timeout (Duration .ofSeconds (timeout ) ))
768
+ Retry .backoff ( Long . MAX_VALUE , Duration . ofSeconds ( minBackoff ) )
769
+ .doAfterRetry ( onRetry ())
770
+ . maxBackoff ( Duration .ofSeconds (maxBackoff ) ))
771
+ .timeout (Duration .ofSeconds (timeout ))
779
772
.as (m -> new ReconnectMono <>(m , onExpire (), onValue ()))
780
773
.subscribeOn (Schedulers .elastic ()))
781
774
.expectSubscription ()
782
775
.thenAwait (Duration .ofSeconds (timeout ))
783
- .expectError (RetryExhaustedException .class )
776
+ .expectError (TimeoutException .class )
784
777
.verify (Duration .ofSeconds (timeout ));
785
778
786
779
Assertions .assertThat (received ).isEmpty ();
@@ -791,12 +784,11 @@ public void shouldTimeoutRetryWithVirtualTime() {
791
784
public void monoRetryNoBackoff () {
792
785
Mono <?> mono =
793
786
Mono .error (new IOException ())
794
- .retryWhen (Retry .any (). noBackoff (). retryMax ( 2 ).doOnRetry (onRetry ()))
787
+ .retryWhen (Retry .max ( 2 ).doAfterRetry (onRetry ()))
795
788
.as (m -> new ReconnectMono <>(m , onExpire (), onValue ()));
796
789
797
- StepVerifier .create (mono ).verifyError ( RetryExhaustedException . class );
790
+ StepVerifier .create (mono ).verifyErrorMatches ( Exceptions :: isRetryExhausted );
798
791
assertRetries (IOException .class , IOException .class );
799
- RetryTestUtils .assertDelays (retries , 0L , 0L );
800
792
801
793
Assertions .assertThat (received ).isEmpty ();
802
794
Assertions .assertThat (expired ).isEmpty ();
@@ -806,18 +798,16 @@ public void monoRetryNoBackoff() {
806
798
public void monoRetryFixedBackoff () {
807
799
Mono <?> mono =
808
800
Mono .error (new IOException ())
809
- .retryWhen (
810
- Retry .any ().fixedBackoff (Duration .ofMillis (500 )).retryOnce ().doOnRetry (onRetry ()))
801
+ .retryWhen (Retry .fixedDelay (1 , Duration .ofMillis (500 )).doAfterRetry (onRetry ()))
811
802
.as (m -> new ReconnectMono <>(m , onExpire (), onValue ()));
812
803
813
804
StepVerifier .withVirtualTime (() -> mono )
814
805
.expectSubscription ()
815
806
.expectNoEvent (Duration .ofMillis (300 ))
816
807
.thenAwait (Duration .ofMillis (300 ))
817
- .verifyError ( RetryExhaustedException . class );
808
+ .verifyErrorMatches ( Exceptions :: isRetryExhausted );
818
809
819
810
assertRetries (IOException .class );
820
- RetryTestUtils .assertDelays (retries , 500L );
821
811
822
812
Assertions .assertThat (received ).isEmpty ();
823
813
Assertions .assertThat (expired ).isEmpty ();
@@ -828,10 +818,9 @@ public void monoRetryExponentialBackoff() {
828
818
Mono <?> mono =
829
819
Mono .error (new IOException ())
830
820
.retryWhen (
831
- Retry .any ()
832
- .exponentialBackoff (Duration .ofMillis (100 ), Duration .ofMillis (500 ))
833
- .retryMax (4 )
834
- .doOnRetry (onRetry ()))
821
+ Retry .backoff (4 , Duration .ofMillis (100 ))
822
+ .maxBackoff (Duration .ofMillis (500 ))
823
+ .doAfterRetry (onRetry ()))
835
824
.as (m -> new ReconnectMono <>(m , onExpire (), onValue ()));
836
825
837
826
StepVerifier .withVirtualTime (() -> mono )
@@ -840,74 +829,19 @@ public void monoRetryExponentialBackoff() {
840
829
.thenAwait (Duration .ofMillis (200 ))
841
830
.thenAwait (Duration .ofMillis (400 ))
842
831
.thenAwait (Duration .ofMillis (500 ))
843
- .verifyError ( RetryExhaustedException . class );
832
+ .verifyErrorMatches ( Exceptions :: isRetryExhausted );
844
833
845
834
assertRetries (IOException .class , IOException .class , IOException .class , IOException .class );
846
- RetryTestUtils .assertDelays (retries , 100L , 200L , 400L , 500L );
847
-
848
- Assertions .assertThat (received ).isEmpty ();
849
- Assertions .assertThat (expired ).isEmpty ();
850
- }
851
-
852
- @ Test
853
- public void monoRetryRandomBackoff () {
854
- Mono <?> mono =
855
- Mono .error (new IOException ())
856
- .retryWhen (
857
- Retry .any ()
858
- .randomBackoff (Duration .ofMillis (100 ), Duration .ofMillis (2000 ))
859
- .retryMax (4 )
860
- .doOnRetry (onRetry ()))
861
- .as (m -> new ReconnectMono <>(m , onExpire (), onValue ()));
862
-
863
- StepVerifier .withVirtualTime (() -> mono )
864
- .expectSubscription ()
865
- .thenAwait (Duration .ofMillis (100 ))
866
- .thenAwait (Duration .ofMillis (2000 ))
867
- .thenAwait (Duration .ofMillis (2000 ))
868
- .thenAwait (Duration .ofMillis (2000 ))
869
- .verifyError (RetryExhaustedException .class );
870
-
871
- assertRetries (IOException .class , IOException .class , IOException .class , IOException .class );
872
- RetryTestUtils .assertRandomDelays (retries , 100 , 2000 );
873
-
874
- Assertions .assertThat (received ).isEmpty ();
875
- Assertions .assertThat (expired ).isEmpty ();
876
- }
877
-
878
- @ Test
879
- public void doOnRetry () {
880
- Semaphore semaphore = new Semaphore (0 );
881
- Retry <?> retry =
882
- Retry .any ()
883
- .retryOnce ()
884
- .fixedBackoff (Duration .ofMillis (500 ))
885
- .doOnRetry (context -> semaphore .release ());
886
-
887
- StepVerifier .withVirtualTime (
888
- () ->
889
- Mono .<Integer >error (new SocketException ())
890
- .retryWhen (retry )
891
- .as (m -> new ReconnectMono <>(m , onExpire (), onValue ())))
892
- .then (() -> semaphore .acquireUninterruptibly ())
893
- .expectNoEvent (Duration .ofMillis (400 ))
894
- .thenAwait (Duration .ofMillis (200 ))
895
- .verifyErrorMatches (e -> isRetryExhausted (e , SocketException .class ));
896
-
897
- StepVerifier .withVirtualTime (
898
- () -> Mono .error (new SocketException ()).retryWhen (retry .noBackoff ()))
899
- .then (() -> semaphore .acquireUninterruptibly ())
900
- .verifyErrorMatches (e -> isRetryExhausted (e , SocketException .class ));
901
835
902
836
Assertions .assertThat (received ).isEmpty ();
903
837
Assertions .assertThat (expired ).isEmpty ();
904
838
}
905
839
906
- Consumer <? super RetryContext <?> > onRetry () {
840
+ Consumer <Retry . RetrySignal > onRetry () {
907
841
return context -> retries .add (context );
908
842
}
909
843
910
- <T > BiConsumer <T , ReconnectMono . Invalidatable > onValue () {
844
+ <T > BiConsumer <T , Invalidatable > onValue () {
911
845
return (v , __ ) -> received .add (Tuples .of (v , __ ));
912
846
}
913
847
@@ -919,15 +853,15 @@ <T> Consumer<T> onExpire() {
919
853
private final void assertRetries (Class <? extends Throwable >... exceptions ) {
920
854
assertEquals (exceptions .length , retries .size ());
921
855
int index = 0 ;
922
- for (Iterator <RetryContext <?> > it = retries .iterator (); it .hasNext (); ) {
923
- RetryContext <?> retryContext = it .next ();
924
- assertEquals (index + 1 , retryContext .iteration ());
925
- assertEquals (exceptions [index ], retryContext .exception ().getClass ());
856
+ for (Iterator <Retry . RetrySignal > it = retries .iterator (); it .hasNext (); ) {
857
+ Retry . RetrySignal retryContext = it .next ();
858
+ assertEquals (index , retryContext .totalRetries ());
859
+ assertEquals (exceptions [index ], retryContext .failure ().getClass ());
926
860
index ++;
927
861
}
928
862
}
929
863
930
864
static boolean isRetryExhausted (Throwable e , Class <? extends Throwable > cause ) {
931
- return e instanceof RetryExhaustedException && cause .isInstance (e .getCause ());
865
+ return Exceptions . isRetryExhausted ( e ) && cause .isInstance (e .getCause ());
932
866
}
933
867
}
0 commit comments