Skip to content

Commit 1237fc5

Browse files
authored
provides setup for maxInboundPayloadSize and maxFrameSize (#876)
1 parent 96ebf27 commit 1237fc5

35 files changed

+777
-269
lines changed
Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.rsocket.core;
22

3+
import static io.rsocket.frame.FrameLengthCodec.FRAME_LENGTH_MASK;
4+
35
import io.rsocket.Payload;
46
import io.rsocket.frame.FrameHeaderCodec;
57
import io.rsocket.frame.FrameLengthCodec;
@@ -8,25 +10,52 @@ final class PayloadValidationUtils {
810
static final String INVALID_PAYLOAD_ERROR_MESSAGE =
911
"The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory.";
1012

11-
static boolean isValid(int mtu, Payload payload) {
13+
static boolean isValid(int mtu, Payload payload, int maxFrameLength) {
1214
if (mtu > 0) {
1315
return true;
1416
}
1517

1618
if (payload.hasMetadata()) {
17-
return (((FrameHeaderCodec.size()
18-
+ FrameLengthCodec.FRAME_LENGTH_SIZE
19-
+ FrameHeaderCodec.size()
20-
+ payload.data().readableBytes()
21-
+ payload.metadata().readableBytes())
22-
& ~FrameLengthCodec.FRAME_LENGTH_MASK)
23-
== 0);
19+
return ((FrameHeaderCodec.size()
20+
+ FrameLengthCodec.FRAME_LENGTH_SIZE
21+
+ FrameHeaderCodec.size()
22+
+ payload.data().readableBytes()
23+
+ payload.metadata().readableBytes())
24+
<= maxFrameLength);
2425
} else {
25-
return (((FrameHeaderCodec.size()
26-
+ payload.data().readableBytes()
27-
+ FrameLengthCodec.FRAME_LENGTH_SIZE)
28-
& ~FrameLengthCodec.FRAME_LENGTH_MASK)
29-
== 0);
26+
return ((FrameHeaderCodec.size()
27+
+ payload.data().readableBytes()
28+
+ FrameLengthCodec.FRAME_LENGTH_SIZE)
29+
<= maxFrameLength);
30+
}
31+
}
32+
33+
static void assertValidateSetup(int maxFrameLength, int maxInboundPayloadSize, int mtu) {
34+
35+
if (maxFrameLength > FRAME_LENGTH_MASK) {
36+
throw new IllegalArgumentException(
37+
"Configured maxFrameLength["
38+
+ maxFrameLength
39+
+ "] exceeds maxFrameLength limit "
40+
+ FRAME_LENGTH_MASK);
41+
}
42+
43+
if (maxFrameLength > maxInboundPayloadSize) {
44+
throw new IllegalArgumentException(
45+
"Configured maxFrameLength["
46+
+ maxFrameLength
47+
+ "] exceeds maxPayloadSize["
48+
+ maxInboundPayloadSize
49+
+ "]");
50+
}
51+
52+
if (mtu != 0 && mtu > maxFrameLength) {
53+
throw new IllegalArgumentException(
54+
"Configured maximumTransmissionUnit["
55+
+ mtu
56+
+ "] exceeds configured maxFrameLength["
57+
+ maxFrameLength
58+
+ "]");
3059
}
3160
}
3261
}

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 156 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.rsocket.core;
1717

18+
import static io.rsocket.core.PayloadValidationUtils.assertValidateSetup;
19+
1820
import io.netty.buffer.ByteBuf;
1921
import io.netty.buffer.Unpooled;
2022
import io.rsocket.ConnectionSetupPayload;
@@ -92,6 +94,7 @@ public class RSocketConnector {
9294
private Supplier<Leases<?>> leasesSupplier;
9395

9496
private int mtu = 0;
97+
private int maxInboundPayloadSize = Integer.MAX_VALUE;
9598
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
9699

97100
private RSocketConnector() {}
@@ -412,6 +415,27 @@ public RSocketConnector lease(Supplier<Leases<? extends LeaseStats>> supplier) {
412415
return this;
413416
}
414417

418+
/**
419+
* When this is set, frames reassembler control maximum payload size which can be reassembled.
420+
*
421+
* <p>By default this is not set in which case maximum reassembled payloads size is not
422+
* controlled.
423+
*
424+
* @param maxInboundPayloadSize the threshold size for reassembly, must no be less than 64 bytes.
425+
* Please note, {@code maxInboundPayloadSize} must always be greater or equal to {@link
426+
* io.rsocket.transport.Transport#maxFrameLength()}, otherwise inbound frame can exceed the
427+
* {@code maxInboundPayloadSize}
428+
* @return the same instance for method chaining
429+
* @see <a
430+
* href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#fragmentation-and-reassembly">Fragmentation
431+
* and Reassembly</a>
432+
*/
433+
public RSocketConnector maxInboundPayloadSize(int maxInboundPayloadSize) {
434+
this.maxInboundPayloadSize =
435+
ReassemblyDuplexConnection.assertInboundPayloadSize(maxInboundPayloadSize);
436+
return this;
437+
}
438+
415439
/**
416440
* When this is set, frames larger than the given maximum transmission unit (mtu) size value are
417441
* broken down into fragments to fit that size.
@@ -488,122 +512,142 @@ public Mono<RSocket> connect(ClientTransport transport) {
488512
*/
489513
public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
490514

491-
Mono<DuplexConnection> connectionMono =
492-
Mono.fromSupplier(transportSupplier)
493-
.flatMap(ClientTransport::connect)
494-
.map(
495-
connection ->
496-
mtu > 0
497-
? new FragmentationDuplexConnection(connection, mtu, "client")
498-
: new ReassemblyDuplexConnection(connection));
499-
500-
return connectionMono
515+
return Mono.fromSupplier(transportSupplier)
501516
.flatMap(
502-
connection ->
503-
setupPayloadMono
504-
.defaultIfEmpty(EmptyPayload.INSTANCE)
505-
.map(setupPayload -> Tuples.of(connection, setupPayload))
506-
.doOnError(ex -> connection.dispose())
507-
.doOnCancel(connection::dispose))
508-
.flatMap(
509-
tuple -> {
510-
DuplexConnection connection = tuple.getT1();
511-
Payload setupPayload = tuple.getT2();
512-
513-
ByteBuf resumeToken;
514-
KeepAliveHandler keepAliveHandler;
515-
DuplexConnection wrappedConnection;
516-
517-
if (resume != null) {
518-
resumeToken = resume.getTokenSupplier().get();
519-
ClientRSocketSession session =
520-
new ClientRSocketSession(
521-
connection,
522-
resume.getSessionDuration(),
523-
resume.getRetry(),
524-
resume.getStoreFactory(CLIENT_TAG).apply(resumeToken),
525-
resume.getStreamTimeout(),
526-
resume.isCleanupStoreOnKeepAlive())
527-
.continueWith(connectionMono)
528-
.resumeToken(resumeToken);
529-
keepAliveHandler =
530-
new KeepAliveHandler.ResumableKeepAliveHandler(session.resumableConnection());
531-
wrappedConnection = session.resumableConnection();
532-
} else {
533-
resumeToken = Unpooled.EMPTY_BUFFER;
534-
keepAliveHandler = new KeepAliveHandler.DefaultKeepAliveHandler(connection);
535-
wrappedConnection = connection;
536-
}
537-
538-
ClientServerInputMultiplexer multiplexer =
539-
new ClientServerInputMultiplexer(wrappedConnection, interceptors, true);
540-
541-
boolean leaseEnabled = leasesSupplier != null;
542-
Leases<?> leases = leaseEnabled ? leasesSupplier.get() : null;
543-
RequesterLeaseHandler requesterLeaseHandler =
544-
leaseEnabled
545-
? new RequesterLeaseHandler.Impl(CLIENT_TAG, leases.receiver())
546-
: RequesterLeaseHandler.None;
547-
548-
RSocket rSocketRequester =
549-
new RSocketRequester(
550-
multiplexer.asClientConnection(),
551-
payloadDecoder,
552-
StreamIdSupplier.clientSupplier(),
553-
mtu,
554-
(int) keepAliveInterval.toMillis(),
555-
(int) keepAliveMaxLifeTime.toMillis(),
556-
keepAliveHandler,
557-
requesterLeaseHandler,
558-
Schedulers.single(Schedulers.parallel()));
559-
560-
RSocket wrappedRSocketRequester = interceptors.initRequester(rSocketRequester);
561-
562-
ByteBuf setupFrame =
563-
SetupFrameCodec.encode(
564-
wrappedConnection.alloc(),
565-
leaseEnabled,
566-
(int) keepAliveInterval.toMillis(),
567-
(int) keepAliveMaxLifeTime.toMillis(),
568-
resumeToken,
569-
metadataMimeType,
570-
dataMimeType,
571-
setupPayload);
572-
573-
SocketAcceptor acceptor =
574-
this.acceptor != null ? this.acceptor : SocketAcceptor.with(new RSocket() {});
575-
576-
ConnectionSetupPayload setup = new DefaultConnectionSetupPayload(setupFrame);
577-
578-
return interceptors
579-
.initSocketAcceptor(acceptor)
580-
.accept(setup, wrappedRSocketRequester)
517+
ct -> {
518+
int maxFrameLength = ct.maxFrameLength();
519+
520+
Mono<DuplexConnection> connectionMono =
521+
Mono.fromCallable(
522+
() -> {
523+
assertValidateSetup(maxFrameLength, maxInboundPayloadSize, mtu);
524+
return ct;
525+
})
526+
.flatMap(transport -> transport.connect())
527+
.map(
528+
connection ->
529+
mtu > 0
530+
? new FragmentationDuplexConnection(
531+
connection, mtu, maxInboundPayloadSize, "client")
532+
: new ReassemblyDuplexConnection(
533+
connection, maxInboundPayloadSize));
534+
535+
return connectionMono
581536
.flatMap(
582-
rSocketHandler -> {
583-
RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler);
584-
585-
ResponderLeaseHandler responderLeaseHandler =
537+
connection ->
538+
setupPayloadMono
539+
.defaultIfEmpty(EmptyPayload.INSTANCE)
540+
.map(setupPayload -> Tuples.of(connection, setupPayload))
541+
.doOnError(ex -> connection.dispose())
542+
.doOnCancel(connection::dispose))
543+
.flatMap(
544+
tuple -> {
545+
DuplexConnection connection = tuple.getT1();
546+
Payload setupPayload = tuple.getT2();
547+
ByteBuf resumeToken;
548+
KeepAliveHandler keepAliveHandler;
549+
DuplexConnection wrappedConnection;
550+
551+
if (resume != null) {
552+
resumeToken = resume.getTokenSupplier().get();
553+
ClientRSocketSession session =
554+
new ClientRSocketSession(
555+
connection,
556+
resume.getSessionDuration(),
557+
resume.getRetry(),
558+
resume.getStoreFactory(CLIENT_TAG).apply(resumeToken),
559+
resume.getStreamTimeout(),
560+
resume.isCleanupStoreOnKeepAlive())
561+
.continueWith(connectionMono)
562+
.resumeToken(resumeToken);
563+
keepAliveHandler =
564+
new KeepAliveHandler.ResumableKeepAliveHandler(
565+
session.resumableConnection());
566+
wrappedConnection = session.resumableConnection();
567+
} else {
568+
resumeToken = Unpooled.EMPTY_BUFFER;
569+
keepAliveHandler =
570+
new KeepAliveHandler.DefaultKeepAliveHandler(connection);
571+
wrappedConnection = connection;
572+
}
573+
574+
ClientServerInputMultiplexer multiplexer =
575+
new ClientServerInputMultiplexer(wrappedConnection, interceptors, true);
576+
577+
boolean leaseEnabled = leasesSupplier != null;
578+
Leases<?> leases = leaseEnabled ? leasesSupplier.get() : null;
579+
RequesterLeaseHandler requesterLeaseHandler =
586580
leaseEnabled
587-
? new ResponderLeaseHandler.Impl<>(
588-
CLIENT_TAG,
589-
wrappedConnection.alloc(),
590-
leases.sender(),
591-
leases.stats())
592-
: ResponderLeaseHandler.None;
593-
594-
RSocket rSocketResponder =
595-
new RSocketResponder(
596-
multiplexer.asServerConnection(),
597-
wrappedRSocketHandler,
581+
? new RequesterLeaseHandler.Impl(CLIENT_TAG, leases.receiver())
582+
: RequesterLeaseHandler.None;
583+
584+
RSocket rSocketRequester =
585+
new RSocketRequester(
586+
multiplexer.asClientConnection(),
598587
payloadDecoder,
599-
responderLeaseHandler,
600-
mtu);
601-
602-
return wrappedConnection
603-
.sendOne(setupFrame.retain())
604-
.thenReturn(wrappedRSocketRequester);
605-
})
606-
.doFinally(signalType -> setup.release());
588+
StreamIdSupplier.clientSupplier(),
589+
mtu,
590+
maxFrameLength,
591+
(int) keepAliveInterval.toMillis(),
592+
(int) keepAliveMaxLifeTime.toMillis(),
593+
keepAliveHandler,
594+
requesterLeaseHandler,
595+
Schedulers.single(Schedulers.parallel()));
596+
597+
RSocket wrappedRSocketRequester =
598+
interceptors.initRequester(rSocketRequester);
599+
600+
ByteBuf setupFrame =
601+
SetupFrameCodec.encode(
602+
wrappedConnection.alloc(),
603+
leaseEnabled,
604+
(int) keepAliveInterval.toMillis(),
605+
(int) keepAliveMaxLifeTime.toMillis(),
606+
resumeToken,
607+
metadataMimeType,
608+
dataMimeType,
609+
setupPayload);
610+
611+
SocketAcceptor acceptor =
612+
this.acceptor != null
613+
? this.acceptor
614+
: SocketAcceptor.with(new RSocket() {});
615+
616+
ConnectionSetupPayload setup =
617+
new DefaultConnectionSetupPayload(setupFrame);
618+
619+
return interceptors
620+
.initSocketAcceptor(acceptor)
621+
.accept(setup, wrappedRSocketRequester)
622+
.flatMap(
623+
rSocketHandler -> {
624+
RSocket wrappedRSocketHandler =
625+
interceptors.initResponder(rSocketHandler);
626+
627+
ResponderLeaseHandler responderLeaseHandler =
628+
leaseEnabled
629+
? new ResponderLeaseHandler.Impl<>(
630+
CLIENT_TAG,
631+
wrappedConnection.alloc(),
632+
leases.sender(),
633+
leases.stats())
634+
: ResponderLeaseHandler.None;
635+
636+
RSocket rSocketResponder =
637+
new RSocketResponder(
638+
multiplexer.asServerConnection(),
639+
wrappedRSocketHandler,
640+
payloadDecoder,
641+
responderLeaseHandler,
642+
mtu,
643+
maxFrameLength);
644+
645+
return wrappedConnection
646+
.sendOne(setupFrame.retain())
647+
.thenReturn(wrappedRSocketRequester);
648+
})
649+
.doFinally(signalType -> setup.release());
650+
});
607651
})
608652
.as(
609653
source -> {

0 commit comments

Comments
 (0)