@@ -101,8 +101,6 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor {
101
101
102
102
private static final BiConsumer <RSocket , Invalidatable > INVALIDATE_FUNCTION =
103
103
(r , i ) -> r .onClose ().subscribe (null , null , i ::invalidate );
104
- private static final Function <Flux <Throwable >, ? extends Publisher <?>> FAIL_WHEN_FACTORY =
105
- f -> f .concatMap (Mono ::error );
106
104
107
105
private SocketAcceptor acceptor = (setup , sendingSocket ) -> Mono .just (new AbstractRSocket () {});
108
106
@@ -135,7 +133,7 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor {
135
133
private boolean leaseEnabled ;
136
134
private Supplier <Leases <?>> leasesSupplier = Leases ::new ;
137
135
private boolean reconnectEnabled ;
138
- private Function <Flux <Throwable >, ? extends Publisher <?>> whenFactory = FAIL_WHEN_FACTORY ;
136
+ private Function <Flux <Throwable >, ? extends Publisher <?>> whenFactory ;
139
137
140
138
private ByteBufAllocator allocator = ByteBufAllocator .DEFAULT ;
141
139
@@ -531,7 +529,9 @@ public Mono<RSocket> start() {
531
529
source -> {
532
530
if (reconnectEnabled ) {
533
531
return new ReconnectMono <>(
534
- source .retryWhen (whenFactory ), Disposable ::dispose , INVALIDATE_FUNCTION );
532
+ whenFactory == null ? source : source .retryWhen (whenFactory ),
533
+ Disposable ::dispose ,
534
+ INVALIDATE_FUNCTION );
535
535
} else {
536
536
return source ;
537
537
}
0 commit comments