Skip to content

Commit 6da90d5

Browse files
rstoyanchevOlegDokuka
authored andcommitted
RSocketClient and RSocketConnector Javadoc update (#856)
Signed-off-by: Rossen Stoyanchev <[email protected]> (cherry picked from commit 07cf959)
1 parent 4bf6297 commit 6da90d5

File tree

2 files changed

+87
-80
lines changed

2 files changed

+87
-80
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 47 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,69 +4,79 @@
44
import reactor.core.Disposable;
55
import reactor.core.publisher.Flux;
66
import reactor.core.publisher.Mono;
7+
import reactor.util.retry.Retry;
78

89
/**
9-
* A client-side interface to simplify interactions with the {@link
10-
* io.rsocket.core.RSocketConnector}. This interface represents logical communication over {@link
11-
* RSocket}, hiding the complexity of {@code Mono<RSocket>} resolution. Also, in opposite to {@link
12-
* RSocket} , {@link RSocketClient} supports multi-subscription on the same {@link Publisher} from
13-
* the interaction in the way of accepting input as {@link Publisher} like {@link Mono} or {@link
14-
* Flux} Despite, {@link RSocket} interface, {@link RSocketClient} does not coupled with a single
15-
* connection, hence disposal of the {@link #source()} {@link RSocket} will affect the {@link
16-
* RSocketClient} it selves. In such a case, a new request will cause automatic reconnection if
17-
* necessary.
10+
* Contract to perform RSocket requests from client to server, transparently connecting and ensuring
11+
* a single, shared connection to make requests with.
12+
*
13+
* <p>{@code RSocketClient} contains a {@code Mono<RSocket>} {@link #source() source}. It uses it to
14+
* obtain a live, shared {@link RSocket} connection on the first request and on subsequent requests
15+
* if the connection is lost. This eliminates the need to obtain a connection first, and makes it
16+
* easy to pass a single {@code RSocketClient} to use from multiple places.
17+
*
18+
* <p>Request methods of {@code RSocketClient} allow multiple subscriptions with each subscription
19+
* performing a new request. Therefore request methods accept {@code Mono<Payload>} rather than
20+
* {@code Payload} as on {@link RSocket}. By contrast, {@link RSocket} request methods cannot be
21+
* subscribed to more than once.
22+
*
23+
* <p>Use {@link io.rsocket.core.RSocketConnector RSocketConnector} to create a client:
24+
*
25+
* <pre class="code">{@code
26+
* RSocketClient client =
27+
* RSocketConnector.create()
28+
* .metadataMimeType("message/x.rsocket.composite-metadata.v0")
29+
* .dataMimeType("application/cbor")
30+
* .toRSocketClient(TcpClientTransport.create("localhost", 7000));
31+
* }</pre>
32+
*
33+
* <p>Use the {@link io.rsocket.core.RSocketConnector#reconnect(Retry) RSocketConnector#reconnect}
34+
* method to configure the retry logic to use whenever a shared {@code RSocket} connection needs to
35+
* be obtained:
36+
*
37+
* <pre class="code">{@code
38+
* RSocketClient client =
39+
* RSocketConnector.create()
40+
* .metadataMimeType("message/x.rsocket.composite-metadata.v0")
41+
* .dataMimeType("application/cbor")
42+
* .reconnect(Retry.fixedDelay(3, Duration.ofSeconds(1)))
43+
* .toRSocketClient(TcpClientTransport.create("localhost", 7000));
44+
* }</pre>
1845
*
1946
* @since 1.0.1
2047
*/
2148
public interface RSocketClient extends Disposable {
2249

23-
/**
24-
* Provides access to the source {@link RSocket} used by this {@link RSocketClient}
25-
*
26-
* @return returns a {@link Mono} which returns the source {@link RSocket}
27-
*/
50+
/** Return the underlying source used to obtain a shared {@link RSocket} connection. */
2851
Mono<RSocket> source();
2952

3053
/**
31-
* Fire and Forget interaction model of {@link RSocketClient}.
32-
*
33-
* @param payloadMono Request payload as {@link Mono}.
34-
* @return {@code Publisher} that completes when the passed {@code payload} is successfully
35-
* handled, otherwise errors.
54+
* Perform a Fire-and-Forget interaction via {@link RSocket#fireAndForget(Payload)}. Allows
55+
* multiple subscriptions and performs a request per subscriber.
3656
*/
3757
Mono<Void> fireAndForget(Mono<Payload> payloadMono);
3858

3959
/**
40-
* Request-Response interaction model of {@link RSocketClient}.
41-
*
42-
* @param payloadMono Request payload as {@link Mono}.
43-
* @return {@code Publisher} containing at most a single {@code Payload} representing the
44-
* response.
60+
* Perform a Request-Response interaction via {@link RSocket#requestResponse(Payload)}. Allows
61+
* multiple subscriptions and performs a request per subscriber.
4562
*/
4663
Mono<Payload> requestResponse(Mono<Payload> payloadMono);
4764

4865
/**
49-
* Request-Stream interaction model of {@link RSocketClient}.
50-
*
51-
* @param payloadMono Request payload as {@link Mono}.
52-
* @return {@code Publisher} containing the stream of {@code Payload}s representing the response.
66+
* Perform a Request-Stream interaction via {@link RSocket#requestStream(Payload)}. Allows
67+
* multiple subscriptions and performs a request per subscriber.
5368
*/
5469
Flux<Payload> requestStream(Mono<Payload> payloadMono);
5570

5671
/**
57-
* Request-Channel interaction model of {@link RSocketClient}.
58-
*
59-
* @param payloads Stream of request payloads.
60-
* @return Stream of response payloads.
72+
* Perform a Request-Channel interaction via {@link RSocket#requestChannel(Publisher)}. Allows
73+
* multiple subscriptions and performs a request per subscriber.
6174
*/
6275
Flux<Payload> requestChannel(Publisher<Payload> payloads);
6376

6477
/**
65-
* Metadata-Push interaction model of {@link RSocketClient}.
66-
*
67-
* @param payloadMono Request payload as {@link Mono}.
68-
* @return {@code Publisher} that completes when the passed {@code payload} is successfully
69-
* handled, otherwise errors.
78+
* Perform a Metadata Push via {@link RSocket#metadataPush(Payload)}. Allows multiple
79+
* subscriptions and performs a request per subscriber.
7080
*/
7181
Mono<Void> metadataPush(Mono<Payload> payloadMono);
7282
}

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 40 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -53,23 +53,23 @@
5353
/**
5454
* The main class to use to establish a connection to an RSocket server.
5555
*
56-
* <p>To connect over TCP using default settings:
56+
* <p>For using TCP using default settings:
5757
*
5858
* <pre>{@code
5959
* import io.rsocket.transport.netty.client.TcpClientTransport;
6060
*
61-
* Mono<RSocket> rocketMono =
62-
* RSocketConnector.connectWith(TcpClientTransport.create("localhost", 7000));
61+
* RSocketClient client =
62+
* RSocketConnector.createRSocketClient(TcpClientTransport.create("localhost", 7000));
6363
* }</pre>
6464
*
6565
* <p>To customize connection settings before connecting:
6666
*
6767
* <pre>{@code
68-
* Mono<RSocket> rocketMono =
68+
* RSocketClient client =
6969
* RSocketConnector.create()
7070
* .metadataMimeType("message/x.rsocket.composite-metadata.v0")
7171
* .dataMimeType("application/cbor")
72-
* .connect(TcpClientTransport.create("localhost", 7000));
72+
* .toRSocketClient(TcpClientTransport.create("localhost", 7000));
7373
* }</pre>
7474
*/
7575
public class RSocketConnector {
@@ -448,11 +448,42 @@ public RSocketConnector payloadDecoder(PayloadDecoder decoder) {
448448
}
449449

450450
/**
451-
* The final step to connect with the transport to use as input and the resulting {@code
452-
* Mono<RSocket>} as output. Each subscriber to the returned {@code Mono} starts a new connection
453-
* if neither {@link #reconnect(Retry) reconnect} nor {@link #resume(Resume)} are enabled.
451+
* Create {@link RSocketClient} that will use {@link #connect(ClientTransport)} as its source to
452+
* obtain a live, shared {@code RSocket} when the first request is made, and also on subsequent
453+
* requests after the connection is lost.
454454
*
455-
* <p>The following transports are available (via additional RSocket Java modules):
455+
* <p>The following transports are available through additional RSocket Java modules:
456+
*
457+
* <ul>
458+
* <li>{@link io.rsocket.transport.netty.client.TcpClientTransport TcpClientTransport} via
459+
* {@code rsocket-transport-netty}.
460+
* <li>{@link io.rsocket.transport.netty.client.WebsocketClientTransport
461+
* WebsocketClientTransport} via {@code rsocket-transport-netty}.
462+
* <li>{@link io.rsocket.transport.local.LocalClientTransport LocalClientTransport} via {@code
463+
* rsocket-transport-local}
464+
* </ul>
465+
*
466+
* @param transport the transport of choice to connect with
467+
* @return a {@code RSocketClient} with not established connection. Note, connection will be
468+
* established on the first request
469+
* @since 1.0.1
470+
*/
471+
public RSocketClient toRSocketClient(ClientTransport transport) {
472+
Mono<RSocket> source = connect0(() -> transport);
473+
474+
if (retrySpec != null) {
475+
source = source.retryWhen(retrySpec);
476+
}
477+
478+
return new DefaultRSocketClient(source);
479+
}
480+
481+
/**
482+
* Connect with the given transport and obtain a live {@link RSocket} to use for making requests.
483+
* Each subscriber to the returned {@code Mono} receives a new connection, if neither {@link
484+
* #reconnect(Retry) reconnect} nor {@link #resume(Resume)} are enabled.
485+
*
486+
* <p>The following transports are available through additional RSocket Java modules:
456487
*
457488
* <ul>
458489
* <li>{@link io.rsocket.transport.netty.client.TcpClientTransport TcpClientTransport} via
@@ -492,40 +523,6 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
492523
});
493524
}
494525

495-
/**
496-
* The final step to connect with the transport to use as input and the resulting {@link
497-
* RSocketClient} as output.
498-
*
499-
* <p>Please note, {@link RSocketClient} does not represent a single or wired connection and will
500-
* do that depends on the demand (pending requests). Therefore, in order to ensure that connection
501-
* will be established in a case of error, {@link #reconnect(Retry) reconnect} may be enabled.
502-
*
503-
* <p>The following transports are available (via additional RSocket Java modules):
504-
*
505-
* <ul>
506-
* <li>{@link io.rsocket.transport.netty.client.TcpClientTransport TcpClientTransport} via
507-
* {@code rsocket-transport-netty}.
508-
* <li>{@link io.rsocket.transport.netty.client.WebsocketClientTransport
509-
* WebsocketClientTransport} via {@code rsocket-transport-netty}.
510-
* <li>{@link io.rsocket.transport.local.LocalClientTransport LocalClientTransport} via {@code
511-
* rsocket-transport-local}
512-
* </ul>
513-
*
514-
* @param transport the transport of choice to connect with
515-
* @return a {@code RSocketClient} with not established connection. Note, connection will be
516-
* established on the first request
517-
* @since 1.0.1
518-
*/
519-
public RSocketClient toRSocketClient(ClientTransport transport) {
520-
Mono<RSocket> source = connect0(() -> transport);
521-
522-
if (retrySpec != null) {
523-
source = source.retryWhen(retrySpec);
524-
}
525-
526-
return new DefaultRSocketClient(source);
527-
}
528-
529526
private Mono<RSocket> connect0(Supplier<ClientTransport> transportSupplier) {
530527
Mono<DuplexConnection> connectionMono =
531528
Mono.fromSupplier(transportSupplier)

0 commit comments

Comments
 (0)