Skip to content

Commit 6477513

Browse files
committed
Merge pull request #53 from ReactiveSocket/fragmenter-tests
reassembler tests
2 parents 250af8a + 087765b commit 6477513

File tree

3 files changed

+162
-3
lines changed

3 files changed

+162
-3
lines changed

src/main/java/io/reactivesocket/internal/PayloadReassembler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ private PayloadReassembler(final Subscriber<? super Payload> child)
3131
this.child = child;
3232
}
3333

34-
public PayloadReassembler with(final Subscriber<? super Payload> child)
34+
public static PayloadReassembler with(final Subscriber<? super Payload> child)
3535
{
3636
return new PayloadReassembler(child);
3737
}
@@ -71,9 +71,10 @@ public void onNext(Frame frame)
7171
if (null == payloadBuilder)
7272
{
7373
payloadBuilder = new PayloadBuilder();
74-
payloadBuilder.append(frame);
7574
payloadByStreamId.put(streamId, payloadBuilder);
7675
}
76+
77+
payloadBuilder.append(frame);
7778
}
7879
}
7980

src/test/java/io/reactivesocket/TestUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public static Payload utf8EncodedPayload(final String data, final String metadat
5555

5656
public static String byteToString(final ByteBuffer byteBuffer)
5757
{
58-
final byte[] bytes = new byte[byteBuffer.capacity()];
58+
final byte[] bytes = new byte[byteBuffer.remaining()];
5959
byteBuffer.get(bytes);
6060
return new String(bytes, Charset.forName("UTF-8"));
6161
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket.internal;
17+
18+
import io.reactivesocket.Frame;
19+
import io.reactivesocket.FrameType;
20+
import io.reactivesocket.Payload;
21+
import io.reactivesocket.TestUtil;
22+
import io.reactivex.subjects.ReplaySubject;
23+
import org.junit.Test;
24+
25+
import java.nio.ByteBuffer;
26+
27+
import static org.junit.Assert.assertEquals;
28+
29+
public class ReassemblerTest
30+
{
31+
private static final int STREAM_ID = 101;
32+
33+
@Test
34+
public void shouldPassThroughUnfragmentedFrame()
35+
{
36+
final ReplaySubject<Payload> replaySubject = ReplaySubject.create();
37+
final PayloadReassembler reassembler = PayloadReassembler.with(replaySubject);
38+
final String metadata = "metadata";
39+
final String data = "data";
40+
final ByteBuffer metadataBuffer = TestUtil.byteBufferFromUtf8String(metadata);
41+
final ByteBuffer dataBuffer = TestUtil.byteBufferFromUtf8String(data);
42+
43+
reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, metadataBuffer, dataBuffer, 0));
44+
45+
assertEquals(1, replaySubject.getValues().length);
46+
assertEquals(data, TestUtil.byteToString(replaySubject.getValue().getData()));
47+
assertEquals(metadata, TestUtil.byteToString(replaySubject.getValue().getMetadata()));
48+
}
49+
50+
@Test
51+
public void shouldNotPassThroughFragmentedFrameIfStillMoreFollowing()
52+
{
53+
final ReplaySubject<Payload> replaySubject = ReplaySubject.create();
54+
final PayloadReassembler reassembler = PayloadReassembler.with(replaySubject);
55+
final String metadata = "metadata";
56+
final String data = "data";
57+
final ByteBuffer metadataBuffer = TestUtil.byteBufferFromUtf8String(metadata);
58+
final ByteBuffer dataBuffer = TestUtil.byteBufferFromUtf8String(data);
59+
60+
reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, metadataBuffer, dataBuffer, FrameHeaderFlyweight.FLAGS_RESPONSE_F));
61+
62+
assertEquals(0, replaySubject.getValues().length);
63+
}
64+
65+
@Test
66+
public void shouldReassembleTwoFramesWithFragmentedDataAndMetadata()
67+
{
68+
final ReplaySubject<Payload> replaySubject = ReplaySubject.create();
69+
final PayloadReassembler reassembler = PayloadReassembler.with(replaySubject);
70+
final String metadata0 = "metadata0";
71+
final String metadata1 = "md1";
72+
final String metadata = metadata0 + metadata1;
73+
final String data0 = "data0";
74+
final String data1 = "d1";
75+
final String data = data0 + data1;
76+
final ByteBuffer metadata0Buffer = TestUtil.byteBufferFromUtf8String(metadata0);
77+
final ByteBuffer data0Buffer = TestUtil.byteBufferFromUtf8String(data0);
78+
final ByteBuffer metadata1Buffer = TestUtil.byteBufferFromUtf8String(metadata1);
79+
final ByteBuffer data1Buffer = TestUtil.byteBufferFromUtf8String(data1);
80+
81+
reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, metadata0Buffer, data0Buffer, FrameHeaderFlyweight.FLAGS_RESPONSE_F));
82+
reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, metadata1Buffer, data1Buffer, 0));
83+
84+
assertEquals(1, replaySubject.getValues().length);
85+
assertEquals(data, TestUtil.byteToString(replaySubject.getValue().getData()));
86+
assertEquals(metadata, TestUtil.byteToString(replaySubject.getValue().getMetadata()));
87+
}
88+
89+
@Test
90+
public void shouldReassembleTwoFramesWithFragmentedData()
91+
{
92+
final ReplaySubject<Payload> replaySubject = ReplaySubject.create();
93+
final PayloadReassembler reassembler = PayloadReassembler.with(replaySubject);
94+
final String metadata = "metadata";
95+
final String data0 = "data0";
96+
final String data1 = "d1";
97+
final String data = data0 + data1;
98+
final ByteBuffer metadataBuffer = TestUtil.byteBufferFromUtf8String(metadata);
99+
final ByteBuffer data0Buffer = TestUtil.byteBufferFromUtf8String(data0);
100+
final ByteBuffer data1Buffer = TestUtil.byteBufferFromUtf8String(data1);
101+
102+
reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, metadataBuffer, data0Buffer, FrameHeaderFlyweight.FLAGS_RESPONSE_F));
103+
reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, Frame.NULL_BYTEBUFFER, data1Buffer, 0));
104+
105+
assertEquals(1, replaySubject.getValues().length);
106+
assertEquals(data, TestUtil.byteToString(replaySubject.getValue().getData()));
107+
assertEquals(metadata, TestUtil.byteToString(replaySubject.getValue().getMetadata()));
108+
}
109+
110+
@Test
111+
public void shouldReassembleTwoFramesWithFragmentedMetadata()
112+
{
113+
final ReplaySubject<Payload> replaySubject = ReplaySubject.create();
114+
final PayloadReassembler reassembler = PayloadReassembler.with(replaySubject);
115+
final String metadata0 = "metadata0";
116+
final String metadata1 = "md1";
117+
final String metadata = metadata0 + metadata1;
118+
final String data = "data";
119+
final ByteBuffer metadata0Buffer = TestUtil.byteBufferFromUtf8String(metadata0);
120+
final ByteBuffer dataBuffer = TestUtil.byteBufferFromUtf8String(data);
121+
final ByteBuffer metadata1Buffer = TestUtil.byteBufferFromUtf8String(metadata1);
122+
123+
reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, metadata0Buffer, dataBuffer, FrameHeaderFlyweight.FLAGS_RESPONSE_F));
124+
reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, metadata1Buffer, Frame.NULL_BYTEBUFFER, 0));
125+
126+
assertEquals(1, replaySubject.getValues().length);
127+
assertEquals(data, TestUtil.byteToString(replaySubject.getValue().getData()));
128+
assertEquals(metadata, TestUtil.byteToString(replaySubject.getValue().getMetadata()));
129+
}
130+
131+
@Test
132+
public void shouldReassembleTwoFramesWithFragmentedDataAndMetadataWithMoreThanTwoFragments()
133+
{
134+
final ReplaySubject<Payload> replaySubject = ReplaySubject.create();
135+
final PayloadReassembler reassembler = PayloadReassembler.with(replaySubject);
136+
final String metadata0 = "metadata0";
137+
final String metadata1 = "md1";
138+
final String metadata = metadata0 + metadata1;
139+
final String data0 = "data0";
140+
final String data1 = "d1";
141+
final String data2 = "d2";
142+
final String data = data0 + data1 + data2;
143+
final ByteBuffer metadata0Buffer = TestUtil.byteBufferFromUtf8String(metadata0);
144+
final ByteBuffer data0Buffer = TestUtil.byteBufferFromUtf8String(data0);
145+
final ByteBuffer metadata1Buffer = TestUtil.byteBufferFromUtf8String(metadata1);
146+
final ByteBuffer data1Buffer = TestUtil.byteBufferFromUtf8String(data1);
147+
final ByteBuffer data2Buffer = TestUtil.byteBufferFromUtf8String(data2);
148+
149+
reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, metadata0Buffer, data0Buffer, FrameHeaderFlyweight.FLAGS_RESPONSE_F));
150+
reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, metadata1Buffer, data1Buffer, FrameHeaderFlyweight.FLAGS_RESPONSE_F));
151+
reassembler.onNext(Frame.Response.from(STREAM_ID, FrameType.NEXT, Frame.NULL_BYTEBUFFER, data2Buffer, 0));
152+
153+
assertEquals(1, replaySubject.getValues().length);
154+
assertEquals(data, TestUtil.byteToString(replaySubject.getValue().getData()));
155+
assertEquals(metadata, TestUtil.byteToString(replaySubject.getValue().getMetadata()));
156+
}
157+
158+
}

0 commit comments

Comments
 (0)