Skip to content

Commit d8da87a

Browse files
authored
Apply FragmentationDuplexConnection from a single place (#836)
1 parent 585b5ef commit d8da87a

32 files changed

+197
-486
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.rsocket.RSocket;
2424
import io.rsocket.SocketAcceptor;
2525
import io.rsocket.fragmentation.FragmentationDuplexConnection;
26+
import io.rsocket.fragmentation.ReassemblyDuplexConnection;
2627
import io.rsocket.frame.SetupFrameCodec;
2728
import io.rsocket.frame.decoder.PayloadDecoder;
2829
import io.rsocket.internal.ClientServerInputMultiplexer;
@@ -400,14 +401,7 @@ public RSocketConnector lease(Supplier<Leases<? extends LeaseStats>> supplier) {
400401
* and Reassembly</a>
401402
*/
402403
public RSocketConnector fragment(int mtu) {
403-
if (mtu > 0 && mtu < FragmentationDuplexConnection.MIN_MTU_SIZE || mtu < 0) {
404-
String msg =
405-
String.format(
406-
"The smallest allowed mtu size is %d bytes, provided: %d",
407-
FragmentationDuplexConnection.MIN_MTU_SIZE, mtu);
408-
throw new IllegalArgumentException(msg);
409-
}
410-
this.mtu = mtu;
404+
this.mtu = FragmentationDuplexConnection.assertMtu(mtu);
411405
return this;
412406
}
413407

@@ -468,8 +462,15 @@ public Mono<RSocket> connect(ClientTransport transport) {
468462
* @return a {@code Mono} with the connected RSocket
469463
*/
470464
public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
465+
471466
Mono<DuplexConnection> connectionMono =
472-
Mono.fromSupplier(transportSupplier).flatMap(t -> t.connect(mtu));
467+
Mono.fromSupplier(transportSupplier)
468+
.flatMap(ClientTransport::connect)
469+
.map(
470+
connection ->
471+
mtu > 0
472+
? new FragmentationDuplexConnection(connection, mtu, "client")
473+
: new ReassemblyDuplexConnection(connection));
473474
return connectionMono
474475
.flatMap(
475476
connection -> {

rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.rsocket.exceptions.InvalidSetupException;
2727
import io.rsocket.exceptions.RejectedSetupException;
2828
import io.rsocket.fragmentation.FragmentationDuplexConnection;
29+
import io.rsocket.fragmentation.ReassemblyDuplexConnection;
2930
import io.rsocket.frame.FrameHeaderCodec;
3031
import io.rsocket.frame.SetupFrameCodec;
3132
import io.rsocket.frame.decoder.PayloadDecoder;
@@ -211,14 +212,7 @@ public RSocketServer lease(Supplier<Leases<?>> supplier) {
211212
* and Reassembly</a>
212213
*/
213214
public RSocketServer fragment(int mtu) {
214-
if (mtu > 0 && mtu < FragmentationDuplexConnection.MIN_MTU_SIZE || mtu < 0) {
215-
String msg =
216-
String.format(
217-
"The smallest allowed mtu size is %d bytes, provided: %d",
218-
FragmentationDuplexConnection.MIN_MTU_SIZE, mtu);
219-
throw new IllegalArgumentException(msg);
220-
}
221-
this.mtu = mtu;
215+
this.mtu = FragmentationDuplexConnection.assertMtu(mtu);
222216
return this;
223217
}
224218

@@ -273,7 +267,7 @@ public <T extends Closeable> Mono<T> bind(ServerTransport<T> transport) {
273267
@Override
274268
public Mono<T> get() {
275269
return transport
276-
.start(duplexConnection -> acceptor(serverSetup, duplexConnection), mtu)
270+
.start(duplexConnection -> acceptor(serverSetup, duplexConnection))
277271
.doOnNext(c -> c.onClose().doFinally(v -> serverSetup.dispose()).subscribe());
278272
}
279273
});
@@ -305,6 +299,11 @@ public Mono<Void> apply(DuplexConnection connection) {
305299
}
306300

307301
private Mono<Void> acceptor(ServerSetup serverSetup, DuplexConnection connection) {
302+
connection =
303+
mtu > 0
304+
? new FragmentationDuplexConnection(connection, mtu, "server")
305+
: new ReassemblyDuplexConnection(connection);
306+
308307
ClientServerInputMultiplexer multiplexer =
309308
new ClientServerInputMultiplexer(connection, interceptors, false);
310309

rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java

Lines changed: 28 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,13 @@
2222
import io.netty.buffer.ByteBufUtil;
2323
import io.rsocket.DuplexConnection;
2424
import io.rsocket.frame.FrameHeaderCodec;
25-
import io.rsocket.frame.FrameLengthCodec;
2625
import io.rsocket.frame.FrameType;
2726
import java.util.Objects;
2827
import org.reactivestreams.Publisher;
2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
3130
import reactor.core.publisher.Flux;
3231
import reactor.core.publisher.Mono;
33-
import reactor.util.annotation.Nullable;
3432

3533
/**
3634
* A {@link DuplexConnection} implementation that fragments and reassembles {@link ByteBuf}s.
@@ -46,15 +44,19 @@ public final class FragmentationDuplexConnection extends ReassemblyDuplexConnect
4644
private final DuplexConnection delegate;
4745
private final int mtu;
4846
private final FrameReassembler frameReassembler;
49-
private final boolean encodeLength;
5047
private final String type;
5148

52-
public FragmentationDuplexConnection(
53-
DuplexConnection delegate, int mtu, boolean encodeAndEncodeLength, String type) {
54-
super(delegate, encodeAndEncodeLength);
49+
/**
50+
* Class constructor.
51+
*
52+
* @param delegate the underlying connection
53+
* @param mtu the fragment size, greater than {@link #MIN_MTU_SIZE}
54+
* @param type a label to use for logging purposes
55+
*/
56+
public FragmentationDuplexConnection(DuplexConnection delegate, int mtu, String type) {
57+
super(delegate);
5558

5659
Objects.requireNonNull(delegate, "delegate must not be null");
57-
this.encodeLength = encodeAndEncodeLength;
5860
this.delegate = delegate;
5961
this.mtu = assertMtu(mtu);
6062
this.frameReassembler = new FrameReassembler(delegate.alloc());
@@ -67,32 +69,17 @@ private boolean shouldFragment(FrameType frameType, int readableBytes) {
6769
return frameType.isFragmentable() && readableBytes > mtu;
6870
}
6971

70-
/*TODO this is nullable and not returning empty to workaround javac 11.0.3 compiler issue on ubuntu (at least) */
71-
@Nullable
72-
public static <T> Mono<T> checkMtu(int mtu) {
73-
if (isInsufficientMtu(mtu)) {
72+
public static int assertMtu(int mtu) {
73+
if (mtu > 0 && mtu < MIN_MTU_SIZE || mtu < 0) {
7474
String msg =
75-
String.format("smallest allowed mtu size is %d bytes, provided: %d", MIN_MTU_SIZE, mtu);
76-
return Mono.error(new IllegalArgumentException(msg));
77-
} else {
78-
return null;
79-
}
80-
}
81-
82-
private static int assertMtu(int mtu) {
83-
if (isInsufficientMtu(mtu)) {
84-
String msg =
85-
String.format("smallest allowed mtu size is %d bytes, provided: %d", MIN_MTU_SIZE, mtu);
75+
String.format(
76+
"The smallest allowed mtu size is %d bytes, provided: %d", MIN_MTU_SIZE, mtu);
8677
throw new IllegalArgumentException(msg);
8778
} else {
8879
return mtu;
8980
}
9081
}
9182

92-
private static boolean isInsufficientMtu(int mtu) {
93-
return mtu > 0 && mtu < MIN_MTU_SIZE || mtu < 0;
94-
}
95-
9683
@Override
9784
public Mono<Void> send(Publisher<ByteBuf> frames) {
9885
return Flux.from(frames).concatMap(this::sendOne).then();
@@ -102,34 +89,22 @@ public Mono<Void> send(Publisher<ByteBuf> frames) {
10289
public Mono<Void> sendOne(ByteBuf frame) {
10390
FrameType frameType = FrameHeaderCodec.frameType(frame);
10491
int readableBytes = frame.readableBytes();
105-
if (shouldFragment(frameType, readableBytes)) {
106-
if (logger.isDebugEnabled()) {
107-
return delegate.send(
108-
Flux.from(fragmentFrame(alloc(), mtu, frame, frameType, encodeLength))
109-
.doOnNext(
110-
byteBuf -> {
111-
ByteBuf f = encodeLength ? FrameLengthCodec.frame(byteBuf) : byteBuf;
112-
logger.debug(
113-
"{} - stream id {} - frame type {} - \n {}",
114-
type,
115-
FrameHeaderCodec.streamId(f),
116-
FrameHeaderCodec.frameType(f),
117-
ByteBufUtil.prettyHexDump(f));
118-
}));
119-
} else {
120-
return delegate.send(
121-
Flux.from(fragmentFrame(alloc(), mtu, frame, frameType, encodeLength)));
122-
}
123-
} else {
124-
return delegate.sendOne(encode(frame));
92+
if (!shouldFragment(frameType, readableBytes)) {
93+
return delegate.sendOne(frame);
12594
}
126-
}
127-
128-
private ByteBuf encode(ByteBuf frame) {
129-
if (encodeLength) {
130-
return FrameLengthCodec.encode(alloc(), frame.readableBytes(), frame);
131-
} else {
132-
return frame;
95+
Flux<ByteBuf> fragments = Flux.from(fragmentFrame(alloc(), mtu, frame, frameType));
96+
if (logger.isDebugEnabled()) {
97+
fragments =
98+
fragments.doOnNext(
99+
byteBuf -> {
100+
logger.debug(
101+
"{} - stream id {} - frame type {} - \n {}",
102+
type,
103+
FrameHeaderCodec.streamId(byteBuf),
104+
FrameHeaderCodec.frameType(byteBuf),
105+
ByteBufUtil.prettyHexDump(byteBuf));
106+
});
133107
}
108+
return delegate.send(fragments);
134109
}
135110
}

rsocket-core/src/main/java/io/rsocket/fragmentation/FrameFragmenter.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.netty.buffer.Unpooled;
2222
import io.netty.util.ReferenceCountUtil;
2323
import io.rsocket.frame.FrameHeaderCodec;
24-
import io.rsocket.frame.FrameLengthCodec;
2524
import io.rsocket.frame.FrameType;
2625
import io.rsocket.frame.PayloadFrameCodec;
2726
import io.rsocket.frame.RequestChannelFrameCodec;
@@ -42,11 +41,7 @@
4241
*/
4342
final class FrameFragmenter {
4443
static Publisher<ByteBuf> fragmentFrame(
45-
ByteBufAllocator allocator,
46-
int mtu,
47-
final ByteBuf frame,
48-
FrameType frameType,
49-
boolean encodeLength) {
44+
ByteBufAllocator allocator, int mtu, final ByteBuf frame, FrameType frameType) {
5045
ByteBuf metadata = getMetadata(frame, frameType);
5146
ByteBuf data = getData(frame, frameType);
5247
int streamId = FrameHeaderCodec.streamId(frame);
@@ -66,7 +61,7 @@ public void accept(SynchronousSink<ByteBuf> sink) {
6661
byteBuf = encodeFollowsFragment(allocator, mtu, streamId, metadata, data);
6762
}
6863

69-
sink.next(encode(allocator, byteBuf, encodeLength));
64+
sink.next(byteBuf);
7065
if (!metadata.isReadable() && !data.isReadable()) {
7166
sink.complete();
7267
}
@@ -237,12 +232,4 @@ static ByteBuf getData(ByteBuf frame, FrameType frameType) {
237232
}
238233
return data;
239234
}
240-
241-
static ByteBuf encode(ByteBufAllocator allocator, ByteBuf frame, boolean encodeLength) {
242-
if (encodeLength) {
243-
return FrameLengthCodec.encode(allocator, frame.readableBytes(), frame);
244-
} else {
245-
return frame;
246-
}
247-
}
248235
}

rsocket-core/src/main/java/io/rsocket/fragmentation/ReassemblyDuplexConnection.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.netty.buffer.ByteBuf;
2020
import io.netty.buffer.ByteBufAllocator;
2121
import io.rsocket.DuplexConnection;
22-
import io.rsocket.frame.FrameLengthCodec;
2322
import java.util.Objects;
2423
import org.reactivestreams.Publisher;
2524
import reactor.core.publisher.Flux;
@@ -35,11 +34,10 @@
3534
public class ReassemblyDuplexConnection implements DuplexConnection {
3635
private final DuplexConnection delegate;
3736
private final FrameReassembler frameReassembler;
38-
private final boolean decodeLength;
3937

40-
public ReassemblyDuplexConnection(DuplexConnection delegate, boolean decodeLength) {
38+
/** Constructor with the underlying delegate to receive frames from. */
39+
public ReassemblyDuplexConnection(DuplexConnection delegate) {
4140
Objects.requireNonNull(delegate, "delegate must not be null");
42-
this.decodeLength = decodeLength;
4341
this.delegate = delegate;
4442
this.frameReassembler = new FrameReassembler(delegate.alloc());
4543

@@ -56,23 +54,9 @@ public Mono<Void> sendOne(ByteBuf frame) {
5654
return delegate.sendOne(frame);
5755
}
5856

59-
private ByteBuf decode(ByteBuf frame) {
60-
if (decodeLength) {
61-
return FrameLengthCodec.frame(frame).retain();
62-
} else {
63-
return frame;
64-
}
65-
}
66-
6757
@Override
6858
public Flux<ByteBuf> receive() {
69-
return delegate
70-
.receive()
71-
.handle(
72-
(byteBuf, sink) -> {
73-
ByteBuf decode = decode(byteBuf);
74-
frameReassembler.reassembleFrame(decode, sink);
75-
});
59+
return delegate.receive().handle(frameReassembler::reassembleFrame);
7660
}
7761

7862
@Override

rsocket-core/src/main/java/io/rsocket/transport/ClientTransport.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2018 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,11 +23,9 @@
2323
public interface ClientTransport extends Transport {
2424

2525
/**
26-
* Returns a {@code Publisher}, every subscription to which returns a single {@code
27-
* DuplexConnection}.
26+
* Return a {@code Mono} that connects for each subscriber.
2827
*
29-
* @param mtu The mtu used for fragmentation - if set to zero fragmentation will be disabled
30-
* @return {@code Publisher}, every subscription returns a single {@code DuplexConnection}.
28+
* @since 1.0.1
3129
*/
32-
Mono<DuplexConnection> connect(int mtu);
30+
Mono<DuplexConnection> connect();
3331
}

rsocket-core/src/main/java/io/rsocket/transport/ServerTransport.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2018 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,14 +26,13 @@
2626
public interface ServerTransport<T extends Closeable> extends Transport {
2727

2828
/**
29-
* Starts this server.
29+
* Start this server.
3030
*
31-
* @param acceptor An acceptor to process a newly accepted {@code DuplexConnection}
32-
* @param mtu The mtu used for fragmentation - if set to zero fragmentation will be disabled
33-
* @return A handle to retrieve information about a started server.
34-
* @throws NullPointerException if {@code acceptor} is {@code null}
31+
* @param acceptor to process a newly accepted connections with
32+
* @return A handle for information about and control over the server.
33+
* @since 1.0.1
3534
*/
36-
Mono<T> start(ConnectionAcceptor acceptor, int mtu);
35+
Mono<T> start(ConnectionAcceptor acceptor);
3736

3837
/** A contract to accept a new {@code DuplexConnection}. */
3938
interface ConnectionAcceptor extends Function<DuplexConnection, Publisher<Void>> {

rsocket-core/src/test/java/io/rsocket/core/RSocketReconnectTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,12 @@ public void shouldBeASharedReconnectableInstanceOfRSocketMono() {
6464
@SuppressWarnings({"rawtype", "unchecked"})
6565
public void shouldBeRetrieableConnectionSharedReconnectableInstanceOfRSocketMono() {
6666
ClientTransport transport = Mockito.mock(ClientTransport.class);
67-
Mockito.when(transport.connect(0))
67+
Mockito.when(transport.connect())
6868
.thenThrow(UncheckedIOException.class)
6969
.thenThrow(UncheckedIOException.class)
7070
.thenThrow(UncheckedIOException.class)
7171
.thenThrow(UncheckedIOException.class)
72-
.thenReturn(new TestClientTransport().connect(0));
72+
.thenReturn(new TestClientTransport().connect());
7373
Mono<RSocket> rSocketMono =
7474
RSocketConnector.create()
7575
.reconnect(
@@ -93,13 +93,13 @@ public void shouldBeRetrieableConnectionSharedReconnectableInstanceOfRSocketMono
9393
@SuppressWarnings({"rawtype", "unchecked"})
9494
public void shouldBeExaustedRetrieableConnectionSharedReconnectableInstanceOfRSocketMono() {
9595
ClientTransport transport = Mockito.mock(ClientTransport.class);
96-
Mockito.when(transport.connect(0))
96+
Mockito.when(transport.connect())
9797
.thenThrow(UncheckedIOException.class)
9898
.thenThrow(UncheckedIOException.class)
9999
.thenThrow(UncheckedIOException.class)
100100
.thenThrow(UncheckedIOException.class)
101101
.thenThrow(UncheckedIOException.class)
102-
.thenReturn(new TestClientTransport().connect(0));
102+
.thenReturn(new TestClientTransport().connect());
103103
Mono<RSocket> rSocketMono =
104104
RSocketConnector.create()
105105
.reconnect(

rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private static class SingleConnectionTransport implements ServerTransport<TestCl
137137
private final TestDuplexConnection conn = new TestDuplexConnection(allocator);
138138

139139
@Override
140-
public Mono<TestCloseable> start(ConnectionAcceptor acceptor, int mtu) {
140+
public Mono<TestCloseable> start(ConnectionAcceptor acceptor) {
141141
return Mono.just(new TestCloseable(acceptor, conn));
142142
}
143143

0 commit comments

Comments
 (0)