Skip to content

Commit 351ab63

Browse files
committed
optimizes FragmentationDuplexConnection by extending ReassemblyDuplexConnection
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 28ba607 commit 351ab63

File tree

9 files changed

+39
-53
lines changed

9 files changed

+39
-53
lines changed

rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,14 @@
3434
import reactor.core.publisher.Mono;
3535

3636
/**
37-
* A {@link DuplexConnection} implementation that fragments {@link ByteBuf}s.
37+
* A {@link DuplexConnection} implementation that fragments and reassembly {@link ByteBuf}s.
3838
*
3939
* @see <a
4040
* href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#fragmentation-and-reassembly">Fragmentation
4141
* and Reassembly</a>
4242
*/
43-
public final class FragmentationDuplexConnection implements DuplexConnection {
43+
public final class FragmentationDuplexConnection extends ReassemblyDuplexConnection
44+
implements DuplexConnection {
4445
private static final int MIN_MTU_SIZE = 64;
4546
private static final Logger logger = LoggerFactory.getLogger(FragmentationDuplexConnection.class);
4647
private final DuplexConnection delegate;
@@ -54,11 +55,13 @@ public FragmentationDuplexConnection(
5455
DuplexConnection delegate,
5556
ByteBufAllocator allocator,
5657
int mtu,
57-
boolean encodeLength,
58+
boolean encodeAndEncodeLength,
5859
String type) {
60+
super(delegate, allocator, encodeAndEncodeLength);
61+
5962
Objects.requireNonNull(delegate, "delegate must not be null");
6063
Objects.requireNonNull(allocator, "byteBufAllocator must not be null");
61-
this.encodeLength = encodeLength;
64+
this.encodeLength = encodeAndEncodeLength;
6265
this.allocator = allocator;
6366
this.delegate = delegate;
6467
this.mtu = assertMtu(mtu);
@@ -137,19 +140,4 @@ private ByteBuf encode(ByteBuf frame) {
137140
return frame;
138141
}
139142
}
140-
141-
@Override
142-
public Flux<ByteBuf> receive() {
143-
return delegate.receive();
144-
}
145-
146-
@Override
147-
public Mono<Void> onClose() {
148-
return delegate.onClose();
149-
}
150-
151-
@Override
152-
public void dispose() {
153-
delegate.dispose();
154-
}
155143
}

rsocket-core/src/main/java/io/rsocket/fragmentation/ReassemblyDuplexConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
* href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#fragmentation-and-reassembly">Fragmentation
3333
* and Reassembly</a>
3434
*/
35-
public final class ReassemblyDuplexConnection implements DuplexConnection {
35+
public class ReassemblyDuplexConnection implements DuplexConnection {
3636
private final DuplexConnection delegate;
3737
private final FrameReassembler frameReassembler;
3838
private final boolean decodeLength;

rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -66,32 +66,26 @@ private Mono<DuplexConnection> connect() {
6666
UnboundedProcessor<ByteBuf> out = new UnboundedProcessor<>();
6767
MonoProcessor<Void> closeNotifier = MonoProcessor.create();
6868

69-
server.accept(
70-
new ReassemblyDuplexConnection(
71-
new LocalDuplexConnection(out, in, closeNotifier),
72-
ByteBufAllocator.DEFAULT,
73-
false));
69+
server.accept(new LocalDuplexConnection(out, in, closeNotifier));
7470

75-
return Mono.just(
76-
(DuplexConnection)
77-
new ReassemblyDuplexConnection(
78-
new LocalDuplexConnection(in, out, closeNotifier),
79-
ByteBufAllocator.DEFAULT,
80-
false));
71+
return Mono.just((DuplexConnection) new LocalDuplexConnection(in, out, closeNotifier));
8172
});
8273
}
8374

8475
@Override
8576
public Mono<DuplexConnection> connect(int mtu) {
8677
Mono<DuplexConnection> isError = FragmentationDuplexConnection.checkMtu(mtu);
8778
Mono<DuplexConnection> connect = isError != null ? isError : connect();
88-
if (mtu > 0) {
89-
return connect.map(
90-
duplexConnection ->
91-
new FragmentationDuplexConnection(
92-
duplexConnection, ByteBufAllocator.DEFAULT, mtu, false, "client"));
93-
} else {
94-
return connect;
95-
}
79+
80+
return connect.map(
81+
duplexConnection -> {
82+
if (mtu > 0) {
83+
return new FragmentationDuplexConnection(
84+
duplexConnection, ByteBufAllocator.DEFAULT, mtu, false, "client");
85+
} else {
86+
return new ReassemblyDuplexConnection(
87+
duplexConnection, ByteBufAllocator.DEFAULT, false);
88+
}
89+
});
9690
}
9791
}

rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.rsocket.Closeable;
2121
import io.rsocket.DuplexConnection;
2222
import io.rsocket.fragmentation.FragmentationDuplexConnection;
23+
import io.rsocket.fragmentation.ReassemblyDuplexConnection;
2324
import io.rsocket.transport.ClientTransport;
2425
import io.rsocket.transport.ServerTransport;
2526
import java.util.Objects;
@@ -168,6 +169,9 @@ public void accept(DuplexConnection duplexConnection) {
168169
duplexConnection =
169170
new FragmentationDuplexConnection(
170171
duplexConnection, ByteBufAllocator.DEFAULT, mtu, false, "server");
172+
} else {
173+
duplexConnection =
174+
new ReassemblyDuplexConnection(duplexConnection, ByteBufAllocator.DEFAULT, false);
171175
}
172176

173177
acceptor.apply(duplexConnection).subscribe();

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,7 @@ public Mono<DuplexConnection> connect(int mtu) {
105105
c -> {
106106
if (mtu > 0) {
107107
return new FragmentationDuplexConnection(
108-
new ReassemblyDuplexConnection(
109-
new TcpDuplexConnection(c, false), ByteBufAllocator.DEFAULT, true),
108+
new TcpDuplexConnection(c, false),
110109
ByteBufAllocator.DEFAULT,
111110
mtu,
112111
true,

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,13 +161,14 @@ public Mono<DuplexConnection> connect(int mtu) {
161161
.connect()
162162
.map(
163163
c -> {
164-
DuplexConnection connection =
165-
new ReassemblyDuplexConnection(
166-
new WebsocketDuplexConnection(c), ByteBufAllocator.DEFAULT, false);
164+
DuplexConnection connection = new WebsocketDuplexConnection(c);
167165
if (mtu > 0) {
168166
connection =
169167
new FragmentationDuplexConnection(
170168
connection, ByteBufAllocator.DEFAULT, mtu, false, "client");
169+
} else {
170+
connection =
171+
new ReassemblyDuplexConnection(connection, ByteBufAllocator.DEFAULT, false);
171172
}
172173
return connection;
173174
});

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,7 @@ public Mono<CloseableChannel> start(ConnectionAcceptor acceptor, int mtu) {
106106
if (mtu > 0) {
107107
connection =
108108
new FragmentationDuplexConnection(
109-
new ReassemblyDuplexConnection(
110-
new TcpDuplexConnection(c, false), ByteBufAllocator.DEFAULT, true),
109+
new TcpDuplexConnection(c, false),
111110
ByteBufAllocator.DEFAULT,
112111
mtu,
113112
true,

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,13 @@ public static BiFunction<WebsocketInbound, WebsocketOutbound, Publisher<Void>> n
103103
public static BiFunction<WebsocketInbound, WebsocketOutbound, Publisher<Void>> newHandler(
104104
ConnectionAcceptor acceptor, int mtu) {
105105
return (in, out) -> {
106-
DuplexConnection connection =
107-
new ReassemblyDuplexConnection(
108-
new WebsocketDuplexConnection((Connection) in), ByteBufAllocator.DEFAULT, false);
106+
DuplexConnection connection = new WebsocketDuplexConnection((Connection) in);
109107
if (mtu > 0) {
110108
connection =
111109
new FragmentationDuplexConnection(
112110
connection, ByteBufAllocator.DEFAULT, mtu, false, "server");
111+
} else {
112+
connection = new ReassemblyDuplexConnection(connection, ByteBufAllocator.DEFAULT, false);
113113
}
114114
return acceptor.apply(connection).then(out.neverComplete());
115115
};

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,14 +126,15 @@ public Mono<CloseableChannel> start(ConnectionAcceptor acceptor, int mtu) {
126126
return response.sendWebsocket(
127127
(in, out) -> {
128128
DuplexConnection connection =
129-
new ReassemblyDuplexConnection(
130-
new WebsocketDuplexConnection((Connection) in),
131-
ByteBufAllocator.DEFAULT,
132-
false);
129+
new WebsocketDuplexConnection((Connection) in);
133130
if (mtu > 0) {
134131
connection =
135132
new FragmentationDuplexConnection(
136133
connection, ByteBufAllocator.DEFAULT, mtu, false, "server");
134+
} else {
135+
connection =
136+
new ReassemblyDuplexConnection(
137+
connection, ByteBufAllocator.DEFAULT, false);
137138
}
138139
return acceptor.apply(connection).then(out.neverComplete());
139140
},

0 commit comments

Comments
 (0)