Skip to content

Commit 2c873d6

Browse files
authored
Reject frames less than min MTU size (#913)
This is the proposed fix for #895 for the 1.1 codebase where the logic for re-assembling is in a different class than 1.0. Signed-off-by: Rossen Stoyanchev <[email protected]>
1 parent 54cf162 commit 2c873d6

File tree

7 files changed

+102
-12
lines changed

7 files changed

+102
-12
lines changed

rsocket-core/src/main/java/io/rsocket/core/FireAndForgetResponderSubscriber.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ private FireAndForgetResponderSubscriber() {
6868

6969
this.frames =
7070
ReassemblyUtils.addFollowingFrame(
71-
allocator.compositeBuffer(), firstFrame, maxInboundPayloadSize);
71+
allocator.compositeBuffer(), firstFrame, true, maxInboundPayloadSize);
7272
}
7373

7474
@Override
@@ -92,7 +92,8 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
9292
final CompositeByteBuf frames = this.frames;
9393

9494
try {
95-
ReassemblyUtils.addFollowingFrame(frames, followingFrame, this.maxInboundPayloadSize);
95+
ReassemblyUtils.addFollowingFrame(
96+
frames, followingFrame, hasFollows, this.maxInboundPayloadSize);
9697
} catch (IllegalStateException t) {
9798
this.requesterResponderSupport.remove(this.streamId, this);
9899

rsocket-core/src/main/java/io/rsocket/core/ReassemblyUtils.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ static <T extends RequesterFrameHandler> void handleNextSupport(
102102
if (frames == null) {
103103
frames =
104104
ReassemblyUtils.addFollowingFrame(
105-
allocator.compositeBuffer(), frame, maxInboundPayloadSize);
105+
allocator.compositeBuffer(), frame, hasFollows, maxInboundPayloadSize);
106106
instance.setFrames(frames);
107107

108108
long previousState = markReassembling(updater, instance);
@@ -113,7 +113,8 @@ static <T extends RequesterFrameHandler> void handleNextSupport(
113113
}
114114
} else {
115115
try {
116-
frames = ReassemblyUtils.addFollowingFrame(frames, frame, maxInboundPayloadSize);
116+
frames =
117+
ReassemblyUtils.addFollowingFrame(frames, frame, hasFollows, maxInboundPayloadSize);
117118
} catch (IllegalStateException t) {
118119
if (isTerminated(updater.get(instance))) {
119120
return;
@@ -160,7 +161,10 @@ static <T extends RequesterFrameHandler> void handleNextSupport(
160161
}
161162

162163
static CompositeByteBuf addFollowingFrame(
163-
CompositeByteBuf frames, ByteBuf followingFrame, int maxInboundPayloadSize) {
164+
CompositeByteBuf frames,
165+
ByteBuf followingFrame,
166+
boolean hasFollows,
167+
int maxInboundPayloadSize) {
164168
int readableBytes = frames.readableBytes();
165169
if (readableBytes == 0) {
166170
return frames.addComponent(true, followingFrame.retain());
@@ -169,6 +173,9 @@ static CompositeByteBuf addFollowingFrame(
169173
> maxInboundPayloadSize) {
170174
throw new IllegalStateException(
171175
String.format(ILLEGAL_REASSEMBLED_PAYLOAD_SIZE, maxInboundPayloadSize));
176+
} else if (followingFrame.readableBytes() < MIN_MTU_SIZE - 3 && hasFollows) {
177+
// FIXME: check MIN_MTU_SIZE only (currently fragments have size of 61)
178+
throw new IllegalStateException("Fragment is too small.");
172179
}
173180

174181
final boolean hasMetadata = FrameHeaderCodec.hasMetadata(followingFrame);

rsocket-core/src/main/java/io/rsocket/core/RequestChannelResponderSubscriber.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public RequestChannelResponderSubscriber(
104104

105105
this.frames =
106106
ReassemblyUtils.addFollowingFrame(
107-
allocator.compositeBuffer(), firstFrame, maxInboundPayloadSize);
107+
allocator.compositeBuffer(), firstFrame, true, maxInboundPayloadSize);
108108
STATE.lazySet(this, REASSEMBLING_FLAG);
109109
}
110110

@@ -491,7 +491,7 @@ public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload)
491491
if (frames == null) {
492492
frames =
493493
ReassemblyUtils.addFollowingFrame(
494-
this.allocator.compositeBuffer(), frame, this.maxInboundPayloadSize);
494+
this.allocator.compositeBuffer(), frame, hasFollows, this.maxInboundPayloadSize);
495495
this.frames = frames;
496496

497497
long previousState = markReassembling(STATE, this);
@@ -502,7 +502,9 @@ public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload)
502502
}
503503
} else {
504504
try {
505-
frames = ReassemblyUtils.addFollowingFrame(frames, frame, this.maxInboundPayloadSize);
505+
frames =
506+
ReassemblyUtils.addFollowingFrame(
507+
frames, frame, hasFollows, this.maxInboundPayloadSize);
506508
} catch (IllegalStateException e) {
507509
if (isTerminated(this.state)) {
508510
return;

rsocket-core/src/main/java/io/rsocket/core/RequestResponseResponderSubscriber.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public RequestResponseResponderSubscriber(
8181
this.handler = handler;
8282
this.frames =
8383
ReassemblyUtils.addFollowingFrame(
84-
allocator.compositeBuffer(), firstFrame, maxInboundPayloadSize);
84+
allocator.compositeBuffer(), firstFrame, true, maxInboundPayloadSize);
8585
}
8686

8787
public RequestResponseResponderSubscriber(
@@ -218,7 +218,7 @@ public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload)
218218
}
219219

220220
try {
221-
ReassemblyUtils.addFollowingFrame(frames, frame, this.maxInboundPayloadSize);
221+
ReassemblyUtils.addFollowingFrame(frames, frame, hasFollows, this.maxInboundPayloadSize);
222222
} catch (IllegalStateException t) {
223223
S.lazySet(this, Operators.cancelledSubscription());
224224

rsocket-core/src/main/java/io/rsocket/core/RequestStreamResponderSubscriber.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public RequestStreamResponderSubscriber(
8484
this.handler = handler;
8585
this.frames =
8686
ReassemblyUtils.addFollowingFrame(
87-
allocator.compositeBuffer(), firstFrame, maxInboundPayloadSize);
87+
allocator.compositeBuffer(), firstFrame, true, maxInboundPayloadSize);
8888
}
8989

9090
public RequestStreamResponderSubscriber(
@@ -258,7 +258,8 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
258258
}
259259

260260
try {
261-
ReassemblyUtils.addFollowingFrame(frames, followingFrame, this.maxInboundPayloadSize);
261+
ReassemblyUtils.addFollowingFrame(
262+
frames, followingFrame, hasFollows, this.maxInboundPayloadSize);
262263
} catch (IllegalStateException t) {
263264
// if subscription is null, it means that streams has not yet reassembled all the fragments
264265
// and fragmentation of the first frame was cancelled before

rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1332,6 +1332,36 @@ public void errorTooBigPayload(
13321332
leaksTrackingByteBufAllocator.assertHasNoLeaks();
13331333
}
13341334

1335+
@ParameterizedTest(name = "throws error if fragment before the last is < min MTU {0}")
1336+
@MethodSource("requestNInteractions")
1337+
public void errorFragmentTooSmall(
1338+
FrameType frameType,
1339+
BiFunction<ClientSocketRule, Payload, Publisher<Payload>> requestFunction) {
1340+
final int mtu = 32;
1341+
final LeaksTrackingByteBufAllocator leaksTrackingByteBufAllocator =
1342+
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
1343+
1344+
final Payload requestPayload = genericPayload(leaksTrackingByteBufAllocator);
1345+
final Payload responsePayload = fixedSizePayload(leaksTrackingByteBufAllocator, 156);
1346+
List<ByteBuf> fragments = prepareFragments(leaksTrackingByteBufAllocator, mtu, responsePayload);
1347+
responsePayload.release();
1348+
1349+
StepVerifier.create(requestFunction.apply(rule, requestPayload))
1350+
.then(() -> rule.connection.addToReceivedBuffer(fragments.toArray(new ByteBuf[0])))
1351+
.expectErrorMessage("Fragment is too small.")
1352+
.verify();
1353+
1354+
FrameAssert.assertThat(rule.connection.getSent().poll()).typeOf(frameType).hasNoLeaks();
1355+
1356+
if (frameType == REQUEST_CHANNEL) {
1357+
FrameAssert.assertThat(rule.connection.getSent().poll()).typeOf(COMPLETE).hasNoLeaks();
1358+
}
1359+
1360+
FrameAssert.assertThat(rule.connection.getSent().poll()).typeOf(CANCEL).hasNoLeaks();
1361+
1362+
leaksTrackingByteBufAllocator.assertHasNoLeaks();
1363+
}
1364+
13351365
public static class ClientSocketRule extends AbstractSocketRule<RSocketRequester> {
13361366
@Override
13371367
protected RSocketRequester newRSocket() {

rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,6 +1002,55 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
10021002
rule.assertHasNoLeaks();
10031003
}
10041004

1005+
@ParameterizedTest(name = "throws error if fragment before the last is < min MTU {0}")
1006+
@MethodSource("requestCases")
1007+
public void errorFragmentTooSmall(FrameType frameType) {
1008+
final int mtu = 32;
1009+
AtomicReference<Payload> receivedPayload = new AtomicReference<>();
1010+
rule.setAcceptingSocket(
1011+
new RSocket() {
1012+
@Override
1013+
public Mono<Void> fireAndForget(Payload payload) {
1014+
receivedPayload.set(payload);
1015+
return Mono.empty();
1016+
}
1017+
1018+
@Override
1019+
public Mono<Payload> requestResponse(Payload payload) {
1020+
receivedPayload.set(payload);
1021+
return Mono.just(genericPayload(rule.allocator));
1022+
}
1023+
1024+
@Override
1025+
public Flux<Payload> requestStream(Payload payload) {
1026+
receivedPayload.set(payload);
1027+
return Flux.just(genericPayload(rule.allocator));
1028+
}
1029+
1030+
@Override
1031+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
1032+
Flux.from(payloads).subscribe(receivedPayload::set, null, null, s -> s.request(1));
1033+
return Flux.just(genericPayload(rule.allocator));
1034+
}
1035+
});
1036+
final Payload randomPayload = fixedSizePayload(rule.allocator, 156);
1037+
List<ByteBuf> fragments = prepareFragments(rule.allocator, mtu, randomPayload, frameType);
1038+
randomPayload.release();
1039+
1040+
rule.connection.addToReceivedBuffer(fragments.toArray(new ByteBuf[0]));
1041+
1042+
PayloadAssert.assertThat(receivedPayload.get()).isNull();
1043+
1044+
if (frameType != REQUEST_FNF) {
1045+
FrameAssert.assertThat(rule.connection.getSent().poll())
1046+
.typeOf(ERROR)
1047+
.hasData("Failed to reassemble payload. Cause: Fragment is too small.")
1048+
.hasNoLeaks();
1049+
}
1050+
1051+
rule.assertHasNoLeaks();
1052+
}
1053+
10051054
@ParameterizedTest
10061055
@MethodSource("requestCases")
10071056
void receivingRequestOnStreamIdThaIsAlreadyInUseMUSTBeIgnored_ReassemblyCase(

0 commit comments

Comments
 (0)