Skip to content

Commit 3e89238

Browse files
committed
Fix multiple calls to dispose in FragmentationDuplexConnection, allow fragmentation to be disabled until leaks are fixed
1 parent 83335d1 commit 3e89238

File tree

3 files changed

+33
-4
lines changed

3 files changed

+33
-4
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,9 @@ public Mono<RSocket> start() {
211211
dataMimeType,
212212
setupPayload);
213213

214-
connection = new FragmentationDuplexConnection(connection, mtu);
214+
if (mtu > 0) {
215+
connection = new FragmentationDuplexConnection(connection, mtu);
216+
}
215217

216218
ClientServerInputMultiplexer multiplexer =
217219
new ClientServerInputMultiplexer(connection, plugins);

rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ public FragmentationDuplexConnection(
8383
NumberUtils.requireNonNegative(maxFragmentSize, "maxFragmentSize must be positive");
8484

8585
this.frameFragmenter = new FrameFragmenter(byteBufAllocator, maxFragmentSize);
86+
87+
delegate
88+
.onClose()
89+
.doFinally(signalType -> frameReassemblers.values().forEach(FrameReassembler::dispose))
90+
.subscribe();
8691
}
8792

8893
@Override
@@ -102,9 +107,7 @@ public boolean isDisposed() {
102107

103108
@Override
104109
public Mono<Void> onClose() {
105-
return delegate
106-
.onClose()
107-
.doAfterTerminate(() -> frameReassemblers.values().forEach(FrameReassembler::dispose));
110+
return delegate.onClose();
108111
}
109112

110113
@Override

rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.mockito.ArgumentCaptor;
3838
import org.reactivestreams.Publisher;
3939
import reactor.core.publisher.Flux;
40+
import reactor.core.publisher.Mono;
4041
import reactor.test.StepVerifier;
4142

4243
final class FragmentationDuplexConnectionTest {
@@ -93,6 +94,7 @@ void reassembleData() {
9394
DEFAULT, 1, createPayloadFrame(DEFAULT, false, false, null, data.slice(4, 2)));
9495

9596
when(delegate.receive()).thenReturn(Flux.just(fragment1, fragment2, fragment3));
97+
when(delegate.onClose()).thenReturn(Mono.never());
9698

9799
new FragmentationDuplexConnection(DEFAULT, delegate, 2)
98100
.receive()
@@ -123,6 +125,7 @@ void reassembleMetadata() {
123125
DEFAULT, 1, createPayloadFrame(DEFAULT, false, true, metadata.slice(4, 2), null));
124126

125127
when(delegate.receive()).thenReturn(Flux.just(fragment1, fragment2, fragment3));
128+
when(delegate.onClose()).thenReturn(Mono.never());
126129

127130
new FragmentationDuplexConnection(DEFAULT, delegate, 2)
128131
.receive()
@@ -165,6 +168,7 @@ void reassembleMetadataAndData() {
165168

166169
when(delegate.receive())
167170
.thenReturn(Flux.just(fragment1, fragment2, fragment3, fragment4, fragment5));
171+
when(delegate.onClose()).thenReturn(Mono.never());
168172

169173
new FragmentationDuplexConnection(DEFAULT, delegate, 2)
170174
.receive()
@@ -181,6 +185,7 @@ void reassembleNonFragment() {
181185
DEFAULT, 1, createPayloadFrame(DEFAULT, false, true, (ByteBuf) null, null));
182186

183187
when(delegate.receive()).thenReturn(Flux.just(frame));
188+
when(delegate.onClose()).thenReturn(Mono.never());
184189

185190
new FragmentationDuplexConnection(DEFAULT, delegate, 2)
186191
.receive()
@@ -195,6 +200,7 @@ void reassembleNonFragmentableFrame() {
195200
Frame frame = toAbstractionLeakingFrame(DEFAULT, 1, createTestCancelFrame());
196201

197202
when(delegate.receive()).thenReturn(Flux.just(frame));
203+
when(delegate.onClose()).thenReturn(Mono.never());
198204

199205
new FragmentationDuplexConnection(DEFAULT, delegate, 2)
200206
.receive()
@@ -224,6 +230,8 @@ void sendData() {
224230
toAbstractionLeakingFrame(
225231
DEFAULT, 1, createPayloadFrame(DEFAULT, false, false, null, data.slice(4, 2)));
226232

233+
when(delegate.onClose()).thenReturn(Mono.never());
234+
227235
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
228236
verify(delegate).send(publishers.capture());
229237

@@ -241,6 +249,8 @@ void sendEqualToMaxFragmentLength() {
241249
toAbstractionLeakingFrame(
242250
DEFAULT, 1, createPayloadFrame(DEFAULT, false, false, null, getRandomByteBuf(2)));
243251

252+
when(delegate.onClose()).thenReturn(Mono.never());
253+
244254
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
245255
verify(delegate).send(publishers.capture());
246256

@@ -254,6 +264,8 @@ void sendFragment() {
254264
toAbstractionLeakingFrame(
255265
DEFAULT, 1, createPayloadFrame(DEFAULT, true, true, (ByteBuf) null, null));
256266

267+
when(delegate.onClose()).thenReturn(Mono.never());
268+
257269
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
258270
verify(delegate).send(publishers.capture());
259271

@@ -267,6 +279,8 @@ void sendLessThanMaxFragmentLength() {
267279
toAbstractionLeakingFrame(
268280
DEFAULT, 1, createPayloadFrame(DEFAULT, false, false, null, getRandomByteBuf(1)));
269281

282+
when(delegate.onClose()).thenReturn(Mono.never());
283+
270284
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
271285
verify(delegate).send(publishers.capture());
272286

@@ -294,6 +308,8 @@ void sendMetadata() {
294308
toAbstractionLeakingFrame(
295309
DEFAULT, 1, createPayloadFrame(DEFAULT, false, true, metadata.slice(4, 2), null));
296310

311+
when(delegate.onClose()).thenReturn(Mono.never());
312+
297313
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
298314
verify(delegate).send(publishers.capture());
299315

@@ -336,6 +352,8 @@ void sendMetadataAndData() {
336352
toAbstractionLeakingFrame(
337353
DEFAULT, 1, createPayloadFrame(DEFAULT, false, false, null, data.slice(3, 2)));
338354

355+
when(delegate.onClose()).thenReturn(Mono.never());
356+
339357
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
340358
verify(delegate).send(publishers.capture());
341359

@@ -353,6 +371,8 @@ void sendMetadataAndData() {
353371
void sendNonFragmentable() {
354372
Frame frame = toAbstractionLeakingFrame(DEFAULT, 1, createTestCancelFrame());
355373

374+
when(delegate.onClose()).thenReturn(Mono.never());
375+
356376
new FragmentationDuplexConnection(DEFAULT, delegate, 2).sendOne(frame);
357377
verify(delegate).send(publishers.capture());
358378

@@ -362,6 +382,8 @@ void sendNonFragmentable() {
362382
@DisplayName("send throws NullPointerException with null frames")
363383
@Test
364384
void sendNullFrames() {
385+
when(delegate.onClose()).thenReturn(Mono.never());
386+
365387
assertThatNullPointerException()
366388
.isThrownBy(() -> new FragmentationDuplexConnection(DEFAULT, delegate, 2).send(null))
367389
.withMessage("frames must not be null");
@@ -374,6 +396,8 @@ void sendZeroMaxFragmentLength() {
374396
toAbstractionLeakingFrame(
375397
DEFAULT, 1, createPayloadFrame(DEFAULT, false, false, null, getRandomByteBuf(2)));
376398

399+
when(delegate.onClose()).thenReturn(Mono.never());
400+
377401
new FragmentationDuplexConnection(DEFAULT, delegate, 0).sendOne(frame);
378402
verify(delegate).send(publishers.capture());
379403

0 commit comments

Comments
 (0)