Skip to content

Commit 2be545b

Browse files
committed
Merge branch 1.0.x into master
Signed-off-by: Oleh Dokuka <[email protected]>
2 parents 0965a71 + 1237fc5 commit 2be545b

36 files changed

+778
-268
lines changed
Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.rsocket.core;
22

3+
import static io.rsocket.frame.FrameLengthCodec.FRAME_LENGTH_MASK;
4+
35
import io.rsocket.Payload;
46
import io.rsocket.frame.FrameHeaderCodec;
57
import io.rsocket.frame.FrameLengthCodec;
@@ -8,25 +10,52 @@ final class PayloadValidationUtils {
810
static final String INVALID_PAYLOAD_ERROR_MESSAGE =
911
"The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory.";
1012

11-
static boolean isValid(int mtu, Payload payload) {
13+
static boolean isValid(int mtu, Payload payload, int maxFrameLength) {
1214
if (mtu > 0) {
1315
return true;
1416
}
1517

1618
if (payload.hasMetadata()) {
17-
return (((FrameHeaderCodec.size()
18-
+ FrameLengthCodec.FRAME_LENGTH_SIZE
19-
+ FrameHeaderCodec.size()
20-
+ payload.data().readableBytes()
21-
+ payload.metadata().readableBytes())
22-
& ~FrameLengthCodec.FRAME_LENGTH_MASK)
23-
== 0);
19+
return ((FrameHeaderCodec.size()
20+
+ FrameLengthCodec.FRAME_LENGTH_SIZE
21+
+ FrameHeaderCodec.size()
22+
+ payload.data().readableBytes()
23+
+ payload.metadata().readableBytes())
24+
<= maxFrameLength);
2425
} else {
25-
return (((FrameHeaderCodec.size()
26-
+ payload.data().readableBytes()
27-
+ FrameLengthCodec.FRAME_LENGTH_SIZE)
28-
& ~FrameLengthCodec.FRAME_LENGTH_MASK)
29-
== 0);
26+
return ((FrameHeaderCodec.size()
27+
+ payload.data().readableBytes()
28+
+ FrameLengthCodec.FRAME_LENGTH_SIZE)
29+
<= maxFrameLength);
30+
}
31+
}
32+
33+
static void assertValidateSetup(int maxFrameLength, int maxInboundPayloadSize, int mtu) {
34+
35+
if (maxFrameLength > FRAME_LENGTH_MASK) {
36+
throw new IllegalArgumentException(
37+
"Configured maxFrameLength["
38+
+ maxFrameLength
39+
+ "] exceeds maxFrameLength limit "
40+
+ FRAME_LENGTH_MASK);
41+
}
42+
43+
if (maxFrameLength > maxInboundPayloadSize) {
44+
throw new IllegalArgumentException(
45+
"Configured maxFrameLength["
46+
+ maxFrameLength
47+
+ "] exceeds maxPayloadSize["
48+
+ maxInboundPayloadSize
49+
+ "]");
50+
}
51+
52+
if (mtu != 0 && mtu > maxFrameLength) {
53+
throw new IllegalArgumentException(
54+
"Configured maximumTransmissionUnit["
55+
+ mtu
56+
+ "] exceeds configured maxFrameLength["
57+
+ maxFrameLength
58+
+ "]");
3059
}
3160
}
3261
}

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 156 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.rsocket.core;
1717

18+
import static io.rsocket.core.PayloadValidationUtils.assertValidateSetup;
19+
1820
import io.netty.buffer.ByteBuf;
1921
import io.netty.buffer.Unpooled;
2022
import io.rsocket.ConnectionSetupPayload;
@@ -93,6 +95,7 @@ public class RSocketConnector {
9395
private Supplier<Leases<?>> leasesSupplier;
9496

9597
private int mtu = 0;
98+
private int maxInboundPayloadSize = Integer.MAX_VALUE;
9699
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
97100

98101
private RSocketConnector() {}
@@ -422,6 +425,27 @@ public RSocketConnector lease(Supplier<Leases<? extends LeaseStats>> supplier) {
422425
return this;
423426
}
424427

428+
/**
429+
* When this is set, frames reassembler control maximum payload size which can be reassembled.
430+
*
431+
* <p>By default this is not set in which case maximum reassembled payloads size is not
432+
* controlled.
433+
*
434+
* @param maxInboundPayloadSize the threshold size for reassembly, must no be less than 64 bytes.
435+
* Please note, {@code maxInboundPayloadSize} must always be greater or equal to {@link
436+
* io.rsocket.transport.Transport#maxFrameLength()}, otherwise inbound frame can exceed the
437+
* {@code maxInboundPayloadSize}
438+
* @return the same instance for method chaining
439+
* @see <a
440+
* href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#fragmentation-and-reassembly">Fragmentation
441+
* and Reassembly</a>
442+
*/
443+
public RSocketConnector maxInboundPayloadSize(int maxInboundPayloadSize) {
444+
this.maxInboundPayloadSize =
445+
ReassemblyDuplexConnection.assertInboundPayloadSize(maxInboundPayloadSize);
446+
return this;
447+
}
448+
425449
/**
426450
* When this is set, frames larger than the given maximum transmission unit (mtu) size value are
427451
* broken down into fragments to fit that size.
@@ -541,121 +565,142 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
541565
}
542566

543567
private Mono<RSocket> connect0(Supplier<ClientTransport> transportSupplier) {
544-
Mono<DuplexConnection> connectionMono =
545-
Mono.fromSupplier(transportSupplier)
546-
.flatMap(ClientTransport::connect)
547-
.map(
548-
connection ->
549-
mtu > 0
550-
? new FragmentationDuplexConnection(connection, mtu, "client")
551-
: new ReassemblyDuplexConnection(connection));
552-
return connectionMono
553-
.flatMap(
554-
connection ->
555-
setupPayloadMono
556-
.defaultIfEmpty(EmptyPayload.INSTANCE)
557-
.map(setupPayload -> Tuples.of(connection, setupPayload))
558-
.doOnError(ex -> connection.dispose())
559-
.doOnCancel(connection::dispose))
568+
return Mono.fromSupplier(transportSupplier)
560569
.flatMap(
561-
tuple -> {
562-
DuplexConnection connection = tuple.getT1();
563-
Payload setupPayload = tuple.getT2();
564-
565-
ByteBuf resumeToken;
566-
KeepAliveHandler keepAliveHandler;
567-
DuplexConnection wrappedConnection;
568-
569-
if (resume != null) {
570-
resumeToken = resume.getTokenSupplier().get();
571-
ClientRSocketSession session =
572-
new ClientRSocketSession(
573-
connection,
574-
resume.getSessionDuration(),
575-
resume.getRetry(),
576-
resume.getStoreFactory(CLIENT_TAG).apply(resumeToken),
577-
resume.getStreamTimeout(),
578-
resume.isCleanupStoreOnKeepAlive())
579-
.continueWith(connectionMono)
580-
.resumeToken(resumeToken);
581-
keepAliveHandler =
582-
new KeepAliveHandler.ResumableKeepAliveHandler(session.resumableConnection());
583-
wrappedConnection = session.resumableConnection();
584-
} else {
585-
resumeToken = Unpooled.EMPTY_BUFFER;
586-
keepAliveHandler = new KeepAliveHandler.DefaultKeepAliveHandler(connection);
587-
wrappedConnection = connection;
588-
}
589-
590-
ClientServerInputMultiplexer multiplexer =
591-
new ClientServerInputMultiplexer(wrappedConnection, interceptors, true);
592-
593-
boolean leaseEnabled = leasesSupplier != null;
594-
Leases<?> leases = leaseEnabled ? leasesSupplier.get() : null;
595-
RequesterLeaseHandler requesterLeaseHandler =
596-
leaseEnabled
597-
? new RequesterLeaseHandler.Impl(CLIENT_TAG, leases.receiver())
598-
: RequesterLeaseHandler.None;
599-
600-
RSocket rSocketRequester =
601-
new RSocketRequester(
602-
multiplexer.asClientConnection(),
603-
payloadDecoder,
604-
StreamIdSupplier.clientSupplier(),
605-
mtu,
606-
(int) keepAliveInterval.toMillis(),
607-
(int) keepAliveMaxLifeTime.toMillis(),
608-
keepAliveHandler,
609-
requesterLeaseHandler,
610-
Schedulers.single(Schedulers.parallel()));
611-
612-
RSocket wrappedRSocketRequester = interceptors.initRequester(rSocketRequester);
613-
614-
ByteBuf setupFrame =
615-
SetupFrameCodec.encode(
616-
wrappedConnection.alloc(),
617-
leaseEnabled,
618-
(int) keepAliveInterval.toMillis(),
619-
(int) keepAliveMaxLifeTime.toMillis(),
620-
resumeToken,
621-
metadataMimeType,
622-
dataMimeType,
623-
setupPayload);
624-
625-
SocketAcceptor acceptor =
626-
this.acceptor != null ? this.acceptor : SocketAcceptor.with(new RSocket() {});
627-
628-
ConnectionSetupPayload setup = new DefaultConnectionSetupPayload(setupFrame);
629-
630-
return interceptors
631-
.initSocketAcceptor(acceptor)
632-
.accept(setup, wrappedRSocketRequester)
570+
ct -> {
571+
int maxFrameLength = ct.maxFrameLength();
572+
573+
Mono<DuplexConnection> connectionMono =
574+
Mono.fromCallable(
575+
() -> {
576+
assertValidateSetup(maxFrameLength, maxInboundPayloadSize, mtu);
577+
return ct;
578+
})
579+
.flatMap(transport -> transport.connect())
580+
.map(
581+
connection ->
582+
mtu > 0
583+
? new FragmentationDuplexConnection(
584+
connection, mtu, maxInboundPayloadSize, "client")
585+
: new ReassemblyDuplexConnection(
586+
connection, maxInboundPayloadSize));
587+
588+
return connectionMono
633589
.flatMap(
634-
rSocketHandler -> {
635-
RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler);
636-
637-
ResponderLeaseHandler responderLeaseHandler =
590+
connection ->
591+
setupPayloadMono
592+
.defaultIfEmpty(EmptyPayload.INSTANCE)
593+
.map(setupPayload -> Tuples.of(connection, setupPayload))
594+
.doOnError(ex -> connection.dispose())
595+
.doOnCancel(connection::dispose))
596+
.flatMap(
597+
tuple -> {
598+
DuplexConnection connection = tuple.getT1();
599+
Payload setupPayload = tuple.getT2();
600+
ByteBuf resumeToken;
601+
KeepAliveHandler keepAliveHandler;
602+
DuplexConnection wrappedConnection;
603+
604+
if (resume != null) {
605+
resumeToken = resume.getTokenSupplier().get();
606+
ClientRSocketSession session =
607+
new ClientRSocketSession(
608+
connection,
609+
resume.getSessionDuration(),
610+
resume.getRetry(),
611+
resume.getStoreFactory(CLIENT_TAG).apply(resumeToken),
612+
resume.getStreamTimeout(),
613+
resume.isCleanupStoreOnKeepAlive())
614+
.continueWith(connectionMono)
615+
.resumeToken(resumeToken);
616+
keepAliveHandler =
617+
new KeepAliveHandler.ResumableKeepAliveHandler(
618+
session.resumableConnection());
619+
wrappedConnection = session.resumableConnection();
620+
} else {
621+
resumeToken = Unpooled.EMPTY_BUFFER;
622+
keepAliveHandler =
623+
new KeepAliveHandler.DefaultKeepAliveHandler(connection);
624+
wrappedConnection = connection;
625+
}
626+
627+
ClientServerInputMultiplexer multiplexer =
628+
new ClientServerInputMultiplexer(wrappedConnection, interceptors, true);
629+
630+
boolean leaseEnabled = leasesSupplier != null;
631+
Leases<?> leases = leaseEnabled ? leasesSupplier.get() : null;
632+
RequesterLeaseHandler requesterLeaseHandler =
638633
leaseEnabled
639-
? new ResponderLeaseHandler.Impl<>(
640-
CLIENT_TAG,
641-
wrappedConnection.alloc(),
642-
leases.sender(),
643-
leases.stats())
644-
: ResponderLeaseHandler.None;
645-
646-
RSocket rSocketResponder =
647-
new RSocketResponder(
648-
multiplexer.asServerConnection(),
649-
wrappedRSocketHandler,
634+
? new RequesterLeaseHandler.Impl(CLIENT_TAG, leases.receiver())
635+
: RequesterLeaseHandler.None;
636+
637+
RSocket rSocketRequester =
638+
new RSocketRequester(
639+
multiplexer.asClientConnection(),
650640
payloadDecoder,
651-
responderLeaseHandler,
652-
mtu);
653-
654-
return wrappedConnection
655-
.sendOne(setupFrame.retain())
656-
.thenReturn(wrappedRSocketRequester);
657-
})
658-
.doFinally(signalType -> setup.release());
641+
StreamIdSupplier.clientSupplier(),
642+
mtu,
643+
maxFrameLength,
644+
(int) keepAliveInterval.toMillis(),
645+
(int) keepAliveMaxLifeTime.toMillis(),
646+
keepAliveHandler,
647+
requesterLeaseHandler,
648+
Schedulers.single(Schedulers.parallel()));
649+
650+
RSocket wrappedRSocketRequester =
651+
interceptors.initRequester(rSocketRequester);
652+
653+
ByteBuf setupFrame =
654+
SetupFrameCodec.encode(
655+
wrappedConnection.alloc(),
656+
leaseEnabled,
657+
(int) keepAliveInterval.toMillis(),
658+
(int) keepAliveMaxLifeTime.toMillis(),
659+
resumeToken,
660+
metadataMimeType,
661+
dataMimeType,
662+
setupPayload);
663+
664+
SocketAcceptor acceptor =
665+
this.acceptor != null
666+
? this.acceptor
667+
: SocketAcceptor.with(new RSocket() {});
668+
669+
ConnectionSetupPayload setup =
670+
new DefaultConnectionSetupPayload(setupFrame);
671+
672+
return interceptors
673+
.initSocketAcceptor(acceptor)
674+
.accept(setup, wrappedRSocketRequester)
675+
.flatMap(
676+
rSocketHandler -> {
677+
RSocket wrappedRSocketHandler =
678+
interceptors.initResponder(rSocketHandler);
679+
680+
ResponderLeaseHandler responderLeaseHandler =
681+
leaseEnabled
682+
? new ResponderLeaseHandler.Impl<>(
683+
CLIENT_TAG,
684+
wrappedConnection.alloc(),
685+
leases.sender(),
686+
leases.stats())
687+
: ResponderLeaseHandler.None;
688+
689+
RSocket rSocketResponder =
690+
new RSocketResponder(
691+
multiplexer.asServerConnection(),
692+
wrappedRSocketHandler,
693+
payloadDecoder,
694+
responderLeaseHandler,
695+
mtu,
696+
maxFrameLength);
697+
698+
return wrappedConnection
699+
.sendOne(setupFrame.retain())
700+
.thenReturn(wrappedRSocketRequester);
701+
})
702+
.doFinally(signalType -> setup.release());
703+
});
659704
});
660705
}
661706
}

0 commit comments

Comments
 (0)