Skip to content

Commit de7ee88

Browse files
authored
fixes issue with fragmentation when the MTU size is set to 64 bytes (#616)
Signed-off-by: Robert Roeser <[email protected]>
1 parent 7ca471f commit de7ee88

File tree

13 files changed

+83
-42
lines changed

13 files changed

+83
-42
lines changed

rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import io.netty.buffer.ByteBuf;
2222
import io.netty.buffer.ByteBufAllocator;
23+
import io.netty.buffer.ByteBufUtil;
2324
import io.rsocket.DuplexConnection;
2425
import io.rsocket.frame.FrameHeaderFlyweight;
2526
import io.rsocket.frame.FrameLengthFlyweight;
@@ -46,9 +47,14 @@ public final class FragmentationDuplexConnection implements DuplexConnection {
4647
private final ByteBufAllocator allocator;
4748
private final FrameReassembler frameReassembler;
4849
private final boolean encodeLength;
50+
private final String type;
4951

5052
public FragmentationDuplexConnection(
51-
DuplexConnection delegate, ByteBufAllocator allocator, int mtu, boolean encodeLength) {
53+
DuplexConnection delegate,
54+
ByteBufAllocator allocator,
55+
int mtu,
56+
boolean encodeLength,
57+
String type) {
5258
Objects.requireNonNull(delegate, "delegate must not be null");
5359
Objects.requireNonNull(allocator, "byteBufAllocator must not be null");
5460
if (mtu < MIN_MTU_SIZE) {
@@ -59,6 +65,7 @@ public FragmentationDuplexConnection(
5965
this.delegate = delegate;
6066
this.mtu = mtu;
6167
this.frameReassembler = new FrameReassembler(allocator);
68+
this.type = type;
6269

6370
delegate.onClose().doFinally(s -> frameReassembler.dispose()).subscribe();
6471
}
@@ -77,7 +84,23 @@ public Mono<Void> sendOne(ByteBuf frame) {
7784
FrameType frameType = FrameHeaderFlyweight.frameType(frame);
7885
int readableBytes = frame.readableBytes();
7986
if (shouldFragment(frameType, readableBytes)) {
80-
return delegate.send(fragmentFrame(allocator, mtu, frame, frameType, encodeLength));
87+
if (logger.isDebugEnabled()) {
88+
return delegate.send(
89+
Flux.from(fragmentFrame(allocator, mtu, frame, frameType, encodeLength))
90+
.doOnNext(
91+
byteBuf -> {
92+
ByteBuf frame1 = FrameLengthFlyweight.frame(byteBuf);
93+
logger.debug(
94+
"{} - stream id {} - frame type {} - \n {}",
95+
type,
96+
FrameHeaderFlyweight.streamId(frame1),
97+
FrameHeaderFlyweight.frameType(frame1),
98+
ByteBufUtil.prettyHexDump(frame1));
99+
}));
100+
} else {
101+
return delegate.send(
102+
Flux.from(fragmentFrame(allocator, mtu, frame, frameType, encodeLength)));
103+
}
81104
} else {
82105
return delegate.sendOne(encode(frame));
83106
}

rsocket-core/src/main/java/io/rsocket/fragmentation/FrameReassembler.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -172,23 +172,23 @@ void handleFollowsFlag(ByteBuf frame, int streamId, FrameType frameType) {
172172
CompositeByteBuf metadata = getMetadata(streamId);
173173
switch (frameType) {
174174
case REQUEST_FNF:
175-
metadata.addComponents(true, RequestFireAndForgetFrameFlyweight.metadata(frame));
175+
metadata.addComponents(true, RequestFireAndForgetFrameFlyweight.metadata(frame).retain());
176176
break;
177177
case REQUEST_STREAM:
178-
metadata.addComponents(true, RequestStreamFrameFlyweight.metadata(frame));
178+
metadata.addComponents(true, RequestStreamFrameFlyweight.metadata(frame).retain());
179179
break;
180180
case REQUEST_RESPONSE:
181-
metadata.addComponents(true, RequestResponseFrameFlyweight.metadata(frame));
181+
metadata.addComponents(true, RequestResponseFrameFlyweight.metadata(frame).retain());
182182
break;
183183
case REQUEST_CHANNEL:
184-
metadata.addComponents(true, RequestChannelFrameFlyweight.metadata(frame));
184+
metadata.addComponents(true, RequestChannelFrameFlyweight.metadata(frame).retain());
185185
break;
186186
// Payload and synthetic types
187187
case PAYLOAD:
188188
case NEXT:
189189
case NEXT_COMPLETE:
190190
case COMPLETE:
191-
metadata.addComponents(true, PayloadFrameFlyweight.metadata(frame));
191+
metadata.addComponents(true, PayloadFrameFlyweight.metadata(frame).retain());
192192
break;
193193
default:
194194
throw new IllegalStateException("unsupported fragment type");
@@ -198,23 +198,23 @@ void handleFollowsFlag(ByteBuf frame, int streamId, FrameType frameType) {
198198
ByteBuf data;
199199
switch (frameType) {
200200
case REQUEST_FNF:
201-
data = RequestFireAndForgetFrameFlyweight.data(frame);
201+
data = RequestFireAndForgetFrameFlyweight.data(frame).retain();
202202
break;
203203
case REQUEST_STREAM:
204-
data = RequestStreamFrameFlyweight.data(frame);
204+
data = RequestStreamFrameFlyweight.data(frame).retain();
205205
break;
206206
case REQUEST_RESPONSE:
207-
data = RequestResponseFrameFlyweight.data(frame);
207+
data = RequestResponseFrameFlyweight.data(frame).retain();
208208
break;
209209
case REQUEST_CHANNEL:
210-
data = RequestChannelFrameFlyweight.data(frame);
210+
data = RequestChannelFrameFlyweight.data(frame).retain();
211211
break;
212212
// Payload and synthetic types
213213
case PAYLOAD:
214214
case NEXT:
215215
case NEXT_COMPLETE:
216216
case COMPLETE:
217-
data = PayloadFrameFlyweight.data(frame);
217+
data = PayloadFrameFlyweight.data(frame).retain();
218218
break;
219219
default:
220220
throw new IllegalStateException("unsupported fragment type");
@@ -243,10 +243,10 @@ void reassembleFrame(ByteBuf frame, SynchronousSink<ByteBuf> sink) {
243243

244244
boolean hasFollows = FrameHeaderFlyweight.hasFollows(frame);
245245

246-
if (!hasFollows) {
247-
handleNoFollowsFlag(frame, sink, streamId);
248-
} else {
246+
if (hasFollows) {
249247
handleFollowsFlag(frame, streamId, frameType);
248+
} else {
249+
handleNoFollowsFlag(frame, sink, streamId);
250250
}
251251

252252
} catch (Throwable t) {

rsocket-core/src/main/java/io/rsocket/frame/decoder/DefaultPayloadDecoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
class DefaultPayloadDecoder implements PayloadDecoder {
1212

1313
@Override
14-
public Payload apply(ByteBuf byteBuf) {
14+
public synchronized Payload apply(ByteBuf byteBuf) {
1515
ByteBuf m;
1616
ByteBuf d;
1717
FrameType type = FrameHeaderFlyweight.frameType(byteBuf);

rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,31 +61,33 @@ final class FragmentationDuplexConnectionTest {
6161
void constructorInvalidMaxFragmentSize() {
6262
assertThatIllegalArgumentException()
6363
.isThrownBy(
64-
() -> new FragmentationDuplexConnection(delegate, allocator, Integer.MIN_VALUE, false))
64+
() ->
65+
new FragmentationDuplexConnection(
66+
delegate, allocator, Integer.MIN_VALUE, false, ""))
6567
.withMessage("smallest allowed mtu size is 64 bytes");
6668
}
6769

6870
@DisplayName("constructor throws IllegalArgumentException with negative maxFragmentLength")
6971
@Test
7072
void constructorMtuLessThanMin() {
7173
assertThatIllegalArgumentException()
72-
.isThrownBy(() -> new FragmentationDuplexConnection(delegate, allocator, 2, false))
74+
.isThrownBy(() -> new FragmentationDuplexConnection(delegate, allocator, 2, false, ""))
7375
.withMessage("smallest allowed mtu size is 64 bytes");
7476
}
7577

7678
@DisplayName("constructor throws NullPointerException with null byteBufAllocator")
7779
@Test
7880
void constructorNullByteBufAllocator() {
7981
assertThatNullPointerException()
80-
.isThrownBy(() -> new FragmentationDuplexConnection(delegate, null, 64, false))
82+
.isThrownBy(() -> new FragmentationDuplexConnection(delegate, null, 64, false, ""))
8183
.withMessage("byteBufAllocator must not be null");
8284
}
8385

8486
@DisplayName("constructor throws NullPointerException with null delegate")
8587
@Test
8688
void constructorNullDelegate() {
8789
assertThatNullPointerException()
88-
.isThrownBy(() -> new FragmentationDuplexConnection(null, allocator, 64, false))
90+
.isThrownBy(() -> new FragmentationDuplexConnection(null, allocator, 64, false, ""))
8991
.withMessage("delegate must not be null");
9092
}
9193

@@ -118,7 +120,7 @@ void reassembleData() {
118120
when(delegate.receive()).thenReturn(Flux.fromIterable(byteBufs));
119121
when(delegate.onClose()).thenReturn(Mono.never());
120122

121-
new FragmentationDuplexConnection(delegate, allocator, 1030, false)
123+
new FragmentationDuplexConnection(delegate, allocator, 1030, false, "")
122124
.receive()
123125
.as(StepVerifier::create)
124126
.assertNext(
@@ -181,7 +183,7 @@ void reassembleMetadata() {
181183
when(delegate.receive()).thenReturn(Flux.fromIterable(byteBufs));
182184
when(delegate.onClose()).thenReturn(Mono.never());
183185

184-
new FragmentationDuplexConnection(delegate, allocator, 1030, false)
186+
new FragmentationDuplexConnection(delegate, allocator, 1030, false, "")
185187
.receive()
186188
.as(StepVerifier::create)
187189
.assertNext(
@@ -249,7 +251,7 @@ void reassembleMetadataAndData() {
249251
when(delegate.receive()).thenReturn(Flux.fromIterable(byteBufs));
250252
when(delegate.onClose()).thenReturn(Mono.never());
251253

252-
new FragmentationDuplexConnection(delegate, allocator, 1030, false)
254+
new FragmentationDuplexConnection(delegate, allocator, 1030, false, "")
253255
.receive()
254256
.as(StepVerifier::create)
255257
.assertNext(
@@ -270,7 +272,7 @@ void reassembleNonFragment() {
270272
when(delegate.receive()).thenReturn(Flux.just(encode));
271273
when(delegate.onClose()).thenReturn(Mono.never());
272274

273-
new FragmentationDuplexConnection(delegate, allocator, 1030, false)
275+
new FragmentationDuplexConnection(delegate, allocator, 1030, false, "")
274276
.receive()
275277
.as(StepVerifier::create)
276278
.assertNext(
@@ -289,7 +291,7 @@ void reassembleNonFragmentableFrame() {
289291
when(delegate.receive()).thenReturn(Flux.just(encode));
290292
when(delegate.onClose()).thenReturn(Mono.never());
291293

292-
new FragmentationDuplexConnection(delegate, allocator, 1030, false)
294+
new FragmentationDuplexConnection(delegate, allocator, 1030, false, "")
293295
.receive()
294296
.as(StepVerifier::create)
295297
.assertNext(
@@ -308,7 +310,7 @@ void sendData() {
308310

309311
when(delegate.onClose()).thenReturn(Mono.never());
310312

311-
new FragmentationDuplexConnection(delegate, allocator, 64, false).sendOne(encode.retain());
313+
new FragmentationDuplexConnection(delegate, allocator, 64, false, "").sendOne(encode.retain());
312314

313315
verify(delegate).send(publishers.capture());
314316

rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public Mono<DuplexConnection> connect(int mtu) {
8181
return connect.map(
8282
duplexConnection ->
8383
new FragmentationDuplexConnection(
84-
duplexConnection, ByteBufAllocator.DEFAULT, mtu, false));
84+
duplexConnection, ByteBufAllocator.DEFAULT, mtu, false, "client"));
8585
} else {
8686
return connect;
8787
}

rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalServerTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public void accept(DuplexConnection duplexConnection) {
164164
if (mtu > 0) {
165165
duplexConnection =
166166
new FragmentationDuplexConnection(
167-
duplexConnection, ByteBufAllocator.DEFAULT, mtu, false);
167+
duplexConnection, ByteBufAllocator.DEFAULT, mtu, false, "server");
168168
}
169169

170170
acceptor.apply(duplexConnection).subscribe();

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,11 @@ public Mono<DuplexConnection> connect(int mtu) {
101101
c -> {
102102
if (mtu > 0) {
103103
return new FragmentationDuplexConnection(
104-
new TcpDuplexConnection(c, false), ByteBufAllocator.DEFAULT, mtu, true);
104+
new TcpDuplexConnection(c, false),
105+
ByteBufAllocator.DEFAULT,
106+
mtu,
107+
true,
108+
"client");
105109
} else {
106110
return new TcpDuplexConnection(c);
107111
}

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ public Mono<DuplexConnection> connect(int mtu) {
162162
if (mtu > 0) {
163163
connection =
164164
new FragmentationDuplexConnection(
165-
connection, ByteBufAllocator.DEFAULT, mtu, false);
165+
connection, ByteBufAllocator.DEFAULT, mtu, false, "client");
166166
}
167167
return connection;
168168
});

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/TcpServerTransport.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,11 @@ public Mono<CloseableChannel> start(ConnectionAcceptor acceptor, int mtu) {
103103
if (mtu > 0) {
104104
connection =
105105
new FragmentationDuplexConnection(
106-
new TcpDuplexConnection(c, false), ByteBufAllocator.DEFAULT, mtu, true);
106+
new TcpDuplexConnection(c, false),
107+
ByteBufAllocator.DEFAULT,
108+
mtu,
109+
true,
110+
"server");
107111
} else {
108112
connection = new TcpDuplexConnection(c);
109113
}

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ public static BiFunction<WebsocketInbound, WebsocketOutbound, Publisher<Void>> n
113113
DuplexConnection connection = new WebsocketDuplexConnection((Connection) in);
114114
if (mtu > 0) {
115115
connection =
116-
new FragmentationDuplexConnection(connection, ByteBufAllocator.DEFAULT, mtu, false);
116+
new FragmentationDuplexConnection(
117+
connection, ByteBufAllocator.DEFAULT, mtu, false, "server");
117118
}
118119
return acceptor.apply(connection).then(out.neverComplete());
119120
};

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public Mono<CloseableChannel> start(ConnectionAcceptor acceptor, int mtu) {
123123
if (mtu > 0) {
124124
connection =
125125
new FragmentationDuplexConnection(
126-
connection, ByteBufAllocator.DEFAULT, mtu, false);
126+
connection, ByteBufAllocator.DEFAULT, mtu, false, "server");
127127
}
128128
return acceptor.apply(connection).then(out.neverComplete());
129129
});

rsocket-transport-netty/src/test/java/io/rsocket/integration/FragmentTest.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,23 +35,28 @@
3535
import reactor.core.publisher.Mono;
3636

3737
public class FragmentTest {
38-
private static final int frameSize = 128;
38+
private static final int frameSize = 64;
3939
private AbstractRSocket handler;
4040
private CloseableChannel server;
4141
private String message = null;
4242
private String metaData = null;
43+
private String responseMessage = null;
4344

4445
@BeforeEach
4546
public void startup() {
4647
int randomPort = ThreadLocalRandom.current().nextInt(10_000, 20_000);
4748
StringBuilder message = new StringBuilder();
49+
StringBuilder responseMessage = new StringBuilder();
4850
StringBuilder metaData = new StringBuilder();
4951
for (int i = 0; i < 100; i++) {
50-
message.append("RESPONSE ");
52+
message.append("REQUEST ");
53+
responseMessage.append("RESPONSE ");
5154
metaData.append("METADATA ");
5255
}
5356
this.message = message.toString();
57+
this.responseMessage = responseMessage.toString();
5458
this.metaData = metaData.toString();
59+
5560
TcpServerTransport serverTransport = TcpServerTransport.create(randomPort);
5661
server =
5762
RSocketFactory.receive()
@@ -88,7 +93,7 @@ public Flux<Payload> requestStream(Payload payload) {
8893
System.out.println("request message: " + request);
8994
System.out.println("request metadata: " + metaData);
9095

91-
return Flux.just(DefaultPayload.create(request));
96+
return Flux.just(DefaultPayload.create(responseMessage));
9297
}
9398
};
9499

@@ -100,7 +105,7 @@ public Flux<Payload> requestStream(Payload payload) {
100105
System.out.println("response message: " + payload.getDataUtf8());
101106
System.out.println("response metadata: " + payload.getMetadataUtf8());
102107

103-
assertThat(message).isEqualTo(payload.getDataUtf8());
108+
assertThat(responseMessage).isEqualTo(payload.getDataUtf8());
104109
}
105110

106111
@Test
@@ -116,7 +121,7 @@ public Flux<Payload> requestStream(Payload payload) {
116121
System.out.println("request message: " + request);
117122
System.out.println("request metadata: " + metaData);
118123

119-
return Flux.just(DefaultPayload.create(request));
124+
return Flux.just(DefaultPayload.create(responseMessage));
120125
}
121126
};
122127

@@ -128,11 +133,12 @@ public Flux<Payload> requestStream(Payload payload) {
128133
System.out.println("response message: " + payload.getDataUtf8());
129134
System.out.println("response metadata: " + payload.getMetadataUtf8());
130135

131-
assertThat(message).isEqualTo(payload.getDataUtf8());
136+
assertThat(responseMessage).isEqualTo(payload.getDataUtf8());
132137
}
133138

134139
@Test
135140
void testFragmentBothMetaData() {
141+
Payload responsePayload = DefaultPayload.create(responseMessage);
136142
System.out.println(
137143
"-------------------------------------------------testFragmentBothMetaData-------------------------------------------------");
138144
handler =
@@ -144,7 +150,7 @@ public Flux<Payload> requestStream(Payload payload) {
144150
System.out.println("request message: " + request);
145151
System.out.println("request metadata: " + metaData);
146152

147-
return Flux.just(DefaultPayload.create(request, metaData));
153+
return Flux.just(DefaultPayload.create(responseMessage, metaData));
148154
}
149155

150156
@Override
@@ -154,7 +160,7 @@ public Mono<Payload> requestResponse(Payload payload) {
154160
System.out.println("request message: " + request);
155161
System.out.println("request metadata: " + metaData);
156162

157-
return Mono.just(DefaultPayload.create(request, metaData));
163+
return Mono.just(DefaultPayload.create(responseMessage, metaData));
158164
}
159165
};
160166

@@ -166,6 +172,6 @@ public Mono<Payload> requestResponse(Payload payload) {
166172
System.out.println("response message: " + payload.getDataUtf8());
167173
System.out.println("response metadata: " + payload.getMetadataUtf8());
168174

169-
assertThat(message).isEqualTo(payload.getDataUtf8());
175+
assertThat(responseMessage).isEqualTo(payload.getDataUtf8());
170176
}
171177
}

0 commit comments

Comments
 (0)