Skip to content

Commit 48c80b0

Browse files
committed
Merge branch '1.0.x'
2 parents 5631f69 + 96ebf27 commit 48c80b0

File tree

2 files changed

+87
-10
lines changed

2 files changed

+87
-10
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,12 @@
4444
import java.util.function.BiConsumer;
4545
import java.util.function.Consumer;
4646
import java.util.function.Supplier;
47+
4748
import reactor.core.Disposable;
4849
import reactor.core.publisher.Mono;
4950
import reactor.core.scheduler.Schedulers;
5051
import reactor.util.annotation.Nullable;
52+
import reactor.util.function.Tuples;
5153
import reactor.util.retry.Retry;
5254

5355
/**
@@ -78,7 +80,7 @@ public class RSocketConnector {
7880
private static final BiConsumer<RSocket, Invalidatable> INVALIDATE_FUNCTION =
7981
(r, i) -> r.onClose().subscribe(null, __ -> i.invalidate(), i::invalidate);
8082

81-
private Payload setupPayload = EmptyPayload.INSTANCE;
83+
private Mono<Payload> setupPayloadMono = Mono.empty();
8284
private String metadataMimeType = "application/binary";
8385
private String dataMimeType = "application/binary";
8486
private Duration keepAliveInterval = Duration.ofSeconds(20);
@@ -128,22 +130,38 @@ public static RSocketClient createRSocketClient(ClientTransport transport) {
128130
}
129131

130132
/**
131-
* Provide a {@code Payload} with data and/or metadata for the initial {@code SETUP} frame. Data
132-
* and metadata should be formatted according to the MIME types specified via {@link
133+
* Provide a {@code Mono} from which to obtain the {@code Payload} for the initial SETUP frame.
134+
* Data and metadata should be formatted according to the MIME types specified via {@link
133135
* #dataMimeType(String)} and {@link #metadataMimeType(String)}.
134136
*
135-
* @param payload the payload containing data and/or metadata for the {@code SETUP} frame. Note,
136-
* if the instance of the given payload is not a {@link DefaultPayload}, its content will be
137-
* copied
137+
* @param setupPayloadMono the payload with data and/or metadata for the {@code SETUP} frame.
138+
* @return the same instance for method chaining
139+
* @since 1.0.2
140+
* @see <a href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#frame-setup">SETUP
141+
* Frame</a>
142+
*/
143+
public RSocketConnector setupPayload(Mono<Payload> setupPayloadMono) {
144+
this.setupPayloadMono = setupPayloadMono;
145+
return this;
146+
}
147+
148+
/**
149+
* Variant of {@link #setupPayload(Mono)} that accepts a {@code Payload} instance.
150+
*
151+
* <p>Note: if the given payload is {@link io.rsocket.util.ByteBufPayload}, it is copied to a
152+
* {@link DefaultPayload} and released immediately. This ensures it can re-used to obtain a
153+
* connection more than once.
154+
*
155+
* @param payload the payload with data and/or metadata for the {@code SETUP} frame.
138156
* @return the same instance for method chaining
139157
* @see <a href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#frame-setup">SETUP
140158
* Frame</a>
141159
*/
142160
public RSocketConnector setupPayload(Payload payload) {
143161
if (payload instanceof DefaultPayload) {
144-
this.setupPayload = payload;
162+
this.setupPayloadMono = Mono.just(payload);
145163
} else {
146-
this.setupPayload = DefaultPayload.create(Objects.requireNonNull(payload));
164+
this.setupPayloadMono = Mono.just(DefaultPayload.create(Objects.requireNonNull(payload)));
147165
payload.release();
148166
}
149167
return this;
@@ -532,8 +550,19 @@ private Mono<RSocket> connect0(Supplier<ClientTransport> transportSupplier) {
532550
mtu > 0
533551
? new FragmentationDuplexConnection(connection, mtu, "client")
534552
: new ReassemblyDuplexConnection(connection));
535-
return connectionMono.flatMap(
536-
connection -> {
553+
return connectionMono
554+
.flatMap(
555+
connection ->
556+
setupPayloadMono
557+
.defaultIfEmpty(EmptyPayload.INSTANCE)
558+
.map(setupPayload -> Tuples.of(connection, setupPayload))
559+
.doOnError(ex -> connection.dispose())
560+
.doOnCancel(connection::dispose))
561+
.flatMap(
562+
tuple -> {
563+
DuplexConnection connection = tuple.getT1();
564+
Payload setupPayload = tuple.getT2();
565+
537566
ByteBuf resumeToken;
538567
KeepAliveHandler keepAliveHandler;
539568
DuplexConnection wrappedConnection;

rsocket-core/src/test/java/io/rsocket/core/RSocketConnectorTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import io.rsocket.test.util.TestClientTransport;
1010
import io.rsocket.util.ByteBufPayload;
1111
import java.time.Duration;
12+
import java.util.ArrayList;
13+
import java.util.List;
1214
import org.assertj.core.api.Assertions;
1315
import org.junit.jupiter.api.Test;
1416
import reactor.core.publisher.Mono;
@@ -99,4 +101,50 @@ public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions
99101
.allMatch(ReferenceCounted::release);
100102
Assertions.assertThat(setupPayload.refCnt()).isZero();
101103
}
104+
105+
@Test
106+
public void ensuresThatSetupPayloadProvidedAsMonoIsReleased() {
107+
List<Payload> saved = new ArrayList<>();
108+
Mono<Payload> setupPayloadMono =
109+
Mono.create(
110+
sink -> {
111+
Payload payload = ByteBufPayload.create("TestData", "TestMetadata");
112+
saved.add(payload);
113+
sink.success(payload);
114+
});
115+
116+
TestClientTransport testClientTransport = new TestClientTransport();
117+
Mono<RSocket> connectionMono =
118+
RSocketConnector.create().setupPayload(setupPayloadMono).connect(testClientTransport);
119+
120+
connectionMono
121+
.as(StepVerifier::create)
122+
.expectNextCount(1)
123+
.expectComplete()
124+
.verify(Duration.ofMillis(100));
125+
126+
connectionMono
127+
.as(StepVerifier::create)
128+
.expectNextCount(1)
129+
.expectComplete()
130+
.verify(Duration.ofMillis(100));
131+
132+
Assertions.assertThat(testClientTransport.testConnection().getSent())
133+
.hasSize(2)
134+
.allMatch(
135+
bb -> {
136+
DefaultConnectionSetupPayload payload = new DefaultConnectionSetupPayload(bb);
137+
return payload.getDataUtf8().equals("TestData")
138+
&& payload.getMetadataUtf8().equals("TestMetadata");
139+
})
140+
.allMatch(ReferenceCounted::release);
141+
142+
Assertions.assertThat(saved)
143+
.as("Metadata and data were consumed and released as slices")
144+
.allMatch(
145+
payload ->
146+
payload.refCnt() == 1
147+
&& payload.data().refCnt() == 0
148+
&& payload.metadata().refCnt() == 0);
149+
}
102150
}

0 commit comments

Comments
 (0)