Skip to content

Commit 6a3dd35

Browse files
committed
finalizes impl
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 6b688a0 commit 6a3dd35

33 files changed

+539
-139
lines changed

rsocket-core/src/main/java/io/rsocket/core/PayloadValidationUtils.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,23 @@ final class PayloadValidationUtils {
88
static final String INVALID_PAYLOAD_ERROR_MESSAGE =
99
"The payload is too big to send as a single frame with a 24-bit encoded length. Consider enabling fragmentation via RSocketFactory.";
1010

11-
static boolean isValid(int mtu, Payload payload) {
11+
static boolean isValid(int mtu, Payload payload, int maxFrameLength) {
1212
if (mtu > 0) {
1313
return true;
1414
}
1515

1616
if (payload.hasMetadata()) {
17-
return (((FrameHeaderCodec.size()
18-
+ FrameLengthCodec.FRAME_LENGTH_SIZE
19-
+ FrameHeaderCodec.size()
20-
+ payload.data().readableBytes()
21-
+ payload.metadata().readableBytes())
22-
& ~FrameLengthCodec.FRAME_LENGTH_MASK)
23-
== 0);
17+
return ((FrameHeaderCodec.size()
18+
+ FrameLengthCodec.FRAME_LENGTH_SIZE
19+
+ FrameHeaderCodec.size()
20+
+ payload.data().readableBytes()
21+
+ payload.metadata().readableBytes())
22+
<= maxFrameLength);
2423
} else {
25-
return (((FrameHeaderCodec.size()
26-
+ payload.data().readableBytes()
27-
+ FrameLengthCodec.FRAME_LENGTH_SIZE)
28-
& ~FrameLengthCodec.FRAME_LENGTH_MASK)
29-
== 0);
24+
return ((FrameHeaderCodec.size()
25+
+ payload.data().readableBytes()
26+
+ FrameLengthCodec.FRAME_LENGTH_SIZE)
27+
<= maxFrameLength);
3028
}
3129
}
3230
}

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.rsocket.core;
1717

18+
import static io.rsocket.frame.FrameLengthCodec.FRAME_LENGTH_MASK;
19+
1820
import io.netty.buffer.ByteBuf;
1921
import io.netty.buffer.Unpooled;
2022
import io.rsocket.ConnectionSetupPayload;
@@ -92,7 +94,7 @@ public class RSocketConnector {
9294
private Supplier<Leases<?>> leasesSupplier;
9395

9496
private int mtu = 0;
95-
private int maxReassemblySize = Integer.MAX_VALUE;
97+
private int maxPayloadSize = Integer.MAX_VALUE;
9698
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
9799

98100
private RSocketConnector() {}
@@ -403,14 +405,14 @@ public RSocketConnector lease(Supplier<Leases<? extends LeaseStats>> supplier) {
403405
* <p>By default this is not set in which case maximum reassembled payloads size is not
404406
* controlled.
405407
*
406-
* @param maxReassemblySize the threshold size for reassembly, must be no less than 16,777,215
408+
* @param maxPayloadSize the threshold size for reassembly, must no be less than 64 bytes
407409
* @return the same instance for method chaining
408410
* @see <a
409411
* href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#fragmentation-and-reassembly">Fragmentation
410412
* and Reassembly</a>
411413
*/
412-
public RSocketConnector reassemble(int maxReassemblySize) {
413-
this.maxReassemblySize = ReassemblyDuplexConnection.assertMaxReassemblySize(maxReassemblySize);
414+
public RSocketConnector maxPayloadSize(int maxPayloadSize) {
415+
this.maxPayloadSize = ReassemblyDuplexConnection.assertMaxReassemblySize(maxPayloadSize);
414416
return this;
415417
}
416418

@@ -492,13 +494,51 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
492494

493495
Mono<DuplexConnection> connectionMono =
494496
Mono.fromSupplier(transportSupplier)
497+
.<ClientTransport>handle(
498+
(ct, sink) -> {
499+
int maxFrameLength = ct.maxFrameLength();
500+
501+
if (maxFrameLength > FRAME_LENGTH_MASK) {
502+
sink.error(
503+
new IllegalArgumentException(
504+
"Configured maxFrameLength["
505+
+ maxFrameLength
506+
+ "] exits maxFrameLength limit "
507+
+ FRAME_LENGTH_MASK));
508+
return;
509+
}
510+
511+
if (maxFrameLength > maxPayloadSize) {
512+
sink.error(
513+
new IllegalArgumentException(
514+
"Configured maxFrameLength["
515+
+ maxFrameLength
516+
+ "] exits maxPayloadSize["
517+
+ maxPayloadSize
518+
+ "]"));
519+
return;
520+
}
521+
522+
if (mtu != 0 && mtu > maxFrameLength) {
523+
sink.error(
524+
new IllegalArgumentException(
525+
"Configured maximumTransmissionUnit["
526+
+ mtu
527+
+ "] exits configured maxFrameLength["
528+
+ maxFrameLength
529+
+ "]"));
530+
return;
531+
}
532+
533+
sink.next(ct);
534+
})
495535
.flatMap(ClientTransport::connect)
496536
.map(
497537
connection ->
498538
mtu > 0
499539
? new FragmentationDuplexConnection(
500-
connection, mtu, maxReassemblySize, "client")
501-
: new ReassemblyDuplexConnection(connection, maxReassemblySize));
540+
connection, mtu, maxPayloadSize, "client")
541+
: new ReassemblyDuplexConnection(connection, maxPayloadSize));
502542
return connectionMono
503543
.flatMap(
504544
connection -> {
@@ -543,6 +583,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
543583
payloadDecoder,
544584
StreamIdSupplier.clientSupplier(),
545585
mtu,
586+
maxPayloadSize,
546587
(int) keepAliveInterval.toMillis(),
547588
(int) keepAliveMaxLifeTime.toMillis(),
548589
keepAliveHandler,
@@ -589,7 +630,8 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
589630
wrappedRSocketHandler,
590631
payloadDecoder,
591632
responderLeaseHandler,
592-
mtu);
633+
mtu,
634+
maxPayloadSize);
593635

594636
return wrappedConnection
595637
.sendOne(setupFrame.retain())

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ class RSocketRequester implements RSocket {
106106
private final IntObjectMap<Processor<Payload, Payload>> receivers;
107107
private final UnboundedProcessor<ByteBuf> sendProcessor;
108108
private final int mtu;
109+
private final int maxFrameLength;
109110
private final RequesterLeaseHandler leaseHandler;
110111
private final ByteBufAllocator allocator;
111112
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
@@ -117,6 +118,7 @@ class RSocketRequester implements RSocket {
117118
PayloadDecoder payloadDecoder,
118119
StreamIdSupplier streamIdSupplier,
119120
int mtu,
121+
int maxFrameLength,
120122
int keepAliveTickPeriod,
121123
int keepAliveAckTimeout,
122124
@Nullable KeepAliveHandler keepAliveHandler,
@@ -127,6 +129,7 @@ class RSocketRequester implements RSocket {
127129
this.payloadDecoder = payloadDecoder;
128130
this.streamIdSupplier = streamIdSupplier;
129131
this.mtu = mtu;
132+
this.maxFrameLength = maxFrameLength;
130133
this.leaseHandler = leaseHandler;
131134
this.senders = new SynchronizedIntObjectHashMap<>();
132135
this.receivers = new SynchronizedIntObjectHashMap<>();
@@ -208,7 +211,7 @@ private Mono<Void> handleFireAndForget(Payload payload) {
208211
return Mono.error(t);
209212
}
210213

211-
if (!PayloadValidationUtils.isValid(this.mtu, payload)) {
214+
if (!PayloadValidationUtils.isValid(this.mtu, payload, maxFrameLength)) {
212215
payload.release();
213216
return Mono.error(new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE));
214217
}
@@ -257,7 +260,7 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
257260
return Mono.error(t);
258261
}
259262

260-
if (!PayloadValidationUtils.isValid(this.mtu, payload)) {
263+
if (!PayloadValidationUtils.isValid(this.mtu, payload, maxFrameLength)) {
261264
payload.release();
262265
return Mono.error(new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE));
263266
}
@@ -337,7 +340,7 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
337340
return Flux.error(t);
338341
}
339342

340-
if (!PayloadValidationUtils.isValid(this.mtu, payload)) {
343+
if (!PayloadValidationUtils.isValid(this.mtu, payload, maxFrameLength)) {
341344
payload.release();
342345
return Flux.error(new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE));
343346
}
@@ -431,7 +434,7 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
431434
return Mono.error(new IllegalReferenceCountException());
432435
}
433436

434-
if (!PayloadValidationUtils.isValid(mtu, payload)) {
437+
if (!PayloadValidationUtils.isValid(mtu, payload, maxFrameLength)) {
435438
payload.release();
436439
final IllegalArgumentException t =
437440
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
@@ -479,7 +482,7 @@ protected void hookOnNext(Payload payload) {
479482
request(1);
480483
return;
481484
}
482-
if (!PayloadValidationUtils.isValid(mtu, payload)) {
485+
if (!PayloadValidationUtils.isValid(mtu, payload, maxFrameLength)) {
483486
payload.release();
484487
cancel();
485488
final IllegalArgumentException t =
@@ -594,7 +597,7 @@ private Mono<Void> handleMetadataPush(Payload payload) {
594597
return Mono.error(err);
595598
}
596599

597-
if (!PayloadValidationUtils.isValid(this.mtu, payload)) {
600+
if (!PayloadValidationUtils.isValid(this.mtu, payload, maxFrameLength)) {
598601
payload.release();
599602
return Mono.error(new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE));
600603
}

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class RSocketResponder implements RSocket {
8181
RSocketResponder.class, Throwable.class, "terminationError");
8282

8383
private final int mtu;
84+
private final int maxFrameLength;
8485

8586
private final IntObjectMap<Subscription> sendingSubscriptions;
8687
private final IntObjectMap<Processor<Payload, Payload>> channelProcessors;
@@ -93,10 +94,12 @@ class RSocketResponder implements RSocket {
9394
RSocket requestHandler,
9495
PayloadDecoder payloadDecoder,
9596
ResponderLeaseHandler leaseHandler,
96-
int mtu) {
97+
int mtu,
98+
int maxFrameLength) {
9799
this.connection = connection;
98100
this.allocator = connection.alloc();
99101
this.mtu = mtu;
102+
this.maxFrameLength = maxFrameLength;
100103

101104
this.requestHandler = requestHandler;
102105
this.responderRSocket =
@@ -408,7 +411,7 @@ protected void hookOnNext(Payload payload) {
408411
isEmpty = false;
409412
}
410413

411-
if (!PayloadValidationUtils.isValid(mtu, payload)) {
414+
if (!PayloadValidationUtils.isValid(mtu, payload, maxFrameLength)) {
412415
payload.release();
413416
cancel();
414417
final IllegalArgumentException t =
@@ -459,7 +462,7 @@ protected void hookOnSubscribe(Subscription s) {
459462
@Override
460463
protected void hookOnNext(Payload payload) {
461464
try {
462-
if (!PayloadValidationUtils.isValid(mtu, payload)) {
465+
if (!PayloadValidationUtils.isValid(mtu, payload, maxFrameLength)) {
463466
payload.release();
464467
final IllegalArgumentException t =
465468
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);

rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.rsocket.core;
1818

19+
import static io.rsocket.frame.FrameLengthCodec.FRAME_LENGTH_MASK;
20+
1921
import io.netty.buffer.ByteBuf;
2022
import io.rsocket.Closeable;
2123
import io.rsocket.ConnectionSetupPayload;
@@ -66,7 +68,7 @@ public final class RSocketServer {
6668
private Supplier<Leases<?>> leasesSupplier = null;
6769

6870
private int mtu = 0;
69-
private int maxReassemblySize = Integer.MAX_VALUE;
71+
private int maxPayloadSize = Integer.MAX_VALUE;
7072
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
7173

7274
private RSocketServer() {}
@@ -205,14 +207,14 @@ public RSocketServer lease(Supplier<Leases<?>> supplier) {
205207
* <p>By default this is not set in which case maximum reassembled payloads size is not
206208
* controlled.
207209
*
208-
* @param maxReassemblySize the threshold size for reassembly, must be no less than 16,777,215
210+
* @param maxReassemblySize the threshold size for reassembly, must no be less than 64 bytes
209211
* @return the same instance for method chaining
210212
* @see <a
211213
* href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#fragmentation-and-reassembly">Fragmentation
212214
* and Reassembly</a>
213215
*/
214-
public RSocketServer reassemble(int maxReassemblySize) {
215-
this.maxReassemblySize = ReassemblyDuplexConnection.assertMaxReassemblySize(maxReassemblySize);
216+
public RSocketServer maxPayloadSize(int maxReassemblySize) {
217+
this.maxPayloadSize = ReassemblyDuplexConnection.assertMaxReassemblySize(maxReassemblySize);
216218
return this;
217219
}
218220

@@ -280,10 +282,41 @@ public RSocketServer payloadDecoder(PayloadDecoder decoder) {
280282
public <T extends Closeable> Mono<T> bind(ServerTransport<T> transport) {
281283
return Mono.defer(
282284
new Supplier<Mono<T>>() {
283-
ServerSetup serverSetup = serverSetup();
285+
final ServerSetup serverSetup = serverSetup();
284286

285287
@Override
286288
public Mono<T> get() {
289+
int maxFrameLength = transport.maxFrameLength();
290+
291+
if (maxFrameLength > FRAME_LENGTH_MASK) {
292+
return Mono.error(
293+
new IllegalArgumentException(
294+
"Configured maxFrameLength["
295+
+ maxFrameLength
296+
+ "] exits maxFrameLength limit "
297+
+ FRAME_LENGTH_MASK));
298+
}
299+
300+
if (maxFrameLength > maxPayloadSize) {
301+
return Mono.error(
302+
new IllegalArgumentException(
303+
"Configured maxFrameLength["
304+
+ maxFrameLength
305+
+ "] exits maxPayloadSize["
306+
+ maxPayloadSize
307+
+ "]"));
308+
}
309+
310+
if (mtu != 0 && mtu > maxFrameLength) {
311+
return Mono.error(
312+
new IllegalArgumentException(
313+
"Configured maximumTransmissionUnit["
314+
+ mtu
315+
+ "] exits configured maxFrameLength["
316+
+ maxFrameLength
317+
+ "]"));
318+
}
319+
287320
return transport
288321
.start(duplexConnection -> acceptor(serverSetup, duplexConnection))
289322
.doOnNext(c -> c.onClose().doFinally(v -> serverSetup.dispose()).subscribe());
@@ -319,8 +352,8 @@ public Mono<Void> apply(DuplexConnection connection) {
319352
private Mono<Void> acceptor(ServerSetup serverSetup, DuplexConnection connection) {
320353
connection =
321354
mtu > 0
322-
? new FragmentationDuplexConnection(connection, mtu, maxReassemblySize, "server")
323-
: new ReassemblyDuplexConnection(connection, maxReassemblySize);
355+
? new FragmentationDuplexConnection(connection, mtu, maxPayloadSize, "server")
356+
: new ReassemblyDuplexConnection(connection, maxPayloadSize);
324357

325358
ClientServerInputMultiplexer multiplexer =
326359
new ClientServerInputMultiplexer(connection, interceptors, false);
@@ -403,6 +436,7 @@ private Mono<Void> acceptSetup(
403436
payloadDecoder,
404437
StreamIdSupplier.serverSupplier(),
405438
mtu,
439+
maxPayloadSize,
406440
setupPayload.keepAliveInterval(),
407441
setupPayload.keepAliveMaxLifetime(),
408442
keepAliveHandler,
@@ -436,7 +470,8 @@ private Mono<Void> acceptSetup(
436470
wrappedRSocketHandler,
437471
payloadDecoder,
438472
responderLeaseHandler,
439-
mtu);
473+
mtu,
474+
maxPayloadSize);
440475
})
441476
.doFinally(signalType -> setupPayload.release())
442477
.then();

0 commit comments

Comments
 (0)