Skip to content

Commit d01121f

Browse files
nebhalerobertroeser
authored andcommitted
Always Reassemble Regardless of MTU (#503)
Previously, if a user configured a non-positive (<=0) MTU for fragmentation, both fragmentation and reassembly were disabled. Given that a receiver must always be prepared to reassemble fragmented frames (there is no handshake or flag that both parties must agree to) requiring fragmentation to enable reassembly was to strict. This change updates the fragmenter to handle an MTU of size 0 as an indicator that there should be no fragmentation, but still enable reassembly. The RSocketFactory was also updated to ensure that the fragmenter is always added, rather than skipping it when the MTU was zero. [#501]
1 parent db47113 commit d01121f

File tree

7 files changed

+82
-12
lines changed

7 files changed

+82
-12
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -211,9 +211,7 @@ public Mono<RSocket> start() {
211211
dataMimeType,
212212
setupPayload);
213213

214-
if (mtu > 0) {
215-
connection = new FragmentationDuplexConnection(connection, mtu);
216-
}
214+
connection = new FragmentationDuplexConnection(connection, mtu);
217215

218216
ClientServerInputMultiplexer multiplexer =
219217
new ClientServerInputMultiplexer(connection, plugins);

rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public FragmentationDuplexConnection(DuplexConnection delegate, int maxFragmentS
6868
*
6969
* @param byteBufAllocator the {@link ByteBufAllocator} to use
7070
* @param delegate the {@link DuplexConnection} to decorate
71-
* @param maxFragmentSize the maximum fragment size
71+
* @param maxFragmentSize the maximum fragment size. A value of 0 indicates that frames should not be fragmented.
7272
* @throws NullPointerException if {@code byteBufAllocator} or {@code delegate} are {@code null}
7373
* @throws IllegalArgumentException if {@code maxFragmentSize} is not {@code positive}
7474
*/
@@ -79,7 +79,7 @@ public FragmentationDuplexConnection(
7979
Objects.requireNonNull(byteBufAllocator, "byteBufAllocator must not be null");
8080
this.delegate = Objects.requireNonNull(delegate, "delegate must not be null");
8181

82-
NumberUtils.requirePositive(maxFragmentSize, "maxFragmentSize must be positive");
82+
NumberUtils.requireNonNegative(maxFragmentSize, "maxFragmentSize must be positive");
8383

8484
this.frameFragmenter = new FrameFragmenter(byteBufAllocator, maxFragmentSize);
8585
}

rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ private int getFragmentableLength(FragmentableFrame fragmentableFrame) {
124124
}
125125

126126
private boolean shouldFragment(Frame frame) {
127-
if (!(frame instanceof FragmentableFrame)) {
127+
if (maxFragmentSize == 0 || !(frame instanceof FragmentableFrame)) {
128128
return false;
129129
}
130130

rsocket-core/src/main/java/io/rsocket/util/NumberUtils.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,25 @@ public final class NumberUtils {
3737

3838
private NumberUtils() {}
3939

40+
/**
41+
* Requires that an {@code int} is greater than or equal to zero.
42+
*
43+
* @param i the {@code int} to test
44+
* @param message detail message to be used in the event that a {@link IllegalArgumentException}
45+
* is thrown
46+
* @return the {@code int} if greater than or equal to zero
47+
* @throws IllegalArgumentException if {@code i} is less than zero
48+
*/
49+
public static int requireNonNegative(int i, String message) {
50+
Objects.requireNonNull(message, "message must not be null");
51+
52+
if (i < 0) {
53+
throw new IllegalArgumentException(message);
54+
}
55+
56+
return i;
57+
}
58+
4059
/**
4160
* Requires that a {@code long} is greater than zero.
4261
*

rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ final class FragmentationDuplexConnectionTest {
4747
private final ArgumentCaptor<Publisher<Frame>> publishers =
4848
ArgumentCaptor.forClass(Publisher.class);
4949

50-
@DisplayName("constructor throws NullPointerException with invalid maxFragmentLength")
50+
@DisplayName("constructor throws IllegalArgumentException with negative maxFragmentLength")
5151
@Test
5252
void constructorInvalidMaxFragmentSize() {
5353
assertThatIllegalArgumentException()
54-
.isThrownBy(() -> new FragmentationDuplexConnection(DEFAULT, delegate, 0))
54+
.isThrownBy(() -> new FragmentationDuplexConnection(DEFAULT, delegate, Integer.MIN_VALUE))
5555
.withMessage("maxFragmentSize must be positive");
5656
}
5757

@@ -366,4 +366,17 @@ void sendNullFrames() {
366366
.isThrownBy(() -> new FragmentationDuplexConnection(DEFAULT, delegate, 2).send(null))
367367
.withMessage("frames must not be null");
368368
}
369+
370+
@DisplayName("does not fragment with zero maxFragmentLength")
371+
@Test
372+
void sendZeroMaxFragmentLength() {
373+
Frame frame =
374+
toAbstractionLeakingFrame(
375+
DEFAULT, 1, createPayloadFrame(DEFAULT, false, false, null, getRandomByteBuf(2)));
376+
377+
new FragmentationDuplexConnection(DEFAULT, delegate, 0).sendOne(frame);
378+
verify(delegate).send(publishers.capture());
379+
380+
StepVerifier.create(Flux.from(publishers.getValue())).expectNext(frame).verifyComplete();
381+
}
369382
}

rsocket-core/src/test/java/io/rsocket/fragmentation/FrameFragmenterTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,4 +173,16 @@ void fragmentWithNullFrame() {
173173
.isThrownBy(() -> new FrameFragmenter(DEFAULT, 2).fragment(null))
174174
.withMessage("frame must not be null");
175175
}
176+
177+
@DisplayName("does not fragment with zero maxFragmentLength")
178+
@Test
179+
void fragmentZeroMaxFragmentLength() {
180+
PayloadFrame frame = createPayloadFrame(DEFAULT, false, false, null, getRandomByteBuf(2));
181+
182+
new FrameFragmenter(DEFAULT, 0)
183+
.fragment(frame)
184+
.as(StepVerifier::create)
185+
.expectNext(frame)
186+
.verifyComplete();
187+
}
176188
}

rsocket-core/src/test/java/io/rsocket/util/NumberUtilsTest.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,37 @@
2525

2626
final class NumberUtilsTest {
2727

28-
@DisplayName("returns long value with positive int")
28+
@DisplayName("returns int value with postitive int")
29+
@Test
30+
void requireNonNegativeInt() {
31+
assertThat(NumberUtils.requireNonNegative(Integer.MAX_VALUE, "test-message"))
32+
.isEqualTo(Integer.MAX_VALUE);
33+
}
34+
35+
@DisplayName(
36+
"requireNonNegative with int argument throws IllegalArgumentException with negative value")
37+
@Test
38+
void requireNonNegativeIntNegative() {
39+
assertThatIllegalArgumentException()
40+
.isThrownBy(() -> NumberUtils.requireNonNegative(Integer.MIN_VALUE, "test-message"))
41+
.withMessage("test-message");
42+
}
43+
44+
@DisplayName("requireNonNegative with int argument throws NullPointerException with null message")
45+
@Test
46+
void requireNonNegativeIntNullMessage() {
47+
assertThatNullPointerException()
48+
.isThrownBy(() -> NumberUtils.requireNonNegative(Integer.MIN_VALUE, null))
49+
.withMessage("message must not be null");
50+
}
51+
52+
@DisplayName("requireNonNegative returns int value with zero")
53+
@Test
54+
void requireNonNegativeIntZero() {
55+
assertThat(NumberUtils.requireNonNegative(0, "test-message")).isEqualTo(0);
56+
}
57+
58+
@DisplayName("requirePositive returns int value with positive int")
2959
@Test
3060
void requirePositiveInt() {
3161
assertThat(NumberUtils.requirePositive(Integer.MAX_VALUE, "test-message"))
@@ -52,13 +82,12 @@ void requirePositiveIntNullMessage() {
5282
@DisplayName("requirePositive with int argument throws IllegalArgumentException with zero value")
5383
@Test
5484
void requirePositiveIntZero() {
55-
5685
assertThatIllegalArgumentException()
5786
.isThrownBy(() -> NumberUtils.requirePositive(0, "test-message"))
5887
.withMessage("test-message");
5988
}
6089

61-
@DisplayName("returns long value with positive long")
90+
@DisplayName("requirePositive returns long value with positive long")
6291
@Test
6392
void requirePositiveLong() {
6493
assertThat(NumberUtils.requirePositive(Long.MAX_VALUE, "test-message"))
@@ -85,7 +114,6 @@ void requirePositiveLongNullMessage() {
85114
@DisplayName("requirePositive with long argument throws IllegalArgumentException with zero value")
86115
@Test
87116
void requirePositiveLongZero() {
88-
89117
assertThatIllegalArgumentException()
90118
.isThrownBy(() -> NumberUtils.requirePositive(0L, "test-message"))
91119
.withMessage("test-message");

0 commit comments

Comments
 (0)