Skip to content

Always Reassemble Regardless of MTU #503

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions rsocket-core/src/main/java/io/rsocket/RSocketFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,7 @@ public Mono<RSocket> start() {
dataMimeType,
setupPayload);

if (mtu > 0) {
connection = new FragmentationDuplexConnection(connection, mtu);
}
connection = new FragmentationDuplexConnection(connection, mtu);

ClientServerInputMultiplexer multiplexer =
new ClientServerInputMultiplexer(connection, plugins);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public FragmentationDuplexConnection(DuplexConnection delegate, int maxFragmentS
*
* @param byteBufAllocator the {@link ByteBufAllocator} to use
* @param delegate the {@link DuplexConnection} to decorate
* @param maxFragmentSize the maximum fragment size
* @param maxFragmentSize the maximum fragment size. A value of 0 indicates that frames should not be fragmented.
* @throws NullPointerException if {@code byteBufAllocator} or {@code delegate} are {@code null}
* @throws IllegalArgumentException if {@code maxFragmentSize} is not {@code positive}
*/
Expand All @@ -79,7 +79,7 @@ public FragmentationDuplexConnection(
Objects.requireNonNull(byteBufAllocator, "byteBufAllocator must not be null");
this.delegate = Objects.requireNonNull(delegate, "delegate must not be null");

NumberUtils.requirePositive(maxFragmentSize, "maxFragmentSize must be positive");
NumberUtils.requireNonNegative(maxFragmentSize, "maxFragmentSize must be positive");

this.frameFragmenter = new FrameFragmenter(byteBufAllocator, maxFragmentSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ private int getFragmentableLength(FragmentableFrame fragmentableFrame) {
}

private boolean shouldFragment(Frame frame) {
if (!(frame instanceof FragmentableFrame)) {
if (maxFragmentSize == 0 || !(frame instanceof FragmentableFrame)) {
return false;
}

Expand Down
19 changes: 19 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/util/NumberUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,25 @@ public final class NumberUtils {

private NumberUtils() {}

/**
* Requires that an {@code int} is greater than or equal to zero.
*
* @param i the {@code int} to test
* @param message detail message to be used in the event that a {@link IllegalArgumentException}
* is thrown
* @return the {@code int} if greater than or equal to zero
* @throws IllegalArgumentException if {@code i} is less than zero
*/
public static int requireNonNegative(int i, String message) {
Objects.requireNonNull(message, "message must not be null");

if (i < 0) {
throw new IllegalArgumentException(message);
}

return i;
}

/**
* Requires that a {@code long} is greater than zero.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ final class FragmentationDuplexConnectionTest {
private final ArgumentCaptor<Publisher<Frame>> publishers =
ArgumentCaptor.forClass(Publisher.class);

@DisplayName("constructor throws NullPointerException with invalid maxFragmentLength")
@DisplayName("constructor throws IllegalArgumentException with negative maxFragmentLength")
@Test
void constructorInvalidMaxFragmentSize() {
assertThatIllegalArgumentException()
.isThrownBy(() -> new FragmentationDuplexConnection(DEFAULT, delegate, 0))
.isThrownBy(() -> new FragmentationDuplexConnection(DEFAULT, delegate, Integer.MIN_VALUE))
.withMessage("maxFragmentSize must be positive");
}

Expand Down Expand Up @@ -366,4 +366,17 @@ void sendNullFrames() {
.isThrownBy(() -> new FragmentationDuplexConnection(DEFAULT, delegate, 2).send(null))
.withMessage("frames must not be null");
}

@DisplayName("does not fragment with zero maxFragmentLength")
@Test
void sendZeroMaxFragmentLength() {
Frame frame =
toAbstractionLeakingFrame(
DEFAULT, 1, createPayloadFrame(DEFAULT, false, false, null, getRandomByteBuf(2)));

new FragmentationDuplexConnection(DEFAULT, delegate, 0).sendOne(frame);
verify(delegate).send(publishers.capture());

StepVerifier.create(Flux.from(publishers.getValue())).expectNext(frame).verifyComplete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,16 @@ void fragmentWithNullFrame() {
.isThrownBy(() -> new FrameFragmenter(DEFAULT, 2).fragment(null))
.withMessage("frame must not be null");
}

@DisplayName("does not fragment with zero maxFragmentLength")
@Test
void fragmentZeroMaxFragmentLength() {
PayloadFrame frame = createPayloadFrame(DEFAULT, false, false, null, getRandomByteBuf(2));

new FrameFragmenter(DEFAULT, 0)
.fragment(frame)
.as(StepVerifier::create)
.expectNext(frame)
.verifyComplete();
}
}
36 changes: 32 additions & 4 deletions rsocket-core/src/test/java/io/rsocket/util/NumberUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,37 @@

final class NumberUtilsTest {

@DisplayName("returns long value with positive int")
@DisplayName("returns int value with postitive int")
@Test
void requireNonNegativeInt() {
assertThat(NumberUtils.requireNonNegative(Integer.MAX_VALUE, "test-message"))
.isEqualTo(Integer.MAX_VALUE);
}

@DisplayName(
"requireNonNegative with int argument throws IllegalArgumentException with negative value")
@Test
void requireNonNegativeIntNegative() {
assertThatIllegalArgumentException()
.isThrownBy(() -> NumberUtils.requireNonNegative(Integer.MIN_VALUE, "test-message"))
.withMessage("test-message");
}

@DisplayName("requireNonNegative with int argument throws NullPointerException with null message")
@Test
void requireNonNegativeIntNullMessage() {
assertThatNullPointerException()
.isThrownBy(() -> NumberUtils.requireNonNegative(Integer.MIN_VALUE, null))
.withMessage("message must not be null");
}

@DisplayName("requireNonNegative returns int value with zero")
@Test
void requireNonNegativeIntZero() {
assertThat(NumberUtils.requireNonNegative(0, "test-message")).isEqualTo(0);
}

@DisplayName("requirePositive returns int value with positive int")
@Test
void requirePositiveInt() {
assertThat(NumberUtils.requirePositive(Integer.MAX_VALUE, "test-message"))
Expand All @@ -52,13 +82,12 @@ void requirePositiveIntNullMessage() {
@DisplayName("requirePositive with int argument throws IllegalArgumentException with zero value")
@Test
void requirePositiveIntZero() {

assertThatIllegalArgumentException()
.isThrownBy(() -> NumberUtils.requirePositive(0, "test-message"))
.withMessage("test-message");
}

@DisplayName("returns long value with positive long")
@DisplayName("requirePositive returns long value with positive long")
@Test
void requirePositiveLong() {
assertThat(NumberUtils.requirePositive(Long.MAX_VALUE, "test-message"))
Expand All @@ -85,7 +114,6 @@ void requirePositiveLongNullMessage() {
@DisplayName("requirePositive with long argument throws IllegalArgumentException with zero value")
@Test
void requirePositiveLongZero() {

assertThatIllegalArgumentException()
.isThrownBy(() -> NumberUtils.requirePositive(0L, "test-message"))
.withMessage("test-message");
Expand Down