Skip to content

Commit 2c42c91

Browse files
committed
Direct access to requester from ClientRSConnector
Since `RSocketRequester` is now lazy load on client side there is no need to wrap it into a `Mono`. * Change `getRSocketRequester()` to a plain getter If there is a requirement to force connect to the server for receiving requests from there, the `ClientRSocketConnector.connect()` should be used
1 parent ff44382 commit 2c42c91

File tree

3 files changed

+16
-29
lines changed

3 files changed

+16
-29
lines changed

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ClientRSocketConnector.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import io.rsocket.transport.ClientTransport;
3030
import io.rsocket.transport.netty.client.TcpClientTransport;
3131
import io.rsocket.transport.netty.client.WebsocketClientTransport;
32-
import reactor.core.Disposable;
33-
import reactor.core.publisher.Mono;
3432

3533
/**
3634
* A client {@link AbstractRSocketConnector} extension to the RSocket connection.
@@ -58,7 +56,7 @@ public class ClientRSocketConnector extends AbstractRSocketConnector {
5856

5957
private boolean autoConnect;
6058

61-
private Mono<RSocketRequester> rsocketRequesterMono;
59+
private RSocketRequester rsocketRequester;
6260

6361
/**
6462
* Instantiate a connector based on the {@link TcpClientTransport}.
@@ -175,7 +173,7 @@ public void setSetupData(Object setupData) {
175173
public void afterPropertiesSet() {
176174
super.afterPropertiesSet();
177175

178-
RSocketRequester rsocketRequester = RSocketRequester.builder()
176+
this.rsocketRequester = RSocketRequester.builder()
179177
.dataMimeType(getDataMimeType())
180178
.metadataMimeType(getMetadataMimeType())
181179
.rsocketStrategies(getRSocketStrategies())
@@ -186,11 +184,6 @@ public void afterPropertiesSet() {
186184
connector.acceptor(this.rSocketMessageHandler.responder()))
187185
.apply((builder) -> this.setupMetadata.forEach(builder::setupMetadata))
188186
.transport(this.clientTransport);
189-
190-
this.rsocketRequesterMono =
191-
Mono.just(rsocketRequester)
192-
.doOnSubscribe((sub) -> rsocketRequester.rsocketClient().source().subscribe())
193-
.cache();
194187
}
195188

196189
@Override
@@ -207,21 +200,18 @@ protected void doStart() {
207200

208201
@Override
209202
public void destroy() {
210-
this.rsocketRequesterMono
211-
.flatMap((requester) -> requester.rsocketClient().source())
212-
.doOnNext(Disposable::dispose)
213-
.subscribe();
203+
this.rsocketRequester.rsocketClient().dispose();
214204
}
215205

216206
/**
217207
* Perform subscription into the RSocket server for incoming requests.
218208
*/
219209
public void connect() {
220-
this.rsocketRequesterMono.subscribe();
210+
this.rsocketRequester.rsocketClient().source().subscribe();
221211
}
222212

223-
public Mono<RSocketRequester> getRSocketRequester() {
224-
return this.rsocketRequesterMono;
213+
public RSocketRequester getRSocketRequester() {
214+
return this.rsocketRequester;
225215
}
226216

227217
}

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGateway.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public class RSocketOutboundGateway extends AbstractReplyProducingMessageHandler
8989
private EvaluationContext evaluationContext;
9090

9191
@Nullable
92-
private Mono<RSocketRequester> rsocketRequesterMono;
92+
private RSocketRequester rsocketRequester;
9393

9494
/**
9595
* Instantiate based on the provided RSocket endpoint {@code route}
@@ -207,27 +207,24 @@ protected void doInit() {
207207
super.doInit();
208208
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
209209
if (this.clientRSocketConnector != null) {
210-
this.rsocketRequesterMono = this.clientRSocketConnector.getRSocketRequester();
210+
this.rsocketRequester = this.clientRSocketConnector.getRSocketRequester();
211211
}
212212
}
213213

214214
@Override
215215
protected Object handleRequestMessage(Message<?> requestMessage) {
216-
RSocketRequester rsocketRequester = requestMessage.getHeaders()
217-
.get(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, RSocketRequester.class);
218-
Mono<RSocketRequester> requesterMono;
219-
if (rsocketRequester != null) {
220-
requesterMono = Mono.just(rsocketRequester);
221-
}
222-
else {
223-
requesterMono = this.rsocketRequesterMono;
216+
RSocketRequester requester =
217+
requestMessage.getHeaders()
218+
.get(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, RSocketRequester.class);
219+
if (requester == null) {
220+
requester = this.rsocketRequester;
224221
}
225222

226-
Assert.notNull(requesterMono,
223+
Assert.notNull(requester,
227224
() -> "The 'RSocketRequester' must be configured via 'ClientRSocketConnector' or provided in the '" +
228225
RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER + "' request message headers.");
229226

230-
return requesterMono
227+
return Mono.just(requester)
231228
.map((rSocketRequester) -> createRequestSpec(rSocketRequester, requestMessage))
232229
.map((requestSpec) -> prepareRetrieveSpec(requestSpec, requestMessage))
233230
.flatMap((retrieveSpec) -> performRetrieve(retrieveSpec, requestMessage));

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/inbound/RSocketInboundGatewayIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ void setupTest(TestInfo testInfo) {
9797
}
9898
else {
9999
this.clientRsocketRequester =
100-
this.clientRSocketConnector.getRSocketRequester().block(Duration.ofSeconds(10));
100+
this.clientRSocketConnector.getRSocketRequester();
101101
}
102102
}
103103

0 commit comments

Comments
 (0)