Skip to content

Commit 96ebf27

Browse files
rstoyanchevOlegDokuka
authored andcommitted
Add option to provide Mono<Payload> for the SETUP payload
Signed-off-by: Rossen Stoyanchev <[email protected]>
1 parent e81cb69 commit 96ebf27

File tree

2 files changed

+85
-9
lines changed

2 files changed

+85
-9
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import reactor.core.publisher.Mono;
4848
import reactor.core.scheduler.Schedulers;
4949
import reactor.util.annotation.Nullable;
50+
import reactor.util.function.Tuples;
5051
import reactor.util.retry.Retry;
5152

5253
/**
@@ -77,7 +78,7 @@ public class RSocketConnector {
7778
private static final BiConsumer<RSocket, Invalidatable> INVALIDATE_FUNCTION =
7879
(r, i) -> r.onClose().subscribe(null, __ -> i.invalidate(), i::invalidate);
7980

80-
private Payload setupPayload = EmptyPayload.INSTANCE;
81+
private Mono<Payload> setupPayloadMono = Mono.empty();
8182
private String metadataMimeType = "application/binary";
8283
private String dataMimeType = "application/binary";
8384
private Duration keepAliveInterval = Duration.ofSeconds(20);
@@ -118,22 +119,38 @@ public static Mono<RSocket> connectWith(ClientTransport transport) {
118119
}
119120

120121
/**
121-
* Provide a {@code Payload} with data and/or metadata for the initial {@code SETUP} frame. Data
122-
* and metadata should be formatted according to the MIME types specified via {@link
122+
* Provide a {@code Mono} from which to obtain the {@code Payload} for the initial SETUP frame.
123+
* Data and metadata should be formatted according to the MIME types specified via {@link
123124
* #dataMimeType(String)} and {@link #metadataMimeType(String)}.
124125
*
125-
* @param payload the payload containing data and/or metadata for the {@code SETUP} frame. Note,
126-
* if the instance of the given payload is not a {@link DefaultPayload}, its content will be
127-
* copied
126+
* @param setupPayloadMono the payload with data and/or metadata for the {@code SETUP} frame.
127+
* @return the same instance for method chaining
128+
* @since 1.0.2
129+
* @see <a href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#frame-setup">SETUP
130+
* Frame</a>
131+
*/
132+
public RSocketConnector setupPayload(Mono<Payload> setupPayloadMono) {
133+
this.setupPayloadMono = setupPayloadMono;
134+
return this;
135+
}
136+
137+
/**
138+
* Variant of {@link #setupPayload(Mono)} that accepts a {@code Payload} instance.
139+
*
140+
* <p>Note: if the given payload is {@link io.rsocket.util.ByteBufPayload}, it is copied to a
141+
* {@link DefaultPayload} and released immediately. This ensures it can re-used to obtain a
142+
* connection more than once.
143+
*
144+
* @param payload the payload with data and/or metadata for the {@code SETUP} frame.
128145
* @return the same instance for method chaining
129146
* @see <a href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#frame-setup">SETUP
130147
* Frame</a>
131148
*/
132149
public RSocketConnector setupPayload(Payload payload) {
133150
if (payload instanceof DefaultPayload) {
134-
this.setupPayload = payload;
151+
this.setupPayloadMono = Mono.just(payload);
135152
} else {
136-
this.setupPayload = DefaultPayload.create(Objects.requireNonNull(payload));
153+
this.setupPayloadMono = Mono.just(DefaultPayload.create(Objects.requireNonNull(payload)));
137154
payload.release();
138155
}
139156
return this;
@@ -479,9 +496,20 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
479496
mtu > 0
480497
? new FragmentationDuplexConnection(connection, mtu, "client")
481498
: new ReassemblyDuplexConnection(connection));
499+
482500
return connectionMono
483501
.flatMap(
484-
connection -> {
502+
connection ->
503+
setupPayloadMono
504+
.defaultIfEmpty(EmptyPayload.INSTANCE)
505+
.map(setupPayload -> Tuples.of(connection, setupPayload))
506+
.doOnError(ex -> connection.dispose())
507+
.doOnCancel(connection::dispose))
508+
.flatMap(
509+
tuple -> {
510+
DuplexConnection connection = tuple.getT1();
511+
Payload setupPayload = tuple.getT2();
512+
485513
ByteBuf resumeToken;
486514
KeepAliveHandler keepAliveHandler;
487515
DuplexConnection wrappedConnection;

rsocket-core/src/test/java/io/rsocket/core/RSocketConnectorTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import io.rsocket.test.util.TestClientTransport;
1010
import io.rsocket.util.ByteBufPayload;
1111
import java.time.Duration;
12+
import java.util.ArrayList;
13+
import java.util.List;
1214
import org.assertj.core.api.Assertions;
1315
import org.junit.jupiter.api.Test;
1416
import reactor.core.publisher.Mono;
@@ -99,4 +101,50 @@ public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions
99101
.allMatch(ReferenceCounted::release);
100102
Assertions.assertThat(setupPayload.refCnt()).isZero();
101103
}
104+
105+
@Test
106+
public void ensuresThatSetupPayloadProvidedAsMonoIsReleased() {
107+
List<Payload> saved = new ArrayList<>();
108+
Mono<Payload> setupPayloadMono =
109+
Mono.create(
110+
sink -> {
111+
Payload payload = ByteBufPayload.create("TestData", "TestMetadata");
112+
saved.add(payload);
113+
sink.success(payload);
114+
});
115+
116+
TestClientTransport testClientTransport = new TestClientTransport();
117+
Mono<RSocket> connectionMono =
118+
RSocketConnector.create().setupPayload(setupPayloadMono).connect(testClientTransport);
119+
120+
connectionMono
121+
.as(StepVerifier::create)
122+
.expectNextCount(1)
123+
.expectComplete()
124+
.verify(Duration.ofMillis(100));
125+
126+
connectionMono
127+
.as(StepVerifier::create)
128+
.expectNextCount(1)
129+
.expectComplete()
130+
.verify(Duration.ofMillis(100));
131+
132+
Assertions.assertThat(testClientTransport.testConnection().getSent())
133+
.hasSize(2)
134+
.allMatch(
135+
bb -> {
136+
DefaultConnectionSetupPayload payload = new DefaultConnectionSetupPayload(bb);
137+
return payload.getDataUtf8().equals("TestData")
138+
&& payload.getMetadataUtf8().equals("TestMetadata");
139+
})
140+
.allMatch(ReferenceCounted::release);
141+
142+
Assertions.assertThat(saved)
143+
.as("Metadata and data were consumed and released as slices")
144+
.allMatch(
145+
payload ->
146+
payload.refCnt() == 1
147+
&& payload.data().refCnt() == 0
148+
&& payload.metadata().refCnt() == 0);
149+
}
102150
}

0 commit comments

Comments
 (0)