Skip to content

Commit a8500f3

Browse files
committed
initial
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 65ac940 commit a8500f3

File tree

5 files changed

+72
-2
lines changed

5 files changed

+72
-2
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.rsocket.RSocket;
2929
import io.rsocket.exceptions.ConnectionErrorException;
3030
import io.rsocket.exceptions.Exceptions;
31+
import io.rsocket.fragmentation.FragmentationUtils;
3132
import io.rsocket.frame.CancelFrameFlyweight;
3233
import io.rsocket.frame.ErrorFrameFlyweight;
3334
import io.rsocket.frame.FrameHeaderFlyweight;
@@ -88,6 +89,7 @@ class RSocketRequester implements RSocket {
8889
private final IntObjectMap<Subscription> senders;
8990
private final IntObjectMap<Processor<Payload, Payload>> receivers;
9091
private final UnboundedProcessor<ByteBuf> sendProcessor;
92+
private final int mtu;
9193
private final RequesterLeaseHandler leaseHandler;
9294
private final ByteBufAllocator allocator;
9395
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
@@ -99,6 +101,7 @@ class RSocketRequester implements RSocket {
99101
PayloadDecoder payloadDecoder,
100102
Consumer<Throwable> errorConsumer,
101103
StreamIdSupplier streamIdSupplier,
104+
int mtu,
102105
int keepAliveTickPeriod,
103106
int keepAliveAckTimeout,
104107
@Nullable KeepAliveHandler keepAliveHandler,
@@ -108,6 +111,7 @@ class RSocketRequester implements RSocket {
108111
this.payloadDecoder = payloadDecoder;
109112
this.errorConsumer = errorConsumer;
110113
this.streamIdSupplier = streamIdSupplier;
114+
this.mtu = mtu;
111115
this.leaseHandler = leaseHandler;
112116
this.senders = new SynchronizedIntObjectHashMap<>();
113117
this.receivers = new SynchronizedIntObjectHashMap<>();
@@ -186,6 +190,11 @@ private Mono<Void> handleFireAndForget(Payload payload) {
186190
return Mono.error(err);
187191
}
188192

193+
if (!FragmentationUtils.isValid(this.mtu, payload)) {
194+
payload.release();
195+
return Mono.error(new IllegalArgumentException("Too big Payload size"));
196+
}
197+
189198
final int streamId = streamIdSupplier.nextStreamId(receivers);
190199

191200
return UnicastMonoEmpty.newInstance(
@@ -210,6 +219,11 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
210219
return Mono.error(err);
211220
}
212221

222+
if (!FragmentationUtils.isValid(this.mtu, payload)) {
223+
payload.release();
224+
return Mono.error(new IllegalArgumentException("Too big Payload size"));
225+
}
226+
213227
int streamId = streamIdSupplier.nextStreamId(receivers);
214228
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
215229

@@ -255,6 +269,11 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
255269
return Flux.error(err);
256270
}
257271

272+
if (!FragmentationUtils.isValid(this.mtu, payload)) {
273+
payload.release();
274+
return Flux.error(new IllegalArgumentException("Too big Payload size"));
275+
}
276+
258277
int streamId = streamIdSupplier.nextStreamId(receivers);
259278

260279
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
@@ -317,6 +336,12 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
317336
(s, flux) -> {
318337
Payload payload = s.get();
319338
if (payload != null) {
339+
if (!FragmentationUtils.isValid(mtu, payload)) {
340+
payload.release();
341+
final IllegalArgumentException t = new IllegalArgumentException("Too big Payload size");
342+
errorConsumer.accept(t);
343+
return Mono.error(t);
344+
}
320345
return handleChannel(payload, flux.skip(1));
321346
} else {
322347
return flux;
@@ -341,6 +366,16 @@ protected void hookOnSubscribe(Subscription subscription) {
341366

342367
@Override
343368
protected void hookOnNext(Payload payload) {
369+
if (!FragmentationUtils.isValid(mtu, payload)) {
370+
payload.release();
371+
cancel();
372+
final IllegalArgumentException t = new IllegalArgumentException("Too big Payload size");
373+
errorConsumer.accept(t);
374+
// no need to send any errors.
375+
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
376+
receiver.onError(t);
377+
return ;
378+
}
344379
final ByteBuf frame =
345380
PayloadFrameFlyweight.encode(allocator, streamId, false, false, true, payload);
346381

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package io.rsocket.fragmentation;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.rsocket.Payload;
5+
import io.rsocket.frame.FrameHeaderFlyweight;
6+
import io.rsocket.frame.FrameLengthFlyweight;
7+
8+
public final class FragmentationUtils {
9+
public static boolean isValid(int mtu, Payload payload) {
10+
return payload.hasMetadata() ? isValid(mtu, payload.data(), payload.metadata()) : isValid(mtu, payload.metadata());
11+
}
12+
13+
public static boolean isValid(int mtu, ByteBuf data) {
14+
return mtu > 0
15+
|| (((FrameHeaderFlyweight.size()
16+
+ data.readableBytes()
17+
+ FrameLengthFlyweight.FRAME_LENGTH_SIZE)
18+
& ~FrameLengthFlyweight.FRAME_LENGTH_MASK)
19+
== 0);
20+
}
21+
22+
public static boolean isValid(int mtu, ByteBuf data, ByteBuf metadata) {
23+
return mtu > 0
24+
|| (((FrameHeaderFlyweight.size()
25+
+ FrameLengthFlyweight.FRAME_LENGTH_SIZE
26+
+ FrameHeaderFlyweight.size()
27+
+ data.readableBytes()
28+
+ metadata.readableBytes())
29+
& ~FrameLengthFlyweight.FRAME_LENGTH_MASK)
30+
== 0);
31+
}
32+
33+
}

rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.netty.buffer.ByteBufAllocator;
2121
import io.netty.buffer.Unpooled;
2222
import io.netty.util.ReferenceCountUtil;
23+
import io.rsocket.Payload;
2324
import io.rsocket.frame.*;
2425
import java.util.function.Consumer;
2526
import org.reactivestreams.Publisher;

rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ void setUp() {
6767
StreamIdSupplier.clientSupplier(),
6868
0,
6969
0,
70+
0,
7071
null,
7172
RequesterLeaseHandler.None);
7273
}

rsocket-transport-netty/src/test/resources/logback-test.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
</encoder>
2424
</appender>
2525

26-
<logger name="io.rsocket.transport.netty" level="INFO"/>
27-
<logger name="io.rsocket.FrameLogger" level="INFO"/>
26+
<logger name="io.rsocket.transport.netty" level="DEBUG"/>
27+
<logger name="io.rsocket.FrameLogger" level="DEBUG"/>
2828
<logger name="io.rsocket.fragmentation.FragmentationDuplexConnection" level="INFO"/>
2929

3030
<root level="INFO">

0 commit comments

Comments
 (0)