Skip to content

Reject frames less than min MTU size #913

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private FireAndForgetResponderSubscriber() {

this.frames =
ReassemblyUtils.addFollowingFrame(
allocator.compositeBuffer(), firstFrame, maxInboundPayloadSize);
allocator.compositeBuffer(), firstFrame, true, maxInboundPayloadSize);
}

@Override
Expand All @@ -92,7 +92,8 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
final CompositeByteBuf frames = this.frames;

try {
ReassemblyUtils.addFollowingFrame(frames, followingFrame, this.maxInboundPayloadSize);
ReassemblyUtils.addFollowingFrame(
frames, followingFrame, hasFollows, this.maxInboundPayloadSize);
} catch (IllegalStateException t) {
this.requesterResponderSupport.remove(this.streamId, this);

Expand Down
13 changes: 10 additions & 3 deletions rsocket-core/src/main/java/io/rsocket/core/ReassemblyUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ static <T extends RequesterFrameHandler> void handleNextSupport(
if (frames == null) {
frames =
ReassemblyUtils.addFollowingFrame(
allocator.compositeBuffer(), frame, maxInboundPayloadSize);
allocator.compositeBuffer(), frame, hasFollows, maxInboundPayloadSize);
instance.setFrames(frames);

long previousState = markReassembling(updater, instance);
Expand All @@ -113,7 +113,8 @@ static <T extends RequesterFrameHandler> void handleNextSupport(
}
} else {
try {
frames = ReassemblyUtils.addFollowingFrame(frames, frame, maxInboundPayloadSize);
frames =
ReassemblyUtils.addFollowingFrame(frames, frame, hasFollows, maxInboundPayloadSize);
} catch (IllegalStateException t) {
if (isTerminated(updater.get(instance))) {
return;
Expand Down Expand Up @@ -160,7 +161,10 @@ static <T extends RequesterFrameHandler> void handleNextSupport(
}

static CompositeByteBuf addFollowingFrame(
CompositeByteBuf frames, ByteBuf followingFrame, int maxInboundPayloadSize) {
CompositeByteBuf frames,
ByteBuf followingFrame,
boolean hasFollows,
int maxInboundPayloadSize) {
int readableBytes = frames.readableBytes();
if (readableBytes == 0) {
return frames.addComponent(true, followingFrame.retain());
Expand All @@ -169,6 +173,9 @@ static CompositeByteBuf addFollowingFrame(
> maxInboundPayloadSize) {
throw new IllegalStateException(
String.format(ILLEGAL_REASSEMBLED_PAYLOAD_SIZE, maxInboundPayloadSize));
} else if (followingFrame.readableBytes() < MIN_MTU_SIZE - 3 && hasFollows) {
// FIXME: check MIN_MTU_SIZE only (currently fragments have size of 61)
throw new IllegalStateException("Fragment is too small.");
}

final boolean hasMetadata = FrameHeaderCodec.hasMetadata(followingFrame);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public RequestChannelResponderSubscriber(

this.frames =
ReassemblyUtils.addFollowingFrame(
allocator.compositeBuffer(), firstFrame, maxInboundPayloadSize);
allocator.compositeBuffer(), firstFrame, true, maxInboundPayloadSize);
STATE.lazySet(this, REASSEMBLING_FLAG);
}

Expand Down Expand Up @@ -491,7 +491,7 @@ public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload)
if (frames == null) {
frames =
ReassemblyUtils.addFollowingFrame(
this.allocator.compositeBuffer(), frame, this.maxInboundPayloadSize);
this.allocator.compositeBuffer(), frame, hasFollows, this.maxInboundPayloadSize);
this.frames = frames;

long previousState = markReassembling(STATE, this);
Expand All @@ -502,7 +502,9 @@ public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload)
}
} else {
try {
frames = ReassemblyUtils.addFollowingFrame(frames, frame, this.maxInboundPayloadSize);
frames =
ReassemblyUtils.addFollowingFrame(
frames, frame, hasFollows, this.maxInboundPayloadSize);
} catch (IllegalStateException e) {
if (isTerminated(this.state)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public RequestResponseResponderSubscriber(
this.handler = handler;
this.frames =
ReassemblyUtils.addFollowingFrame(
allocator.compositeBuffer(), firstFrame, maxInboundPayloadSize);
allocator.compositeBuffer(), firstFrame, true, maxInboundPayloadSize);
}

public RequestResponseResponderSubscriber(
Expand Down Expand Up @@ -218,7 +218,7 @@ public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload)
}

try {
ReassemblyUtils.addFollowingFrame(frames, frame, this.maxInboundPayloadSize);
ReassemblyUtils.addFollowingFrame(frames, frame, hasFollows, this.maxInboundPayloadSize);
} catch (IllegalStateException t) {
S.lazySet(this, Operators.cancelledSubscription());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public RequestStreamResponderSubscriber(
this.handler = handler;
this.frames =
ReassemblyUtils.addFollowingFrame(
allocator.compositeBuffer(), firstFrame, maxInboundPayloadSize);
allocator.compositeBuffer(), firstFrame, true, maxInboundPayloadSize);
}

public RequestStreamResponderSubscriber(
Expand Down Expand Up @@ -258,7 +258,8 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
}

try {
ReassemblyUtils.addFollowingFrame(frames, followingFrame, this.maxInboundPayloadSize);
ReassemblyUtils.addFollowingFrame(
frames, followingFrame, hasFollows, this.maxInboundPayloadSize);
} catch (IllegalStateException t) {
// if subscription is null, it means that streams has not yet reassembled all the fragments
// and fragmentation of the first frame was cancelled before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1332,6 +1332,36 @@ public void errorTooBigPayload(
leaksTrackingByteBufAllocator.assertHasNoLeaks();
}

@ParameterizedTest(name = "throws error if fragment before the last is < min MTU {0}")
@MethodSource("requestNInteractions")
public void errorFragmentTooSmall(
FrameType frameType,
BiFunction<ClientSocketRule, Payload, Publisher<Payload>> requestFunction) {
final int mtu = 32;
final LeaksTrackingByteBufAllocator leaksTrackingByteBufAllocator =
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);

final Payload requestPayload = genericPayload(leaksTrackingByteBufAllocator);
final Payload responsePayload = fixedSizePayload(leaksTrackingByteBufAllocator, 156);
List<ByteBuf> fragments = prepareFragments(leaksTrackingByteBufAllocator, mtu, responsePayload);
responsePayload.release();

StepVerifier.create(requestFunction.apply(rule, requestPayload))
.then(() -> rule.connection.addToReceivedBuffer(fragments.toArray(new ByteBuf[0])))
.expectErrorMessage("Fragment is too small.")
.verify();

FrameAssert.assertThat(rule.connection.getSent().poll()).typeOf(frameType).hasNoLeaks();

if (frameType == REQUEST_CHANNEL) {
FrameAssert.assertThat(rule.connection.getSent().poll()).typeOf(COMPLETE).hasNoLeaks();
}

FrameAssert.assertThat(rule.connection.getSent().poll()).typeOf(CANCEL).hasNoLeaks();

leaksTrackingByteBufAllocator.assertHasNoLeaks();
}

public static class ClientSocketRule extends AbstractSocketRule<RSocketRequester> {
@Override
protected RSocketRequester newRSocket() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,55 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
rule.assertHasNoLeaks();
}

@ParameterizedTest(name = "throws error if fragment before the last is < min MTU {0}")
@MethodSource("requestCases")
public void errorFragmentTooSmall(FrameType frameType) {
final int mtu = 32;
AtomicReference<Payload> receivedPayload = new AtomicReference<>();
rule.setAcceptingSocket(
new RSocket() {
@Override
public Mono<Void> fireAndForget(Payload payload) {
receivedPayload.set(payload);
return Mono.empty();
}

@Override
public Mono<Payload> requestResponse(Payload payload) {
receivedPayload.set(payload);
return Mono.just(genericPayload(rule.allocator));
}

@Override
public Flux<Payload> requestStream(Payload payload) {
receivedPayload.set(payload);
return Flux.just(genericPayload(rule.allocator));
}

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
Flux.from(payloads).subscribe(receivedPayload::set, null, null, s -> s.request(1));
return Flux.just(genericPayload(rule.allocator));
}
});
final Payload randomPayload = fixedSizePayload(rule.allocator, 156);
List<ByteBuf> fragments = prepareFragments(rule.allocator, mtu, randomPayload, frameType);
randomPayload.release();

rule.connection.addToReceivedBuffer(fragments.toArray(new ByteBuf[0]));

PayloadAssert.assertThat(receivedPayload.get()).isNull();

if (frameType != REQUEST_FNF) {
FrameAssert.assertThat(rule.connection.getSent().poll())
.typeOf(ERROR)
.hasData("Failed to reassemble payload. Cause: Fragment is too small.")
.hasNoLeaks();
}

rule.assertHasNoLeaks();
}

@ParameterizedTest
@MethodSource("requestCases")
void receivingRequestOnStreamIdThaIsAlreadyInUseMUSTBeIgnored_ReassemblyCase(
Expand Down