Skip to content

Commit 72ab2d7

Browse files
committed
refactors to convenient Retry
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent cb9e46f commit 72ab2d7

File tree

4 files changed

+18
-33
lines changed

4 files changed

+18
-33
lines changed

rsocket-core/src/main/java/io/rsocket/Invalidatable.java

Lines changed: 0 additions & 21 deletions
This file was deleted.

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,9 @@
4848
import java.util.function.Consumer;
4949
import java.util.function.Function;
5050
import java.util.function.Supplier;
51-
import org.reactivestreams.Publisher;
5251
import reactor.core.Disposable;
53-
import reactor.core.publisher.Flux;
5452
import reactor.core.publisher.Mono;
53+
import reactor.util.retry.Retry;
5554

5655
/** Factory for creating RSocket clients and servers. */
5756
public class RSocketFactory {
@@ -133,7 +132,7 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor {
133132
private boolean leaseEnabled;
134133
private Supplier<Leases<?>> leasesSupplier = Leases::new;
135134
private boolean reconnectEnabled;
136-
private Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory;
135+
private Retry retrySpec;
137136

138137
private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
139138

@@ -355,9 +354,8 @@ public ClientRSocketFactory reconnect() {
355354
* @param whenFactory a retry factory applied for {@link Mono.retryWhen(whenFactory)}
356355
* @return a shared instance of {@code Mono<RSocket>}.
357356
*/
358-
public ClientRSocketFactory reconnect(
359-
Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory) {
360-
this.whenFactory = Objects.requireNonNull(whenFactory);
357+
public ClientRSocketFactory reconnect(Retry retrySpec) {
358+
this.retrySpec = Objects.requireNonNull(retrySpec);
361359
this.reconnectEnabled = true;
362360
return this;
363361
}
@@ -529,7 +527,7 @@ public Mono<RSocket> start() {
529527
source -> {
530528
if (reconnectEnabled) {
531529
return new ReconnectMono<>(
532-
whenFactory == null ? source : source.retryWhen(whenFactory),
530+
retrySpec == null ? source : source.retryWhen(retrySpec),
533531
Disposable::dispose,
534532
INVALIDATE_FUNCTION);
535533
} else {
@@ -841,9 +839,11 @@ public Mono<T> start() {
841839

842840
@Override
843841
public Mono<T> get() {
844-
return transportServer
845-
.get()
846-
.start(duplexConnection -> acceptor(serverSetup, duplexConnection), mtu)
842+
return Mono.fromSupplier(transportServer)
843+
.flatMap(
844+
transport ->
845+
transport.start(
846+
duplexConnection -> acceptor(serverSetup, duplexConnection), mtu))
847847
.doOnNext(c -> c.onClose().doFinally(v -> serverSetup.dispose()).subscribe());
848848
}
849849
});

rsocket-core/src/main/java/io/rsocket/ReconnectMono.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,3 +470,8 @@ public Object scanUnsafe(Attr key) {
470470
}
471471
}
472472
}
473+
474+
interface Invalidatable {
475+
476+
void invalidate();
477+
}

rsocket-core/src/test/java/io/rsocket/ReconnectMonoTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@
5353
public class ReconnectMonoTests {
5454

5555
private Queue<RetryContext<?>> retries = new ConcurrentLinkedQueue<>();
56-
private Queue<Tuple2<Object, Invalidatable>> received = new ConcurrentLinkedQueue<>();
56+
private Queue<Tuple2<Object, ReconnectMono.Invalidatable>> received =
57+
new ConcurrentLinkedQueue<>();
5758
private Queue<Object> expired = new ConcurrentLinkedQueue<>();
5859

5960
@Test
@@ -906,7 +907,7 @@ Consumer<? super RetryContext<?>> onRetry() {
906907
return context -> retries.add(context);
907908
}
908909

909-
<T> BiConsumer<T, Invalidatable> onValue() {
910+
<T> BiConsumer<T, ReconnectMono.Invalidatable> onValue() {
910911
return (v, __) -> received.add(Tuples.of(v, __));
911912
}
912913

0 commit comments

Comments
 (0)