Skip to content

Commit 021e17d

Browse files
authored
fixes ConnectionSetupPayload refcnt management (#854)
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 2fa2120 commit 021e17d

File tree

2 files changed

+56
-2
lines changed

2 files changed

+56
-2
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -627,9 +627,10 @@ private Mono<RSocket> connect0(Supplier<ClientTransport> transportSupplier) {
627627
mtu);
628628

629629
return wrappedConnection
630-
.sendOne(setupFrame)
630+
.sendOne(setupFrame.retain())
631631
.thenReturn(wrappedRSocketRequester);
632-
});
632+
})
633+
.doFinally(signalType -> setup.release());
633634
});
634635
}
635636
}

rsocket-core/src/test/java/io/rsocket/core/RSocketConnectorTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package io.rsocket.core;
22

3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.util.CharsetUtil;
35
import io.netty.util.ReferenceCounted;
6+
import io.rsocket.ConnectionSetupPayload;
47
import io.rsocket.Payload;
58
import io.rsocket.RSocket;
69
import io.rsocket.test.util.TestClientTransport;
@@ -9,10 +12,60 @@
912
import org.assertj.core.api.Assertions;
1013
import org.junit.jupiter.api.Test;
1114
import reactor.core.publisher.Mono;
15+
import reactor.core.publisher.MonoProcessor;
1216
import reactor.test.StepVerifier;
1317

1418
public class RSocketConnectorTest {
1519

20+
@Test
21+
public void ensuresThatSetupPayloadCanBeRetained() {
22+
MonoProcessor<ConnectionSetupPayload> retainedSetupPayload = MonoProcessor.create();
23+
TestClientTransport transport = new TestClientTransport();
24+
25+
ByteBuf data = transport.alloc().buffer();
26+
27+
data.writeCharSequence("data", CharsetUtil.UTF_8);
28+
29+
RSocketConnector.create()
30+
.setupPayload(ByteBufPayload.create(data))
31+
.acceptor(
32+
(setup, sendingSocket) -> {
33+
retainedSetupPayload.onNext(setup.retain());
34+
return Mono.just(new RSocket() {});
35+
})
36+
.connect(transport)
37+
.block();
38+
39+
Assertions.assertThat(transport.testConnection().getSent())
40+
.hasSize(1)
41+
.first()
42+
.matches(
43+
bb -> {
44+
DefaultConnectionSetupPayload payload = new DefaultConnectionSetupPayload(bb);
45+
return !payload.hasMetadata() && payload.getDataUtf8().equals("data");
46+
})
47+
.matches(buf -> buf.refCnt() == 2)
48+
.matches(
49+
buf -> {
50+
buf.release();
51+
return buf.refCnt() == 1;
52+
});
53+
54+
retainedSetupPayload
55+
.as(StepVerifier::create)
56+
.expectNextMatches(
57+
setup -> {
58+
String dataUtf8 = setup.getDataUtf8();
59+
return "data".equals(dataUtf8) && setup.release();
60+
})
61+
.expectComplete()
62+
.verify(Duration.ofSeconds(5));
63+
64+
Assertions.assertThat(retainedSetupPayload.peek().refCnt()).isZero();
65+
66+
transport.alloc().assertHasNoLeaks();
67+
}
68+
1669
@Test
1770
public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions() {
1871
Payload setupPayload = ByteBufPayload.create("TestData", "TestMetadata");

0 commit comments

Comments
 (0)