Skip to content

Commit 862186e

Browse files
authored
bugfix RSocketConnector#setupPayload to copy the content (#843)
1 parent ff4d516 commit 862186e

File tree

2 files changed

+59
-2
lines changed

2 files changed

+59
-2
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.rsocket.plugins.InterceptorRegistry;
3737
import io.rsocket.resume.ClientRSocketSession;
3838
import io.rsocket.transport.ClientTransport;
39+
import io.rsocket.util.DefaultPayload;
3940
import io.rsocket.util.EmptyPayload;
4041
import java.time.Duration;
4142
import java.util.Objects;
@@ -121,13 +122,20 @@ public static Mono<RSocket> connectWith(ClientTransport transport) {
121122
* and metadata should be formatted according to the MIME types specified via {@link
122123
* #dataMimeType(String)} and {@link #metadataMimeType(String)}.
123124
*
124-
* @param payload the payload containing data and/or metadata for the {@code SETUP} frame
125+
* @param payload the payload containing data and/or metadata for the {@code SETUP} frame. Note,
126+
* if the instance of the given payload is not a {@link DefaultPayload}, its content will be
127+
* copied
125128
* @return the same instance for method chaining
126129
* @see <a href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#frame-setup">SETUP
127130
* Frame</a>
128131
*/
129132
public RSocketConnector setupPayload(Payload payload) {
130-
this.setupPayload = Objects.requireNonNull(payload);
133+
if (payload instanceof DefaultPayload) {
134+
this.setupPayload = payload;
135+
} else {
136+
this.setupPayload = DefaultPayload.create(Objects.requireNonNull(payload));
137+
payload.release();
138+
}
131139
return this;
132140
}
133141

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package io.rsocket.core;
2+
3+
import io.netty.util.ReferenceCounted;
4+
import io.rsocket.Payload;
5+
import io.rsocket.RSocket;
6+
import io.rsocket.test.util.TestClientTransport;
7+
import io.rsocket.util.ByteBufPayload;
8+
import java.time.Duration;
9+
import org.assertj.core.api.Assertions;
10+
import org.junit.jupiter.api.Test;
11+
import reactor.core.publisher.Mono;
12+
import reactor.test.StepVerifier;
13+
14+
public class RSocketConnectorTest {
15+
16+
@Test
17+
public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions() {
18+
Payload setupPayload = ByteBufPayload.create("TestData", "TestMetadata");
19+
20+
Assertions.assertThat(setupPayload.refCnt()).isOne();
21+
22+
TestClientTransport testClientTransport = new TestClientTransport();
23+
Mono<RSocket> connectionMono =
24+
RSocketConnector.create().setupPayload(setupPayload).connect(testClientTransport);
25+
26+
connectionMono
27+
.as(StepVerifier::create)
28+
.expectNextCount(1)
29+
.expectComplete()
30+
.verify(Duration.ofMillis(100));
31+
32+
connectionMono
33+
.as(StepVerifier::create)
34+
.expectNextCount(1)
35+
.expectComplete()
36+
.verify(Duration.ofMillis(100));
37+
38+
Assertions.assertThat(testClientTransport.testConnection().getSent())
39+
.hasSize(2)
40+
.allMatch(
41+
bb -> {
42+
DefaultConnectionSetupPayload payload = new DefaultConnectionSetupPayload(bb);
43+
return payload.getDataUtf8().equals("TestData")
44+
&& payload.getMetadataUtf8().equals("TestMetadata");
45+
})
46+
.allMatch(ReferenceCounted::release);
47+
Assertions.assertThat(setupPayload.refCnt()).isZero();
48+
}
49+
}

0 commit comments

Comments
 (0)