Skip to content

Commit 28ba607

Browse files
committed
enforces reassembly on the receiver side
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 0ac54d4 commit 28ba607

File tree

12 files changed

+424
-255
lines changed

12 files changed

+424
-255
lines changed

rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import reactor.core.publisher.Mono;
3535

3636
/**
37-
* A {@link DuplexConnection} implementation that fragments and reassembles {@link ByteBuf}s.
37+
* A {@link DuplexConnection} implementation that fragments {@link ByteBuf}s.
3838
*
3939
* @see <a
4040
* href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#fragmentation-and-reassembly">Fragmentation
@@ -138,23 +138,9 @@ private ByteBuf encode(ByteBuf frame) {
138138
}
139139
}
140140

141-
private ByteBuf decode(ByteBuf frame) {
142-
if (encodeLength) {
143-
return FrameLengthFlyweight.frame(frame).retain();
144-
} else {
145-
return frame;
146-
}
147-
}
148-
149141
@Override
150142
public Flux<ByteBuf> receive() {
151-
return delegate
152-
.receive()
153-
.handle(
154-
(byteBuf, sink) -> {
155-
ByteBuf decode = decode(byteBuf);
156-
frameReassembler.reassembleFrame(decode, sink);
157-
});
143+
return delegate.receive();
158144
}
159145

160146
@Override
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2015-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.fragmentation;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.buffer.ByteBufAllocator;
21+
import io.rsocket.DuplexConnection;
22+
import io.rsocket.frame.FrameLengthFlyweight;
23+
import java.util.Objects;
24+
import org.reactivestreams.Publisher;
25+
import reactor.core.publisher.Flux;
26+
import reactor.core.publisher.Mono;
27+
28+
/**
29+
* A {@link DuplexConnection} implementation that reassembles {@link ByteBuf}s.
30+
*
31+
* @see <a
32+
* href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#fragmentation-and-reassembly">Fragmentation
33+
* and Reassembly</a>
34+
*/
35+
public final class ReassemblyDuplexConnection implements DuplexConnection {
36+
private final DuplexConnection delegate;
37+
private final FrameReassembler frameReassembler;
38+
private final boolean decodeLength;
39+
40+
public ReassemblyDuplexConnection(
41+
DuplexConnection delegate, ByteBufAllocator allocator, boolean decodeLength) {
42+
Objects.requireNonNull(delegate, "delegate must not be null");
43+
Objects.requireNonNull(allocator, "byteBufAllocator must not be null");
44+
this.decodeLength = decodeLength;
45+
this.delegate = delegate;
46+
this.frameReassembler = new FrameReassembler(allocator);
47+
48+
delegate.onClose().doFinally(s -> frameReassembler.dispose()).subscribe();
49+
}
50+
51+
@Override
52+
public Mono<Void> send(Publisher<ByteBuf> frames) {
53+
return delegate.send(frames);
54+
}
55+
56+
@Override
57+
public Mono<Void> sendOne(ByteBuf frame) {
58+
return delegate.sendOne(frame);
59+
}
60+
61+
private ByteBuf decode(ByteBuf frame) {
62+
if (decodeLength) {
63+
return FrameLengthFlyweight.frame(frame).retain();
64+
} else {
65+
return frame;
66+
}
67+
}
68+
69+
@Override
70+
public Flux<ByteBuf> receive() {
71+
return delegate
72+
.receive()
73+
.handle(
74+
(byteBuf, sink) -> {
75+
ByteBuf decode = decode(byteBuf);
76+
frameReassembler.reassembleFrame(decode, sink);
77+
});
78+
}
79+
80+
@Override
81+
public Mono<Void> onClose() {
82+
return delegate.onClose();
83+
}
84+
85+
@Override
86+
public void dispose() {
87+
delegate.dispose();
88+
}
89+
}

rsocket-core/src/test/java/io/rsocket/fragmentation/FragmentationDuplexConnectionTest.java

Lines changed: 0 additions & 214 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,9 @@
2222

2323
import io.netty.buffer.ByteBuf;
2424
import io.netty.buffer.ByteBufAllocator;
25-
import io.netty.buffer.CompositeByteBuf;
2625
import io.netty.buffer.Unpooled;
2726
import io.rsocket.DuplexConnection;
2827
import io.rsocket.frame.*;
29-
import io.rsocket.util.DefaultPayload;
30-
import java.util.Arrays;
31-
import java.util.List;
3228
import java.util.concurrent.ThreadLocalRandom;
3329
import org.junit.Assert;
3430
import org.junit.jupiter.api.DisplayName;
@@ -91,216 +87,6 @@ void constructorNullDelegate() {
9187
.withMessage("delegate must not be null");
9288
}
9389

94-
@DisplayName("reassembles data")
95-
@Test
96-
void reassembleData() {
97-
List<ByteBuf> byteBufs =
98-
Arrays.asList(
99-
RequestResponseFrameFlyweight.encode(allocator, 1, true, DefaultPayload.create(data)),
100-
PayloadFrameFlyweight.encode(
101-
allocator, 1, true, false, true, DefaultPayload.create(data)),
102-
PayloadFrameFlyweight.encode(
103-
allocator, 1, true, false, true, DefaultPayload.create(data)),
104-
PayloadFrameFlyweight.encode(
105-
allocator, 1, true, false, true, DefaultPayload.create(data)),
106-
PayloadFrameFlyweight.encode(
107-
allocator, 1, false, false, true, DefaultPayload.create(data)));
108-
109-
CompositeByteBuf data =
110-
allocator
111-
.compositeDirectBuffer()
112-
.addComponents(
113-
true,
114-
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.data),
115-
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.data),
116-
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.data),
117-
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.data),
118-
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.data));
119-
120-
when(delegate.receive()).thenReturn(Flux.fromIterable(byteBufs));
121-
when(delegate.onClose()).thenReturn(Mono.never());
122-
123-
new FragmentationDuplexConnection(delegate, allocator, 1030, false, "")
124-
.receive()
125-
.as(StepVerifier::create)
126-
.assertNext(
127-
byteBuf -> {
128-
Assert.assertEquals(data, RequestResponseFrameFlyweight.data(byteBuf));
129-
})
130-
.verifyComplete();
131-
}
132-
133-
@DisplayName("reassembles metadata")
134-
@Test
135-
void reassembleMetadata() {
136-
List<ByteBuf> byteBufs =
137-
Arrays.asList(
138-
RequestResponseFrameFlyweight.encode(
139-
allocator,
140-
1,
141-
true,
142-
DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))),
143-
PayloadFrameFlyweight.encode(
144-
allocator,
145-
1,
146-
true,
147-
false,
148-
true,
149-
DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))),
150-
PayloadFrameFlyweight.encode(
151-
allocator,
152-
1,
153-
true,
154-
false,
155-
true,
156-
DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))),
157-
PayloadFrameFlyweight.encode(
158-
allocator,
159-
1,
160-
true,
161-
false,
162-
true,
163-
DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))),
164-
PayloadFrameFlyweight.encode(
165-
allocator,
166-
1,
167-
false,
168-
false,
169-
true,
170-
DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))));
171-
172-
CompositeByteBuf metadata =
173-
allocator
174-
.compositeDirectBuffer()
175-
.addComponents(
176-
true,
177-
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata),
178-
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata),
179-
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata),
180-
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata),
181-
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata));
182-
183-
when(delegate.receive()).thenReturn(Flux.fromIterable(byteBufs));
184-
when(delegate.onClose()).thenReturn(Mono.never());
185-
186-
new FragmentationDuplexConnection(delegate, allocator, 1030, false, "")
187-
.receive()
188-
.as(StepVerifier::create)
189-
.assertNext(
190-
byteBuf -> {
191-
System.out.println(byteBuf.readableBytes());
192-
ByteBuf m = RequestResponseFrameFlyweight.metadata(byteBuf);
193-
Assert.assertEquals(metadata, m);
194-
})
195-
.verifyComplete();
196-
}
197-
198-
@DisplayName("reassembles metadata and data")
199-
@Test
200-
void reassembleMetadataAndData() {
201-
List<ByteBuf> byteBufs =
202-
Arrays.asList(
203-
RequestResponseFrameFlyweight.encode(
204-
allocator,
205-
1,
206-
true,
207-
DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))),
208-
PayloadFrameFlyweight.encode(
209-
allocator,
210-
1,
211-
true,
212-
false,
213-
true,
214-
DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))),
215-
PayloadFrameFlyweight.encode(
216-
allocator,
217-
1,
218-
true,
219-
false,
220-
true,
221-
DefaultPayload.create(Unpooled.EMPTY_BUFFER, Unpooled.wrappedBuffer(metadata))),
222-
PayloadFrameFlyweight.encode(
223-
allocator,
224-
1,
225-
true,
226-
false,
227-
true,
228-
DefaultPayload.create(
229-
Unpooled.wrappedBuffer(data), Unpooled.wrappedBuffer(metadata))),
230-
PayloadFrameFlyweight.encode(
231-
allocator, 1, false, false, true, DefaultPayload.create(data)));
232-
233-
CompositeByteBuf data =
234-
allocator
235-
.compositeDirectBuffer()
236-
.addComponents(
237-
true,
238-
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.data),
239-
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.data));
240-
241-
CompositeByteBuf metadata =
242-
allocator
243-
.compositeDirectBuffer()
244-
.addComponents(
245-
true,
246-
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata),
247-
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata),
248-
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata),
249-
Unpooled.wrappedBuffer(FragmentationDuplexConnectionTest.metadata));
250-
251-
when(delegate.receive()).thenReturn(Flux.fromIterable(byteBufs));
252-
when(delegate.onClose()).thenReturn(Mono.never());
253-
254-
new FragmentationDuplexConnection(delegate, allocator, 1030, false, "")
255-
.receive()
256-
.as(StepVerifier::create)
257-
.assertNext(
258-
byteBuf -> {
259-
Assert.assertEquals(data, RequestResponseFrameFlyweight.data(byteBuf));
260-
Assert.assertEquals(metadata, RequestResponseFrameFlyweight.metadata(byteBuf));
261-
})
262-
.verifyComplete();
263-
}
264-
265-
@DisplayName("does not reassemble a non-fragment frame")
266-
@Test
267-
void reassembleNonFragment() {
268-
ByteBuf encode =
269-
RequestResponseFrameFlyweight.encode(
270-
allocator, 1, false, DefaultPayload.create(Unpooled.wrappedBuffer(data)));
271-
272-
when(delegate.receive()).thenReturn(Flux.just(encode));
273-
when(delegate.onClose()).thenReturn(Mono.never());
274-
275-
new FragmentationDuplexConnection(delegate, allocator, 1030, false, "")
276-
.receive()
277-
.as(StepVerifier::create)
278-
.assertNext(
279-
byteBuf -> {
280-
Assert.assertEquals(
281-
Unpooled.wrappedBuffer(data), RequestResponseFrameFlyweight.data(byteBuf));
282-
})
283-
.verifyComplete();
284-
}
285-
286-
@DisplayName("does not reassemble non fragmentable frame")
287-
@Test
288-
void reassembleNonFragmentableFrame() {
289-
ByteBuf encode = CancelFrameFlyweight.encode(allocator, 2);
290-
291-
when(delegate.receive()).thenReturn(Flux.just(encode));
292-
when(delegate.onClose()).thenReturn(Mono.never());
293-
294-
new FragmentationDuplexConnection(delegate, allocator, 1030, false, "")
295-
.receive()
296-
.as(StepVerifier::create)
297-
.assertNext(
298-
byteBuf -> {
299-
Assert.assertEquals(FrameType.CANCEL, FrameHeaderFlyweight.frameType(byteBuf));
300-
})
301-
.verifyComplete();
302-
}
303-
30490
@DisplayName("fragments data")
30591
@Test
30692
void sendData() {

0 commit comments

Comments
 (0)