Skip to content

Commit 80c5b7e

Browse files
committed
DefaultPayload copies ByteBuf content
Closes gh-970 Signed-off-by: Rossen Stoyanchev <[email protected]>
1 parent 0601f88 commit 80c5b7e

File tree

3 files changed

+34
-10
lines changed

3 files changed

+34
-10
lines changed

rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -99,18 +99,18 @@ public static Payload create(ByteBuf data) {
9999
}
100100

101101
public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) {
102-
try {
103-
return create(data.nioBuffer(), metadata == null ? null : metadata.nioBuffer());
104-
} finally {
105-
data.release();
106-
if (metadata != null) {
107-
metadata.release();
108-
}
102+
byte[] dataBytes = new byte[data.readableBytes()];
103+
data.readBytes(dataBytes);
104+
byte[] metadataBytes = null;
105+
if (metadata != null) {
106+
metadataBytes = new byte[metadata.readableBytes()];
107+
metadata.readBytes(metadataBytes);
109108
}
109+
return create(dataBytes, metadataBytes);
110110
}
111111

112112
public static Payload create(Payload payload) {
113-
return create(payload.getData(), payload.hasMetadata() ? payload.getMetadata() : null);
113+
return create(payload.data(), payload.hasMetadata() ? payload.metadata() : null);
114114
}
115115

116116
@Override

rsocket-core/src/test/java/io/rsocket/core/RSocketConnectorTest.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,14 @@ public void ensuresThatSetupPayloadCanBeRetained() {
7373
@Test
7474
public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions() {
7575
Payload setupPayload = ByteBufPayload.create("TestData", "TestMetadata");
76-
7776
Assertions.assertThat(setupPayload.refCnt()).isOne();
7877

78+
// Keep the data and metadata around so we can try changing them independently
79+
ByteBuf dataBuf = setupPayload.data();
80+
ByteBuf metadataBuf = setupPayload.metadata();
81+
dataBuf.retain();
82+
metadataBuf.retain();
83+
7984
TestClientTransport testClientTransport = new TestClientTransport();
8085
Mono<RSocket> connectionMono =
8186
RSocketConnector.create().setupPayload(setupPayload).connect(testClientTransport);
@@ -92,6 +97,15 @@ public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions
9297
.expectComplete()
9398
.verify(Duration.ofMillis(100));
9499

100+
// Changing the original data and metadata should not impact the SetupPayload
101+
dataBuf.writerIndex(dataBuf.readerIndex());
102+
dataBuf.writeChar('d');
103+
dataBuf.release();
104+
105+
metadataBuf.writerIndex(metadataBuf.readerIndex());
106+
metadataBuf.writeChar('m');
107+
metadataBuf.release();
108+
95109
Assertions.assertThat(testClientTransport.testConnection().getSent())
96110
.hasSize(2)
97111
.allMatch(
@@ -100,7 +114,11 @@ public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions
100114
return payload.getDataUtf8().equals("TestData")
101115
&& payload.getMetadataUtf8().equals("TestMetadata");
102116
})
103-
.allMatch(ReferenceCounted::release);
117+
.allMatch(
118+
byteBuf -> {
119+
System.out.println("calling release " + byteBuf.refCnt());
120+
return byteBuf.release();
121+
});
104122
Assertions.assertThat(setupPayload.refCnt()).isZero();
105123
}
106124

rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@ public void shouldReleaseGivenByteBufDataAndMetadataUpOnPayloadCreation() {
103103
metadataPresent
104104
? ByteBuffer.wrap(new byte[] {(byte) (i + 1)})
105105
: DefaultPayload.EMPTY_BUFFER);
106+
107+
data.release();
108+
if (metadata != null) {
109+
metadata.release();
110+
}
111+
106112
allocator.assertHasNoLeaks();
107113
}
108114
}

0 commit comments

Comments
 (0)