Skip to content

Commit c0fbd40

Browse files
committed
polishing
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 009e31a commit c0fbd40

35 files changed

+702
-290
lines changed
Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.rsocket.core;
22

3+
import static io.rsocket.frame.FrameLengthCodec.FRAME_LENGTH_MASK;
4+
35
import io.rsocket.Payload;
46
import io.rsocket.frame.FrameHeaderCodec;
57
import io.rsocket.frame.FrameLengthCodec;
@@ -8,25 +10,52 @@ final class PayloadValidationUtils {
810
static final String INVALID_PAYLOAD_ERROR_MESSAGE =
911
"The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory.";
1012

11-
static boolean isValid(int mtu, Payload payload) {
13+
static boolean isValid(int mtu, Payload payload, int maxFrameLength) {
1214
if (mtu > 0) {
1315
return true;
1416
}
1517

1618
if (payload.hasMetadata()) {
17-
return (((FrameHeaderCodec.size()
18-
+ FrameLengthCodec.FRAME_LENGTH_SIZE
19-
+ FrameHeaderCodec.size()
20-
+ payload.data().readableBytes()
21-
+ payload.metadata().readableBytes())
22-
& ~FrameLengthCodec.FRAME_LENGTH_MASK)
23-
== 0);
19+
return ((FrameHeaderCodec.size()
20+
+ FrameLengthCodec.FRAME_LENGTH_SIZE
21+
+ FrameHeaderCodec.size()
22+
+ payload.data().readableBytes()
23+
+ payload.metadata().readableBytes())
24+
<= maxFrameLength);
2425
} else {
25-
return (((FrameHeaderCodec.size()
26-
+ payload.data().readableBytes()
27-
+ FrameLengthCodec.FRAME_LENGTH_SIZE)
28-
& ~FrameLengthCodec.FRAME_LENGTH_MASK)
29-
== 0);
26+
return ((FrameHeaderCodec.size()
27+
+ payload.data().readableBytes()
28+
+ FrameLengthCodec.FRAME_LENGTH_SIZE)
29+
<= maxFrameLength);
30+
}
31+
}
32+
33+
static void assertValidateSetup(int maxFrameLength, int maxInboundPayloadSize, int mtu) {
34+
35+
if (maxFrameLength > FRAME_LENGTH_MASK) {
36+
throw new IllegalArgumentException(
37+
"Configured maxFrameLength["
38+
+ maxFrameLength
39+
+ "] exceeds maxFrameLength limit "
40+
+ FRAME_LENGTH_MASK);
41+
}
42+
43+
if (maxFrameLength > maxInboundPayloadSize) {
44+
throw new IllegalArgumentException(
45+
"Configured maxFrameLength["
46+
+ maxFrameLength
47+
+ "] exceeds maxPayloadSize["
48+
+ maxInboundPayloadSize
49+
+ "]");
50+
}
51+
52+
if (mtu != 0 && mtu > maxFrameLength) {
53+
throw new IllegalArgumentException(
54+
"Configured maximumTransmissionUnit["
55+
+ mtu
56+
+ "] exceeds configured maxFrameLength["
57+
+ maxFrameLength
58+
+ "]");
3059
}
3160
}
3261
}

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 142 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.rsocket.core;
1717

18+
import static io.rsocket.core.PayloadValidationUtils.assertValidateSetup;
19+
1820
import io.netty.buffer.ByteBuf;
1921
import io.netty.buffer.Unpooled;
2022
import io.rsocket.ConnectionSetupPayload;
@@ -92,7 +94,7 @@ public class RSocketConnector {
9294
private Supplier<Leases<?>> leasesSupplier;
9395

9496
private int mtu = 0;
95-
private int maxReassemblySize = Integer.MAX_VALUE;
97+
private int maxInboundPayloadSize = Integer.MAX_VALUE;
9698
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
9799

98100
private RSocketConnector() {}
@@ -419,14 +421,18 @@ public RSocketConnector lease(Supplier<Leases<? extends LeaseStats>> supplier) {
419421
* <p>By default this is not set in which case maximum reassembled payloads size is not
420422
* controlled.
421423
*
422-
* @param maxReassemblySize the threshold size for reassembly, must be no less than 16,777,215
424+
* @param maxInboundPayloadSize the threshold size for reassembly, must no be less than 64 bytes.
425+
* Please note, {@code maxInboundPayloadSize} must always be greater or equal to {@link
426+
* io.rsocket.transport.Transport#maxFrameLength()}, otherwise inbound frame can exceed the
427+
* {@code maxInboundPayloadSize}
423428
* @return the same instance for method chaining
424429
* @see <a
425430
* href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#fragmentation-and-reassembly">Fragmentation
426431
* and Reassembly</a>
427432
*/
428-
public RSocketConnector reassemble(int maxReassemblySize) {
429-
this.maxReassemblySize = ReassemblyDuplexConnection.assertMaxReassemblySize(maxReassemblySize);
433+
public RSocketConnector maxInboundPayloadSize(int maxInboundPayloadSize) {
434+
this.maxInboundPayloadSize =
435+
ReassemblyDuplexConnection.assertInboundPayloadSize(maxInboundPayloadSize);
430436
return this;
431437
}
432438

@@ -506,122 +512,142 @@ public Mono<RSocket> connect(ClientTransport transport) {
506512
*/
507513
public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
508514

509-
Mono<DuplexConnection> connectionMono =
510-
Mono.fromSupplier(transportSupplier)
511-
.flatMap(ClientTransport::connect)
512-
.map(
513-
connection ->
514-
mtu > 0
515-
? new FragmentationDuplexConnection(
516-
connection, mtu, maxReassemblySize, "client")
517-
: new ReassemblyDuplexConnection(connection, maxReassemblySize));
518-
return connectionMono
519-
.flatMap(
520-
connection ->
521-
setupPayloadMono
522-
.defaultIfEmpty(EmptyPayload.INSTANCE)
523-
.map(setupPayload -> Tuples.of(connection, setupPayload))
524-
.doOnError(ex -> connection.dispose())
525-
.doOnCancel(connection::dispose))
515+
return Mono.fromSupplier(transportSupplier)
526516
.flatMap(
527-
tuple -> {
528-
DuplexConnection connection = tuple.getT1();
529-
Payload setupPayload = tuple.getT2();
530-
531-
ByteBuf resumeToken;
532-
KeepAliveHandler keepAliveHandler;
533-
DuplexConnection wrappedConnection;
534-
535-
if (resume != null) {
536-
resumeToken = resume.getTokenSupplier().get();
537-
ClientRSocketSession session =
538-
new ClientRSocketSession(
539-
connection,
540-
resume.getSessionDuration(),
541-
resume.getRetry(),
542-
resume.getStoreFactory(CLIENT_TAG).apply(resumeToken),
543-
resume.getStreamTimeout(),
544-
resume.isCleanupStoreOnKeepAlive())
545-
.continueWith(connectionMono)
546-
.resumeToken(resumeToken);
547-
keepAliveHandler =
548-
new KeepAliveHandler.ResumableKeepAliveHandler(session.resumableConnection());
549-
wrappedConnection = session.resumableConnection();
550-
} else {
551-
resumeToken = Unpooled.EMPTY_BUFFER;
552-
keepAliveHandler = new KeepAliveHandler.DefaultKeepAliveHandler(connection);
553-
wrappedConnection = connection;
554-
}
555-
556-
ClientServerInputMultiplexer multiplexer =
557-
new ClientServerInputMultiplexer(wrappedConnection, interceptors, true);
558-
559-
boolean leaseEnabled = leasesSupplier != null;
560-
Leases<?> leases = leaseEnabled ? leasesSupplier.get() : null;
561-
RequesterLeaseHandler requesterLeaseHandler =
562-
leaseEnabled
563-
? new RequesterLeaseHandler.Impl(CLIENT_TAG, leases.receiver())
564-
: RequesterLeaseHandler.None;
565-
566-
RSocket rSocketRequester =
567-
new RSocketRequester(
568-
multiplexer.asClientConnection(),
569-
payloadDecoder,
570-
StreamIdSupplier.clientSupplier(),
571-
mtu,
572-
(int) keepAliveInterval.toMillis(),
573-
(int) keepAliveMaxLifeTime.toMillis(),
574-
keepAliveHandler,
575-
requesterLeaseHandler,
576-
Schedulers.single(Schedulers.parallel()));
577-
578-
RSocket wrappedRSocketRequester = interceptors.initRequester(rSocketRequester);
579-
580-
ByteBuf setupFrame =
581-
SetupFrameCodec.encode(
582-
wrappedConnection.alloc(),
583-
leaseEnabled,
584-
(int) keepAliveInterval.toMillis(),
585-
(int) keepAliveMaxLifeTime.toMillis(),
586-
resumeToken,
587-
metadataMimeType,
588-
dataMimeType,
589-
setupPayload);
590-
591-
SocketAcceptor acceptor =
592-
this.acceptor != null ? this.acceptor : SocketAcceptor.with(new RSocket() {});
593-
594-
ConnectionSetupPayload setup = new DefaultConnectionSetupPayload(setupFrame);
595-
596-
return interceptors
597-
.initSocketAcceptor(acceptor)
598-
.accept(setup, wrappedRSocketRequester)
517+
ct -> {
518+
int maxFrameLength = ct.maxFrameLength();
519+
520+
Mono<DuplexConnection> connectionMono =
521+
Mono.fromCallable(
522+
() -> {
523+
assertValidateSetup(maxFrameLength, maxInboundPayloadSize, mtu);
524+
return ct;
525+
})
526+
.flatMap(ClientTransport::connect)
527+
.map(
528+
connection ->
529+
mtu > 0
530+
? new FragmentationDuplexConnection(
531+
connection, mtu, maxInboundPayloadSize, "client")
532+
: new ReassemblyDuplexConnection(
533+
connection, maxInboundPayloadSize));
534+
535+
return connectionMono
599536
.flatMap(
600-
rSocketHandler -> {
601-
RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler);
602-
603-
ResponderLeaseHandler responderLeaseHandler =
537+
connection ->
538+
setupPayloadMono
539+
.defaultIfEmpty(EmptyPayload.INSTANCE)
540+
.map(setupPayload -> Tuples.of(connection, setupPayload))
541+
.doOnError(ex -> connection.dispose())
542+
.doOnCancel(connection::dispose))
543+
.flatMap(
544+
tuple -> {
545+
DuplexConnection connection = tuple.getT1();
546+
Payload setupPayload = tuple.getT2();
547+
ByteBuf resumeToken;
548+
KeepAliveHandler keepAliveHandler;
549+
DuplexConnection wrappedConnection;
550+
551+
if (resume != null) {
552+
resumeToken = resume.getTokenSupplier().get();
553+
ClientRSocketSession session =
554+
new ClientRSocketSession(
555+
connection,
556+
resume.getSessionDuration(),
557+
resume.getRetry(),
558+
resume.getStoreFactory(CLIENT_TAG).apply(resumeToken),
559+
resume.getStreamTimeout(),
560+
resume.isCleanupStoreOnKeepAlive())
561+
.continueWith(connectionMono)
562+
.resumeToken(resumeToken);
563+
keepAliveHandler =
564+
new KeepAliveHandler.ResumableKeepAliveHandler(
565+
session.resumableConnection());
566+
wrappedConnection = session.resumableConnection();
567+
} else {
568+
resumeToken = Unpooled.EMPTY_BUFFER;
569+
keepAliveHandler =
570+
new KeepAliveHandler.DefaultKeepAliveHandler(connection);
571+
wrappedConnection = connection;
572+
}
573+
574+
ClientServerInputMultiplexer multiplexer =
575+
new ClientServerInputMultiplexer(wrappedConnection, interceptors, true);
576+
577+
boolean leaseEnabled = leasesSupplier != null;
578+
Leases<?> leases = leaseEnabled ? leasesSupplier.get() : null;
579+
RequesterLeaseHandler requesterLeaseHandler =
604580
leaseEnabled
605-
? new ResponderLeaseHandler.Impl<>(
606-
CLIENT_TAG,
607-
wrappedConnection.alloc(),
608-
leases.sender(),
609-
leases.stats())
610-
: ResponderLeaseHandler.None;
611-
612-
RSocket rSocketResponder =
613-
new RSocketResponder(
614-
multiplexer.asServerConnection(),
615-
wrappedRSocketHandler,
581+
? new RequesterLeaseHandler.Impl(CLIENT_TAG, leases.receiver())
582+
: RequesterLeaseHandler.None;
583+
584+
RSocket rSocketRequester =
585+
new RSocketRequester(
586+
multiplexer.asClientConnection(),
616587
payloadDecoder,
617-
responderLeaseHandler,
618-
mtu);
619-
620-
return wrappedConnection
621-
.sendOne(setupFrame.retain())
622-
.thenReturn(wrappedRSocketRequester);
623-
})
624-
.doFinally(signalType -> setup.release());
588+
StreamIdSupplier.clientSupplier(),
589+
mtu,
590+
maxFrameLength,
591+
(int) keepAliveInterval.toMillis(),
592+
(int) keepAliveMaxLifeTime.toMillis(),
593+
keepAliveHandler,
594+
requesterLeaseHandler,
595+
Schedulers.single(Schedulers.parallel()));
596+
597+
RSocket wrappedRSocketRequester =
598+
interceptors.initRequester(rSocketRequester);
599+
600+
ByteBuf setupFrame =
601+
SetupFrameCodec.encode(
602+
wrappedConnection.alloc(),
603+
leaseEnabled,
604+
(int) keepAliveInterval.toMillis(),
605+
(int) keepAliveMaxLifeTime.toMillis(),
606+
resumeToken,
607+
metadataMimeType,
608+
dataMimeType,
609+
setupPayload);
610+
611+
SocketAcceptor acceptor =
612+
this.acceptor != null
613+
? this.acceptor
614+
: SocketAcceptor.with(new RSocket() {});
615+
616+
ConnectionSetupPayload setup =
617+
new DefaultConnectionSetupPayload(setupFrame);
618+
619+
return interceptors
620+
.initSocketAcceptor(acceptor)
621+
.accept(setup, wrappedRSocketRequester)
622+
.flatMap(
623+
rSocketHandler -> {
624+
RSocket wrappedRSocketHandler =
625+
interceptors.initResponder(rSocketHandler);
626+
627+
ResponderLeaseHandler responderLeaseHandler =
628+
leaseEnabled
629+
? new ResponderLeaseHandler.Impl<>(
630+
CLIENT_TAG,
631+
wrappedConnection.alloc(),
632+
leases.sender(),
633+
leases.stats())
634+
: ResponderLeaseHandler.None;
635+
636+
RSocket rSocketResponder =
637+
new RSocketResponder(
638+
multiplexer.asServerConnection(),
639+
wrappedRSocketHandler,
640+
payloadDecoder,
641+
responderLeaseHandler,
642+
mtu,
643+
maxFrameLength);
644+
645+
return wrappedConnection
646+
.sendOne(setupFrame.retain())
647+
.thenReturn(wrappedRSocketRequester);
648+
})
649+
.doFinally(signalType -> setup.release());
650+
});
625651
})
626652
.as(
627653
source -> {

0 commit comments

Comments
 (0)