Skip to content

Commit c7642b3

Browse files
committed
provides a reconnectable Mono<RSocket> feature
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 2f71f73 commit c7642b3

File tree

7 files changed

+1642
-1
lines changed

7 files changed

+1642
-1
lines changed

rsocket-core/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ dependencies {
4343
// TODO: Remove after JUnit5 migration
4444
testCompileOnly 'junit:junit'
4545
testImplementation 'org.hamcrest:hamcrest-library'
46+
testImplementation 'io.projectreactor.addons:reactor-extra'
4647
testRuntimeOnly 'org.junit.vintage:junit-vintage-engine'
4748
}
4849

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket;
17+
18+
interface Invalidate {
19+
20+
void expire();
21+
}

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 143 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@
4747
import java.util.function.Consumer;
4848
import java.util.function.Function;
4949
import java.util.function.Supplier;
50+
import org.reactivestreams.Publisher;
51+
import reactor.core.Disposable;
52+
import reactor.core.publisher.Flux;
5053
import reactor.core.publisher.Mono;
5154

5255
/** Factory for creating RSocket clients and servers. */
@@ -95,6 +98,9 @@ default <T extends Closeable> Start<T> transport(ServerTransport<T> transport) {
9598
public static class ClientRSocketFactory implements ClientTransportAcceptor {
9699
private static final String CLIENT_TAG = "client";
97100

101+
private static final Function<Flux<Throwable>, ? extends Publisher<?>> FAIL_WHEN_FACTORY =
102+
f -> f.concatMap(Mono::error);
103+
98104
private SocketAcceptor acceptor = (setup, sendingSocket) -> Mono.just(new AbstractRSocket() {});
99105

100106
private Consumer<Throwable> errorConsumer = Throwable::printStackTrace;
@@ -125,6 +131,8 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor {
125131
private boolean multiSubscriberRequester = true;
126132
private boolean leaseEnabled;
127133
private Supplier<Leases<?>> leasesSupplier = Leases::new;
134+
private boolean reconnectEnabled;
135+
private Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory = FAIL_WHEN_FACTORY;
128136

129137
private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
130138

@@ -230,6 +238,129 @@ public ClientRSocketFactory singleSubscriberRequester() {
230238
return this;
231239
}
232240

241+
/**
242+
* Enables a reconnectable, shared instance of {@code Mono<RSocket>} so every subscriber will
243+
* observe the same RSocket instance up on connection establishment. <br>
244+
* For example:
245+
*
246+
* <pre>{@code
247+
* Mono<RSocket> sharedRSocketMono =
248+
* RSocketFactory
249+
* .connect()
250+
* .singleSubscriberRequester()
251+
* .reconnect()
252+
* .transport(transport)
253+
* .start();
254+
*
255+
* RSocket r1 = sharedRSocketMono.block();
256+
* RSocket r2 = sharedRSocketMono.block();
257+
*
258+
* assert r1 == r2;
259+
*
260+
* }</pre>
261+
*
262+
* Apart of the shared behavior, once the previous connection has been expired (e.g. connection
263+
* has been disposed), the same instance of {@code Mono<RSocket>} reestablish connection without
264+
* any extra effort. <br>
265+
* For example:
266+
*
267+
* <pre>{@code
268+
* Mono<RSocket> sharedRSocketMono =
269+
* RSocketFactory
270+
* .connect()
271+
* .singleSubscriberRequester()
272+
* .reconnect()
273+
* .transport(transport)
274+
* .start();
275+
*
276+
* RSocket r1 = sharedRSocketMono.block();
277+
* RSocket r2 = sharedRSocketMono.block();
278+
*
279+
* assert r1 == r2;
280+
*
281+
* r1.dispose()
282+
*
283+
* assert r2.isDisposed()
284+
*
285+
* RSocket r3 = sharedRSocketMono.block();
286+
* RSocket r4 = sharedRSocketMono.block();
287+
*
288+
*
289+
* assert r1 != r3;
290+
* assert r4 == r3;
291+
*
292+
* }</pre>
293+
*
294+
* @return a shared instance of {@code Mono<RSocket>}.
295+
*/
296+
public ClientRSocketFactory reconnect() {
297+
this.reconnectEnabled = true;
298+
return this;
299+
}
300+
301+
/**
302+
* Enables a reconnectable, shared instance of {@code Mono<RSocket>} so every subscriber will
303+
* observe the same RSocket instance up on connection establishment. <br>
304+
* For example:
305+
*
306+
* <pre>{@code
307+
* Mono<RSocket> sharedRSocketMono =
308+
* RSocketFactory
309+
* .connect()
310+
* .singleSubscriberRequester()
311+
* .reconnect()
312+
* .transport(transport)
313+
* .start();
314+
*
315+
* RSocket r1 = sharedRSocketMono.block();
316+
* RSocket r2 = sharedRSocketMono.block();
317+
*
318+
* assert r1 == r2;
319+
*
320+
* }</pre>
321+
*
322+
* Apart of the shared behavior, once the previous connection has been expired (e.g. connection
323+
* has been disposed), the same instance of {@code Mono<RSocket>} reestablish connection without
324+
* any extra effort. <br>
325+
* For example:
326+
*
327+
* <pre>{@code
328+
* Mono<RSocket> sharedRSocketMono =
329+
* RSocketFactory
330+
* .connect()
331+
* .singleSubscriberRequester()
332+
* .reconnect()
333+
* .transport(transport)
334+
* .start();
335+
*
336+
* RSocket r1 = sharedRSocketMono.block();
337+
* RSocket r2 = sharedRSocketMono.block();
338+
*
339+
* assert r1 == r2;
340+
*
341+
* r1.dispose()
342+
*
343+
* assert r2.isDisposed()
344+
*
345+
* RSocket r3 = sharedRSocketMono.block();
346+
* RSocket r4 = sharedRSocketMono.block();
347+
*
348+
*
349+
* assert r1 != r3;
350+
* assert r4 == r3;
351+
*
352+
* }</pre>
353+
*
354+
* @param whenFactory a retry factory applied for {@link Mono.retryWhen(whenFactory)}
355+
* @return a shared instance of {@code Mono<RSocket>}.
356+
*/
357+
public ClientRSocketFactory reconnect(
358+
Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory) {
359+
this.whenFactory = Objects.requireNonNull(whenFactory);
360+
this.reconnectEnabled = true;
361+
return this;
362+
}
363+
233364
public ClientRSocketFactory resume() {
234365
this.resumeEnabled = true;
235366
return this;
@@ -392,6 +523,17 @@ public Mono<RSocket> start() {
392523
.sendOne(setupFrame)
393524
.thenReturn(wrappedRSocketRequester);
394525
});
526+
})
527+
.as(
528+
source -> {
529+
if (reconnectEnabled) {
530+
return new ReconnectMono<>(
531+
source.retryWhen(whenFactory),
532+
Disposable::dispose,
533+
(r, i) -> r.onClose().subscribe(null, null, i::expire));
534+
} else {
535+
return source;
536+
}
395537
});
396538
}
397539

@@ -422,7 +564,7 @@ private ClientSetup clientSetup(DuplexConnection startConnection) {
422564
}
423565

424566
private Mono<DuplexConnection> newConnection() {
425-
return transportClient.get().connect(mtu);
567+
return Mono.fromSupplier(transportClient).flatMap(t -> t.connect(mtu));
426568
}
427569
}
428570
}

0 commit comments

Comments
 (0)