Skip to content

Commit ff44382

Browse files
committed
Use RSocketRequester.rsocketClient() to connect
Related to spring-projects/spring-framework#25332 * Delegate subscription (and therefore auto-connection) into underlying `RSocketClient` in the `ClientRSocketConnector`
1 parent 40b0031 commit ff44382

File tree

2 files changed

+24
-38
lines changed

2 files changed

+24
-38
lines changed

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ClientRSocketConnector.java

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,11 @@
2626
import org.springframework.util.Assert;
2727
import org.springframework.util.MimeType;
2828

29-
import io.rsocket.core.RSocketConnector;
3029
import io.rsocket.transport.ClientTransport;
3130
import io.rsocket.transport.netty.client.TcpClientTransport;
3231
import io.rsocket.transport.netty.client.WebsocketClientTransport;
32+
import reactor.core.Disposable;
3333
import reactor.core.publisher.Mono;
34-
import reactor.core.publisher.Sinks;
3534

3635
/**
3736
* A client {@link AbstractRSocketConnector} extension to the RSocket connection.
@@ -40,7 +39,7 @@
4039
*
4140
* @since 5.2
4241
*
43-
* @see io.rsocket.RSocketFactory.ClientRSocketFactory
42+
* @see io.rsocket.core.RSocketConnector
4443
* @see RSocketRequester
4544
*/
4645
public class ClientRSocketConnector extends AbstractRSocketConnector {
@@ -176,27 +175,21 @@ public void setSetupData(Object setupData) {
176175
public void afterPropertiesSet() {
177176
super.afterPropertiesSet();
178177

179-
Sinks.StandaloneMonoSink<RSocketConnector> rsocketConnector = Sinks.promise();
180-
181-
RSocketRequester rsocketRequester =
182-
RSocketRequester.builder()
183-
.dataMimeType(getDataMimeType())
184-
.metadataMimeType(getMetadataMimeType())
185-
.rsocketStrategies(getRSocketStrategies())
186-
.setupData(this.setupData)
187-
.setupRoute(this.setupRoute, this.setupRouteVars)
188-
.apply((builder) -> this.setupMetadata.forEach(builder::setupMetadata))
189-
.rsocketConnector(this.connectorConfigurer)
190-
.rsocketConnector((connector) -> {
191-
connector.acceptor(this.rSocketMessageHandler.responder());
192-
rsocketConnector.success(connector);
193-
})
194-
.transport(this.clientTransport);
178+
RSocketRequester rsocketRequester = RSocketRequester.builder()
179+
.dataMimeType(getDataMimeType())
180+
.metadataMimeType(getMetadataMimeType())
181+
.rsocketStrategies(getRSocketStrategies())
182+
.setupData(this.setupData)
183+
.setupRoute(this.setupRoute, this.setupRouteVars)
184+
.rsocketConnector(this.connectorConfigurer)
185+
.rsocketConnector((connector) ->
186+
connector.acceptor(this.rSocketMessageHandler.responder()))
187+
.apply((builder) -> this.setupMetadata.forEach(builder::setupMetadata))
188+
.transport(this.clientTransport);
195189

196190
this.rsocketRequesterMono =
197-
rsocketConnector.asMono()
198-
.flatMap(rSocketConnector -> rSocketConnector.connect(this.clientTransport))
199-
.thenReturn(rsocketRequester)
191+
Mono.just(rsocketRequester)
192+
.doOnSubscribe((sub) -> rsocketRequester.rsocketClient().source().subscribe())
200193
.cache();
201194
}
202195

@@ -215,7 +208,8 @@ protected void doStart() {
215208
@Override
216209
public void destroy() {
217210
this.rsocketRequesterMono
218-
.doOnNext(RSocketRequester::dispose)
211+
.flatMap((requester) -> requester.rsocketClient().source())
212+
.doOnNext(Disposable::dispose)
219213
.subscribe();
220214
}
221215

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,8 @@
5959
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
6060

6161
import io.rsocket.RSocket;
62-
import io.rsocket.core.RSocketConnector;
6362
import io.rsocket.core.RSocketServer;
6463
import io.rsocket.frame.decoder.PayloadDecoder;
65-
import io.rsocket.transport.netty.client.TcpClientTransport;
6664
import io.rsocket.transport.netty.server.CloseableChannel;
6765
import io.rsocket.transport.netty.server.TcpServerTransport;
6866
import reactor.core.Disposable;
@@ -499,21 +497,15 @@ public static class ClientConfig extends CommonConfig {
499497
@Bean(destroyMethod = "dispose")
500498
@Nullable
501499
public RSocket rsocketForServerRequests() {
502-
Sinks.StandaloneMonoSink<RSocketConnector> rsocketConnector = Sinks.promise();
503500

504-
RSocketRequester.builder()
501+
return RSocketRequester.builder()
505502
.setupRoute("clientConnect")
506-
.rsocketConnector(connector -> {
507-
connector.acceptor(
508-
RSocketMessageHandler.responder(RSocketStrategies.create(), controller()));
509-
rsocketConnector.success(connector);
510-
})
511-
.tcp("localhost", server.address().getPort());
512-
513-
return rsocketConnector.asMono()
514-
.flatMap(rSocketConnector ->
515-
rSocketConnector.connect(
516-
TcpClientTransport.create("localhost", server.address().getPort())))
503+
.rsocketConnector(connector ->
504+
connector.acceptor(
505+
RSocketMessageHandler.responder(RSocketStrategies.create(), controller())))
506+
.tcp("localhost", server.address().getPort())
507+
.rsocketClient()
508+
.source()
517509
.block();
518510
}
519511

0 commit comments

Comments
 (0)