Skip to content

Commit 53a09fb

Browse files
committed
provides payload length validation in case fragmentation is disabled
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent a8500f3 commit 53a09fb

File tree

4 files changed

+73
-40
lines changed

4 files changed

+73
-40
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -336,12 +336,13 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
336336
(s, flux) -> {
337337
Payload payload = s.get();
338338
if (payload != null) {
339-
if (!FragmentationUtils.isValid(mtu, payload)) {
340-
payload.release();
341-
final IllegalArgumentException t = new IllegalArgumentException("Too big Payload size");
342-
errorConsumer.accept(t);
343-
return Mono.error(t);
344-
}
339+
if (!FragmentationUtils.isValid(mtu, payload)) {
340+
payload.release();
341+
final IllegalArgumentException t =
342+
new IllegalArgumentException("Too big Payload size");
343+
errorConsumer.accept(t);
344+
return Mono.error(t);
345+
}
345346
return handleChannel(payload, flux.skip(1));
346347
} else {
347348
return flux;
@@ -366,16 +367,17 @@ protected void hookOnSubscribe(Subscription subscription) {
366367

367368
@Override
368369
protected void hookOnNext(Payload payload) {
369-
if (!FragmentationUtils.isValid(mtu, payload)) {
370-
payload.release();
371-
cancel();
372-
final IllegalArgumentException t = new IllegalArgumentException("Too big Payload size");
373-
errorConsumer.accept(t);
374-
// no need to send any errors.
375-
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
376-
receiver.onError(t);
377-
return ;
378-
}
370+
if (!FragmentationUtils.isValid(mtu, payload)) {
371+
payload.release();
372+
cancel();
373+
final IllegalArgumentException t =
374+
new IllegalArgumentException("Too big Payload size");
375+
errorConsumer.accept(t);
376+
// no need to send any errors.
377+
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
378+
receiver.onError(t);
379+
return;
380+
}
379381
final ByteBuf frame =
380382
PayloadFrameFlyweight.encode(allocator, streamId, false, false, true, payload);
381383

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.rsocket.RSocket;
2626
import io.rsocket.ResponderRSocket;
2727
import io.rsocket.exceptions.ApplicationErrorException;
28+
import io.rsocket.fragmentation.FragmentationUtils;
2829
import io.rsocket.frame.*;
2930
import io.rsocket.frame.decoder.PayloadDecoder;
3031
import io.rsocket.internal.SynchronizedIntObjectHashMap;
@@ -51,6 +52,8 @@ class RSocketResponder implements ResponderRSocket {
5152
private final Consumer<Throwable> errorConsumer;
5253
private final ResponderLeaseHandler leaseHandler;
5354

55+
private final int mtu;
56+
5457
private final IntObjectMap<Subscription> sendingSubscriptions;
5558
private final IntObjectMap<Processor<Payload, Payload>> channelProcessors;
5659

@@ -63,9 +66,11 @@ class RSocketResponder implements ResponderRSocket {
6366
RSocket requestHandler,
6467
PayloadDecoder payloadDecoder,
6568
Consumer<Throwable> errorConsumer,
66-
ResponderLeaseHandler leaseHandler) {
69+
ResponderLeaseHandler leaseHandler,
70+
int mtu) {
6771
this.allocator = allocator;
6872
this.connection = connection;
73+
this.mtu = mtu;
6974

7075
this.requestHandler = requestHandler;
7176
this.responderRSocket =
@@ -371,6 +376,15 @@ protected void hookOnNext(Payload payload) {
371376
isEmpty = false;
372377
}
373378

379+
if (!FragmentationUtils.isValid(mtu, payload)) {
380+
payload.release();
381+
cancel();
382+
final IllegalArgumentException t =
383+
new IllegalArgumentException("Too big Payload size");
384+
handleError(streamId, t);
385+
return;
386+
}
387+
374388
ByteBuf byteBuf;
375389
try {
376390
byteBuf = PayloadFrameFlyweight.encodeNextComplete(allocator, streamId, payload);
@@ -417,6 +431,15 @@ protected void hookOnSubscribe(Subscription s) {
417431

418432
@Override
419433
protected void hookOnNext(Payload payload) {
434+
if (!FragmentationUtils.isValid(mtu, payload)) {
435+
payload.release();
436+
cancel();
437+
final IllegalArgumentException t =
438+
new IllegalArgumentException("Too big Payload size");
439+
handleError(streamId, t);
440+
return;
441+
}
442+
420443
ByteBuf byteBuf;
421444
try {
422445
byteBuf = PayloadFrameFlyweight.encodeNext(allocator, streamId, payload);

rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationUtils.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,29 @@
66
import io.rsocket.frame.FrameLengthFlyweight;
77

88
public final class FragmentationUtils {
9-
public static boolean isValid(int mtu, Payload payload) {
10-
return payload.hasMetadata() ? isValid(mtu, payload.data(), payload.metadata()) : isValid(mtu, payload.metadata());
11-
}
9+
public static boolean isValid(int mtu, Payload payload) {
10+
return payload.hasMetadata()
11+
? isValid(mtu, payload.data(), payload.metadata())
12+
: isValid(mtu, payload.metadata());
13+
}
1214

13-
public static boolean isValid(int mtu, ByteBuf data) {
14-
return mtu > 0
15-
|| (((FrameHeaderFlyweight.size()
16-
+ data.readableBytes()
17-
+ FrameLengthFlyweight.FRAME_LENGTH_SIZE)
15+
public static boolean isValid(int mtu, ByteBuf data) {
16+
return mtu > 0
17+
|| (((FrameHeaderFlyweight.size()
18+
+ data.readableBytes()
19+
+ FrameLengthFlyweight.FRAME_LENGTH_SIZE)
1820
& ~FrameLengthFlyweight.FRAME_LENGTH_MASK)
19-
== 0);
20-
}
21+
== 0);
22+
}
2123

22-
public static boolean isValid(int mtu, ByteBuf data, ByteBuf metadata) {
23-
return mtu > 0
24-
|| (((FrameHeaderFlyweight.size()
25-
+ FrameLengthFlyweight.FRAME_LENGTH_SIZE
26-
+ FrameHeaderFlyweight.size()
27-
+ data.readableBytes()
28-
+ metadata.readableBytes())
24+
public static boolean isValid(int mtu, ByteBuf data, ByteBuf metadata) {
25+
return mtu > 0
26+
|| (((FrameHeaderFlyweight.size()
27+
+ FrameLengthFlyweight.FRAME_LENGTH_SIZE
28+
+ FrameHeaderFlyweight.size()
29+
+ data.readableBytes()
30+
+ metadata.readableBytes())
2931
& ~FrameLengthFlyweight.FRAME_LENGTH_MASK)
30-
== 0);
31-
}
32-
32+
== 0);
33+
}
3334
}

rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,20 @@
2020
import io.netty.buffer.ByteBufAllocator;
2121
import io.netty.buffer.Unpooled;
2222
import io.netty.util.ReferenceCountUtil;
23-
import io.rsocket.Payload;
24-
import io.rsocket.frame.*;
25-
import java.util.function.Consumer;
23+
import io.rsocket.frame.FrameHeaderFlyweight;
24+
import io.rsocket.frame.FrameLengthFlyweight;
25+
import io.rsocket.frame.FrameType;
26+
import io.rsocket.frame.PayloadFrameFlyweight;
27+
import io.rsocket.frame.RequestChannelFrameFlyweight;
28+
import io.rsocket.frame.RequestFireAndForgetFrameFlyweight;
29+
import io.rsocket.frame.RequestResponseFrameFlyweight;
30+
import io.rsocket.frame.RequestStreamFrameFlyweight;
2631
import org.reactivestreams.Publisher;
2732
import reactor.core.publisher.Flux;
2833
import reactor.core.publisher.SynchronousSink;
2934

35+
import java.util.function.Consumer;
36+
3037
/**
3138
* The implementation of the RSocket fragmentation behavior.
3239
*

0 commit comments

Comments
 (0)