Skip to content

RSocketClient and RSocketConnector Javadoc update #856

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 47 additions & 37 deletions rsocket-core/src/main/java/io/rsocket/RSocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,69 +4,79 @@
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

/**
* A client-side interface to simplify interactions with the {@link
* io.rsocket.core.RSocketConnector}. This interface represents logical communication over {@link
* RSocket}, hiding the complexity of {@code Mono<RSocket>} resolution. Also, in opposite to {@link
* RSocket} , {@link RSocketClient} supports multi-subscription on the same {@link Publisher} from
* the interaction in the way of accepting input as {@link Publisher} like {@link Mono} or {@link
* Flux} Despite, {@link RSocket} interface, {@link RSocketClient} does not coupled with a single
* connection, hence disposal of the {@link #source()} {@link RSocket} will affect the {@link
* RSocketClient} it selves. In such a case, a new request will cause automatic reconnection if
* necessary.
* Contract to perform RSocket requests from client to server, transparently connecting and ensuring
* a single, shared connection to make requests with.
*
* <p>{@code RSocketClient} contains a {@code Mono<RSocket>} {@link #source() source}. It uses it to
* obtain a live, shared {@link RSocket} connection on the first request and on subsequent requests
* if the connection is lost. This eliminates the need to obtain a connection first, and makes it
* easy to pass a single {@code RSocketClient} to use from multiple places.
*
* <p>Request methods of {@code RSocketClient} allow multiple subscriptions with each subscription
* performing a new request. Therefore request methods accept {@code Mono<Payload>} rather than
* {@code Payload} as on {@link RSocket}. By contrast, {@link RSocket} request methods cannot be
* subscribed to more than once.
*
* <p>Use {@link io.rsocket.core.RSocketConnector RSocketConnector} to create a client:
*
* <pre class="code">{@code
* RSocketClient client =
* RSocketConnector.create()
* .metadataMimeType("message/x.rsocket.composite-metadata.v0")
* .dataMimeType("application/cbor")
* .toRSocketClient(TcpClientTransport.create("localhost", 7000));
* }</pre>
*
* <p>Use the {@link io.rsocket.core.RSocketConnector#reconnect(Retry) RSocketConnector#reconnect}
* method to configure the retry logic to use whenever a shared {@code RSocket} connection needs to
* be obtained:
*
* <pre class="code">{@code
* RSocketClient client =
* RSocketConnector.create()
* .metadataMimeType("message/x.rsocket.composite-metadata.v0")
* .dataMimeType("application/cbor")
* .reconnect(Retry.fixedDelay(3, Duration.ofSeconds(1)))
* .toRSocketClient(TcpClientTransport.create("localhost", 7000));
* }</pre>
*
* @since 1.0.1
*/
public interface RSocketClient extends Disposable {

/**
* Provides access to the source {@link RSocket} used by this {@link RSocketClient}
*
* @return returns a {@link Mono} which returns the source {@link RSocket}
*/
/** Return the underlying source used to obtain a shared {@link RSocket} connection. */
Mono<RSocket> source();

/**
* Fire and Forget interaction model of {@link RSocketClient}.
*
* @param payloadMono Request payload as {@link Mono}.
* @return {@code Publisher} that completes when the passed {@code payload} is successfully
* handled, otherwise errors.
* Perform a Fire-and-Forget interaction via {@link RSocket#fireAndForget(Payload)}. Allows
* multiple subscriptions and performs a request per subscriber.
*/
Mono<Void> fireAndForget(Mono<Payload> payloadMono);

/**
* Request-Response interaction model of {@link RSocketClient}.
*
* @param payloadMono Request payload as {@link Mono}.
* @return {@code Publisher} containing at most a single {@code Payload} representing the
* response.
* Perform a Request-Response interaction via {@link RSocket#requestResponse(Payload)}. Allows
* multiple subscriptions and performs a request per subscriber.
*/
Mono<Payload> requestResponse(Mono<Payload> payloadMono);

/**
* Request-Stream interaction model of {@link RSocketClient}.
*
* @param payloadMono Request payload as {@link Mono}.
* @return {@code Publisher} containing the stream of {@code Payload}s representing the response.
* Perform a Request-Stream interaction via {@link RSocket#requestStream(Payload)}. Allows
* multiple subscriptions and performs a request per subscriber.
*/
Flux<Payload> requestStream(Mono<Payload> payloadMono);

/**
* Request-Channel interaction model of {@link RSocketClient}.
*
* @param payloads Stream of request payloads.
* @return Stream of response payloads.
* Perform a Request-Channel interaction via {@link RSocket#requestChannel(Publisher)}. Allows
* multiple subscriptions and performs a request per subscriber.
*/
Flux<Payload> requestChannel(Publisher<Payload> payloads);

/**
* Metadata-Push interaction model of {@link RSocketClient}.
*
* @param payloadMono Request payload as {@link Mono}.
* @return {@code Publisher} that completes when the passed {@code payload} is successfully
* handled, otherwise errors.
* Perform a Metadata Push via {@link RSocket#metadataPush(Payload)}. Allows multiple
* subscriptions and performs a request per subscriber.
*/
Mono<Void> metadataPush(Mono<Payload> payloadMono);
}
83 changes: 40 additions & 43 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,23 @@
/**
* The main class to use to establish a connection to an RSocket server.
*
* <p>To connect over TCP using default settings:
* <p>For using TCP using default settings:
*
* <pre>{@code
* import io.rsocket.transport.netty.client.TcpClientTransport;
*
* Mono<RSocket> rocketMono =
* RSocketConnector.connectWith(TcpClientTransport.create("localhost", 7000));
* RSocketClient client =
* RSocketConnector.createRSocketClient(TcpClientTransport.create("localhost", 7000));
* }</pre>
*
* <p>To customize connection settings before connecting:
*
* <pre>{@code
* Mono<RSocket> rocketMono =
* RSocketClient client =
* RSocketConnector.create()
* .metadataMimeType("message/x.rsocket.composite-metadata.v0")
* .dataMimeType("application/cbor")
* .connect(TcpClientTransport.create("localhost", 7000));
* .toRSocketClient(TcpClientTransport.create("localhost", 7000));
* }</pre>
*/
public class RSocketConnector {
Expand Down Expand Up @@ -448,11 +448,42 @@ public RSocketConnector payloadDecoder(PayloadDecoder decoder) {
}

/**
* The final step to connect with the transport to use as input and the resulting {@code
* Mono<RSocket>} as output. Each subscriber to the returned {@code Mono} starts a new connection
* if neither {@link #reconnect(Retry) reconnect} nor {@link #resume(Resume)} are enabled.
* Create {@link RSocketClient} that will use {@link #connect(ClientTransport)} as its source to
* obtain a live, shared {@code RSocket} when the first request is made, and also on subsequent
* requests after the connection is lost.
*
* <p>The following transports are available (via additional RSocket Java modules):
* <p>The following transports are available through additional RSocket Java modules:
*
* <ul>
* <li>{@link io.rsocket.transport.netty.client.TcpClientTransport TcpClientTransport} via
* {@code rsocket-transport-netty}.
* <li>{@link io.rsocket.transport.netty.client.WebsocketClientTransport
* WebsocketClientTransport} via {@code rsocket-transport-netty}.
* <li>{@link io.rsocket.transport.local.LocalClientTransport LocalClientTransport} via {@code
* rsocket-transport-local}
* </ul>
*
* @param transport the transport of choice to connect with
* @return a {@code RSocketClient} with not established connection. Note, connection will be
* established on the first request
* @since 1.0.1
*/
public RSocketClient toRSocketClient(ClientTransport transport) {
Mono<RSocket> source = connect0(() -> transport);

if (retrySpec != null) {
source = source.retryWhen(retrySpec);
}

return new DefaultRSocketClient(source);
}

/**
* Connect with the given transport and obtain a live {@link RSocket} to use for making requests.
* Each subscriber to the returned {@code Mono} receives a new connection, if neither {@link
* #reconnect(Retry) reconnect} nor {@link #resume(Resume)} are enabled.
*
* <p>The following transports are available through additional RSocket Java modules:
*
* <ul>
* <li>{@link io.rsocket.transport.netty.client.TcpClientTransport TcpClientTransport} via
Expand Down Expand Up @@ -492,40 +523,6 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
});
}

/**
* The final step to connect with the transport to use as input and the resulting {@link
* RSocketClient} as output.
*
* <p>Please note, {@link RSocketClient} does not represent a single or wired connection and will
* do that depends on the demand (pending requests). Therefore, in order to ensure that connection
* will be established in a case of error, {@link #reconnect(Retry) reconnect} may be enabled.
*
* <p>The following transports are available (via additional RSocket Java modules):
*
* <ul>
* <li>{@link io.rsocket.transport.netty.client.TcpClientTransport TcpClientTransport} via
* {@code rsocket-transport-netty}.
* <li>{@link io.rsocket.transport.netty.client.WebsocketClientTransport
* WebsocketClientTransport} via {@code rsocket-transport-netty}.
* <li>{@link io.rsocket.transport.local.LocalClientTransport LocalClientTransport} via {@code
* rsocket-transport-local}
* </ul>
*
* @param transport the transport of choice to connect with
* @return a {@code RSocketClient} with not established connection. Note, connection will be
* established on the first request
* @since 1.0.1
*/
public RSocketClient toRSocketClient(ClientTransport transport) {
Mono<RSocket> source = connect0(() -> transport);

if (retrySpec != null) {
source = source.retryWhen(retrySpec);
}

return new DefaultRSocketClient(source);
}

private Mono<RSocket> connect0(Supplier<ClientTransport> transportSupplier) {
Mono<DuplexConnection> connectionMono =
Mono.fromSupplier(transportSupplier)
Expand Down