Skip to content

Commit 6b688a0

Browse files
committed
provides reassembly payload size control
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent e81cb69 commit 6b688a0

File tree

5 files changed

+115
-17
lines changed

5 files changed

+115
-17
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
* }</pre>
7373
*/
7474
public class RSocketConnector {
75+
7576
private static final String CLIENT_TAG = "client";
7677

7778
private static final BiConsumer<RSocket, Invalidatable> INVALIDATE_FUNCTION =
@@ -91,6 +92,7 @@ public class RSocketConnector {
9192
private Supplier<Leases<?>> leasesSupplier;
9293

9394
private int mtu = 0;
95+
private int maxReassemblySize = Integer.MAX_VALUE;
9496
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
9597

9698
private RSocketConnector() {}
@@ -395,6 +397,23 @@ public RSocketConnector lease(Supplier<Leases<? extends LeaseStats>> supplier) {
395397
return this;
396398
}
397399

400+
/**
401+
* When this is set, frames reassembler control maximum payload size which can be reassembled.
402+
*
403+
* <p>By default this is not set in which case maximum reassembled payloads size is not
404+
* controlled.
405+
*
406+
* @param maxReassemblySize the threshold size for reassembly, must be no less than 16,777,215
407+
* @return the same instance for method chaining
408+
* @see <a
409+
* href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#fragmentation-and-reassembly">Fragmentation
410+
* and Reassembly</a>
411+
*/
412+
public RSocketConnector reassemble(int maxReassemblySize) {
413+
this.maxReassemblySize = ReassemblyDuplexConnection.assertMaxReassemblySize(maxReassemblySize);
414+
return this;
415+
}
416+
398417
/**
399418
* When this is set, frames larger than the given maximum transmission unit (mtu) size value are
400419
* broken down into fragments to fit that size.
@@ -477,8 +496,9 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
477496
.map(
478497
connection ->
479498
mtu > 0
480-
? new FragmentationDuplexConnection(connection, mtu, "client")
481-
: new ReassemblyDuplexConnection(connection));
499+
? new FragmentationDuplexConnection(
500+
connection, mtu, maxReassemblySize, "client")
501+
: new ReassemblyDuplexConnection(connection, maxReassemblySize));
482502
return connectionMono
483503
.flatMap(
484504
connection -> {

rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public final class RSocketServer {
6666
private Supplier<Leases<?>> leasesSupplier = null;
6767

6868
private int mtu = 0;
69+
private int maxReassemblySize = Integer.MAX_VALUE;
6970
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
7071

7172
private RSocketServer() {}
@@ -198,6 +199,23 @@ public RSocketServer lease(Supplier<Leases<?>> supplier) {
198199
return this;
199200
}
200201

202+
/**
203+
* When this is set, frames reassembler control maximum payload size which can be reassembled.
204+
*
205+
* <p>By default this is not set in which case maximum reassembled payloads size is not
206+
* controlled.
207+
*
208+
* @param maxReassemblySize the threshold size for reassembly, must be no less than 16,777,215
209+
* @return the same instance for method chaining
210+
* @see <a
211+
* href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#fragmentation-and-reassembly">Fragmentation
212+
* and Reassembly</a>
213+
*/
214+
public RSocketServer reassemble(int maxReassemblySize) {
215+
this.maxReassemblySize = ReassemblyDuplexConnection.assertMaxReassemblySize(maxReassemblySize);
216+
return this;
217+
}
218+
201219
/**
202220
* When this is set, frames larger than the given maximum transmission unit (mtu) size value are
203221
* fragmented.
@@ -301,8 +319,8 @@ public Mono<Void> apply(DuplexConnection connection) {
301319
private Mono<Void> acceptor(ServerSetup serverSetup, DuplexConnection connection) {
302320
connection =
303321
mtu > 0
304-
? new FragmentationDuplexConnection(connection, mtu, "server")
305-
: new ReassemblyDuplexConnection(connection);
322+
? new FragmentationDuplexConnection(connection, mtu, maxReassemblySize, "server")
323+
: new ReassemblyDuplexConnection(connection, maxReassemblySize);
306324

307325
ClientServerInputMultiplexer multiplexer =
308326
new ClientServerInputMultiplexer(connection, interceptors, false);

rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,14 @@
3939
*/
4040
public final class FragmentationDuplexConnection extends ReassemblyDuplexConnection
4141
implements DuplexConnection {
42+
4243
public static final int MIN_MTU_SIZE = 64;
44+
4345
private static final Logger logger = LoggerFactory.getLogger(FragmentationDuplexConnection.class);
44-
private final DuplexConnection delegate;
45-
private final int mtu;
46-
private final FrameReassembler frameReassembler;
47-
private final String type;
46+
47+
final DuplexConnection delegate;
48+
final int mtu;
49+
final String type;
4850

4951
/**
5052
* Class constructor.
@@ -53,16 +55,14 @@ public final class FragmentationDuplexConnection extends ReassemblyDuplexConnect
5355
* @param mtu the fragment size, greater than {@link #MIN_MTU_SIZE}
5456
* @param type a label to use for logging purposes
5557
*/
56-
public FragmentationDuplexConnection(DuplexConnection delegate, int mtu, String type) {
57-
super(delegate);
58+
public FragmentationDuplexConnection(
59+
DuplexConnection delegate, int mtu, int maxReassemblySize, String type) {
60+
super(delegate, maxReassemblySize);
5861

5962
Objects.requireNonNull(delegate, "delegate must not be null");
6063
this.delegate = delegate;
6164
this.mtu = assertMtu(mtu);
62-
this.frameReassembler = new FrameReassembler(delegate.alloc());
6365
this.type = type;
64-
65-
delegate.onClose().doFinally(s -> frameReassembler.dispose()).subscribe();
6666
}
6767

6868
private boolean shouldFragment(FrameType frameType, int readableBytes) {

rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,12 @@ final class FrameReassembler extends AtomicBoolean implements Disposable {
4848
final IntObjectMap<CompositeByteBuf> metadata;
4949
final IntObjectMap<CompositeByteBuf> data;
5050

51-
private final ByteBufAllocator allocator;
51+
final ByteBufAllocator allocator;
52+
final int maxReassemblySize;
5253

53-
public FrameReassembler(ByteBufAllocator allocator) {
54+
public FrameReassembler(ByteBufAllocator allocator, int maxReassemblySize) {
5455
this.allocator = allocator;
56+
this.maxReassemblySize = maxReassemblySize;
5557
this.headers = new IntObjectHashMap<>();
5658
this.metadata = new IntObjectHashMap<>();
5759
this.data = new IntObjectHashMap<>();
@@ -100,6 +102,16 @@ synchronized CompositeByteBuf getMetadata(int streamId) {
100102
return byteBuf;
101103
}
102104

105+
synchronized int getMetadataSize(int streamId) {
106+
CompositeByteBuf byteBuf = metadata.get(streamId);
107+
108+
if (byteBuf == null) {
109+
return 0;
110+
}
111+
112+
return byteBuf.readableBytes();
113+
}
114+
103115
synchronized CompositeByteBuf getData(int streamId) {
104116
CompositeByteBuf byteBuf = data.get(streamId);
105117

@@ -111,6 +123,16 @@ synchronized CompositeByteBuf getData(int streamId) {
111123
return byteBuf;
112124
}
113125

126+
synchronized int getDataSize(int streamId) {
127+
CompositeByteBuf byteBuf = data.get(streamId);
128+
129+
if (byteBuf == null) {
130+
return 0;
131+
}
132+
133+
return byteBuf.readableBytes();
134+
}
135+
114136
@Nullable
115137
synchronized ByteBuf removeHeader(int streamId) {
116138
return headers.remove(streamId);
@@ -151,6 +173,19 @@ void cancelAssemble(int streamId) {
151173
void handleNoFollowsFlag(ByteBuf frame, SynchronousSink<ByteBuf> sink, int streamId) {
152174
ByteBuf header = removeHeader(streamId);
153175
if (header != null) {
176+
177+
int maxReassemblySize = this.maxReassemblySize;
178+
if (maxReassemblySize != Integer.MAX_VALUE) {
179+
int currentPayloadSize = getMetadataSize(streamId) + getDataSize(streamId);
180+
if (currentPayloadSize + frame.readableBytes() - FrameHeaderCodec.size()
181+
> maxReassemblySize) {
182+
header.release();
183+
frame.release();
184+
cancelAssemble(streamId);
185+
throw new IllegalStateException("Reassembled payload went out of allowed size");
186+
}
187+
}
188+
154189
if (FrameHeaderCodec.hasMetadata(header)) {
155190
ByteBuf assembledFrame = assembleFrameWithMetadata(frame, streamId, header);
156191
sink.next(assembledFrame);
@@ -177,6 +212,17 @@ void handleFollowsFlag(ByteBuf frame, int streamId, FrameType frameType) {
177212
putHeader(streamId, header);
178213
}
179214

215+
int maxReassemblySize = this.maxReassemblySize;
216+
if (maxReassemblySize != Integer.MAX_VALUE) {
217+
int currentPayloadSize = getMetadataSize(streamId) + getDataSize(streamId);
218+
if (currentPayloadSize + frame.readableBytes() - FrameHeaderCodec.size()
219+
> maxReassemblySize) {
220+
cancelAssemble(streamId);
221+
frame.release();
222+
throw new IllegalStateException("Reassembled payload went out of allowed size");
223+
}
224+
}
225+
180226
if (FrameHeaderCodec.hasMetadata(frame)) {
181227
CompositeByteBuf metadata = getMetadata(streamId);
182228
switch (frameType) {
@@ -226,6 +272,7 @@ void handleFollowsFlag(ByteBuf frame, int streamId, FrameType frameType) {
226272
data = PayloadFrameCodec.data(frame).retain();
227273
break;
228274
default:
275+
frame.release();
229276
throw new IllegalStateException("unsupported fragment type");
230277
}
231278

rsocket-core/src/main/java/io/rsocket/fragmentation/ReassemblyDuplexConnection.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.netty.buffer.ByteBuf;
2020
import io.netty.buffer.ByteBufAllocator;
2121
import io.rsocket.DuplexConnection;
22+
import io.rsocket.frame.FrameLengthCodec;
2223
import java.util.Objects;
2324
import org.reactivestreams.Publisher;
2425
import reactor.core.publisher.Flux;
@@ -36,14 +37,26 @@ public class ReassemblyDuplexConnection implements DuplexConnection {
3637
private final FrameReassembler frameReassembler;
3738

3839
/** Constructor with the underlying delegate to receive frames from. */
39-
public ReassemblyDuplexConnection(DuplexConnection delegate) {
40+
public ReassemblyDuplexConnection(DuplexConnection delegate, int maxReassemblySize) {
4041
Objects.requireNonNull(delegate, "delegate must not be null");
4142
this.delegate = delegate;
42-
this.frameReassembler = new FrameReassembler(delegate.alloc());
43+
this.frameReassembler = new FrameReassembler(delegate.alloc(), maxReassemblySize);
4344

4445
delegate.onClose().doFinally(s -> frameReassembler.dispose()).subscribe();
4546
}
4647

48+
public static int assertMaxReassemblySize(int maxReassemblySize) {
49+
if (maxReassemblySize < FrameLengthCodec.FRAME_LENGTH_MASK) {
50+
String msg =
51+
String.format(
52+
"The smallest allowed maxReassemblySize size is %d bytes, provided: %d",
53+
FrameLengthCodec.FRAME_LENGTH_MASK, maxReassemblySize);
54+
throw new IllegalArgumentException(msg);
55+
} else {
56+
return maxReassemblySize;
57+
}
58+
}
59+
4760
@Override
4861
public Mono<Void> send(Publisher<ByteBuf> frames) {
4962
return delegate.send(frames);

0 commit comments

Comments
 (0)