Skip to content

Commit 40babbf

Browse files
committed
tests for fragmenter
1 parent b6f049b commit 40babbf

File tree

4 files changed

+204
-4
lines changed

4 files changed

+204
-4
lines changed

src/main/java/io/reactivesocket/Frame.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ public static Frame from(int streamId, FrameType type, ByteBuffer metadata, Byte
423423
{
424424
final Frame frame = POOL.acquireFrame(RequestFrameFlyweight.computeFrameLength(type, metadata.capacity(), data.capacity()));
425425

426-
frame.length = RequestFrameFlyweight.encode(frame.directBuffer, frame.offset, streamId, flags, type, initialRequestN, NULL_BYTEBUFFER, NULL_BYTEBUFFER);
426+
frame.length = RequestFrameFlyweight.encode(frame.directBuffer, frame.offset, streamId, flags, type, initialRequestN, metadata, data);
427427
return frame;
428428

429429
}

src/main/java/io/reactivesocket/internal/PayloadFragmenter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,10 @@ public Frame next()
7676
Frame result = null;
7777

7878
final ByteBuffer metadataBuffer = metadataLength > 0 ?
79-
ByteBufferUtil.preservingSlice(metadata, metadataOffset, metadataOffset + metadataLength) : null;
79+
ByteBufferUtil.preservingSlice(metadata, metadataOffset, metadataOffset + metadataLength) : Frame.NULL_BYTEBUFFER;
8080

8181
final ByteBuffer dataBuffer = dataLength > 0 ?
82-
ByteBufferUtil.preservingSlice(data, dataOffset, dataOffset + dataLength) : null;
82+
ByteBufferUtil.preservingSlice(data, dataOffset, dataOffset + dataLength) : Frame.NULL_BYTEBUFFER;
8383

8484
metadataOffset += metadataLength;
8585
dataOffset += dataLength;

src/main/java/io/reactivesocket/internal/RequestFrameFlyweight.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public static int encode(
5555
{
5656
final int frameLength = computeFrameLength(type, metadata.capacity(), data.capacity());
5757

58-
flags = FLAGS_REQUEST_CHANNEL_N;
58+
flags |= FLAGS_REQUEST_CHANNEL_N;
5959
int length = FrameHeaderFlyweight.encodeFrameHeader(mutableDirectBuffer, offset, frameLength, flags, type, streamId);
6060

6161
mutableDirectBuffer.putInt(offset + INITIAL_REQUEST_N_FIELD_OFFSET, initialRequestN, ByteOrder.BIG_ENDIAN);
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket.internal;
17+
18+
import io.reactivesocket.Frame;
19+
import io.reactivesocket.Payload;
20+
import io.reactivesocket.TestUtil;
21+
import org.junit.Test;
22+
23+
import static org.junit.Assert.*;
24+
25+
public class FragmenterTest
26+
{
27+
private static final int METADATA_MTU = 256;
28+
private static final int DATA_MTU = 256;
29+
private static final int STREAM_ID = 101;
30+
private static final int REQUEST_N = 102;
31+
32+
@Test
33+
public void shouldPassThroughUnfragmentedResponse()
34+
{
35+
final PayloadFragmenter fragmenter = new PayloadFragmenter(METADATA_MTU, DATA_MTU);
36+
final Payload payload = TestUtil.utf8EncodedPayload("response data", "response metadata");
37+
38+
fragmenter.resetForResponse(STREAM_ID, payload);
39+
40+
assertTrue(fragmenter.hasNext());
41+
final Frame frame1 = fragmenter.next();
42+
43+
assertEquals("response data", TestUtil.byteToString(frame1.getData()));
44+
assertEquals("response metadata", TestUtil.byteToString(frame1.getMetadata()));
45+
assertEquals(0, (frame1.flags() & FrameHeaderFlyweight.FLAGS_RESPONSE_F));
46+
assertFalse(fragmenter.hasNext());
47+
}
48+
49+
@Test
50+
public void shouldHandleFragmentedResponseData()
51+
{
52+
final String responseData0 = "response ";
53+
final String responseData1 = "data";
54+
final String responseData = responseData0 + responseData1;
55+
final PayloadFragmenter fragmenter = new PayloadFragmenter(METADATA_MTU, responseData0.length());
56+
final Payload payload = TestUtil.utf8EncodedPayload(responseData, "response metadata");
57+
58+
fragmenter.resetForResponse(STREAM_ID, payload);
59+
60+
assertTrue(fragmenter.hasNext());
61+
final Frame frame1 = fragmenter.next();
62+
63+
assertEquals(responseData0, TestUtil.byteToString(frame1.getData()));
64+
assertEquals("response metadata", TestUtil.byteToString(frame1.getMetadata()));
65+
assertEquals(FrameHeaderFlyweight.FLAGS_RESPONSE_F, (frame1.flags() & FrameHeaderFlyweight.FLAGS_RESPONSE_F));
66+
67+
assertTrue(fragmenter.hasNext());
68+
final Frame frame2 = fragmenter.next();
69+
70+
assertEquals(responseData1, TestUtil.byteToString(frame2.getData()));
71+
assertEquals("", TestUtil.byteToString(frame2.getMetadata()));
72+
assertEquals(0, (frame2.flags() & FrameHeaderFlyweight.FLAGS_RESPONSE_F));
73+
assertFalse(fragmenter.hasNext());
74+
}
75+
76+
@Test
77+
public void shouldHandleFragmentedResponseMetadata()
78+
{
79+
final String responseMetadata0 = "response ";
80+
final String responseMetadata1 = "metadata";
81+
final String responseMetadata = responseMetadata0 + responseMetadata1;
82+
final PayloadFragmenter fragmenter = new PayloadFragmenter(responseMetadata0.length(), DATA_MTU);
83+
final Payload payload = TestUtil.utf8EncodedPayload("response data", responseMetadata);
84+
85+
fragmenter.resetForResponse(STREAM_ID, payload);
86+
87+
assertTrue(fragmenter.hasNext());
88+
final Frame frame1 = fragmenter.next();
89+
90+
assertEquals("response data", TestUtil.byteToString(frame1.getData()));
91+
assertEquals(responseMetadata0, TestUtil.byteToString(frame1.getMetadata()));
92+
assertEquals(FrameHeaderFlyweight.FLAGS_RESPONSE_F, (frame1.flags() & FrameHeaderFlyweight.FLAGS_RESPONSE_F));
93+
94+
assertTrue(fragmenter.hasNext());
95+
final Frame frame2 = fragmenter.next();
96+
97+
assertEquals("", TestUtil.byteToString(frame2.getData()));
98+
assertEquals(responseMetadata1, TestUtil.byteToString(frame2.getMetadata()));
99+
assertEquals(0, (frame2.flags() & FrameHeaderFlyweight.FLAGS_RESPONSE_F));
100+
assertFalse(fragmenter.hasNext());
101+
}
102+
103+
@Test
104+
public void shouldHandleFragmentedResponseMetadataAndData()
105+
{
106+
final String responseMetadata0 = "response ";
107+
final String responseMetadata1 = "metadata";
108+
final String responseMetadata = responseMetadata0 + responseMetadata1;
109+
final String responseData0 = "response ";
110+
final String responseData1 = "data";
111+
final String responseData = responseData0 + responseData1;
112+
final PayloadFragmenter fragmenter = new PayloadFragmenter(responseMetadata0.length(), responseData0.length());
113+
final Payload payload = TestUtil.utf8EncodedPayload(responseData, responseMetadata);
114+
115+
fragmenter.resetForResponse(STREAM_ID, payload);
116+
117+
assertTrue(fragmenter.hasNext());
118+
final Frame frame1 = fragmenter.next();
119+
120+
assertEquals(responseData0, TestUtil.byteToString(frame1.getData()));
121+
assertEquals(responseMetadata0, TestUtil.byteToString(frame1.getMetadata()));
122+
assertEquals(FrameHeaderFlyweight.FLAGS_RESPONSE_F, (frame1.flags() & FrameHeaderFlyweight.FLAGS_RESPONSE_F));
123+
124+
assertTrue(fragmenter.hasNext());
125+
final Frame frame2 = fragmenter.next();
126+
127+
assertEquals(responseData1, TestUtil.byteToString(frame2.getData()));
128+
assertEquals(responseMetadata1, TestUtil.byteToString(frame2.getMetadata()));
129+
assertEquals(0, (frame2.flags() & FrameHeaderFlyweight.FLAGS_RESPONSE_F));
130+
assertFalse(fragmenter.hasNext());
131+
}
132+
133+
@Test
134+
public void shouldHandleFragmentedResponseMetadataAndDataWithMoreThanTwoFragments()
135+
{
136+
final String responseMetadata0 = "response ";
137+
final String responseMetadata1 = "metadata";
138+
final String responseMetadata = responseMetadata0 + responseMetadata1;
139+
final String responseData0 = "response ";
140+
final String responseData1 = "data ";
141+
final String responseData2 = "and more";
142+
final String responseData = responseData0 + responseData1 + responseData2;
143+
final PayloadFragmenter fragmenter = new PayloadFragmenter(responseMetadata0.length(), responseData0.length());
144+
final Payload payload = TestUtil.utf8EncodedPayload(responseData, responseMetadata);
145+
146+
fragmenter.resetForResponse(STREAM_ID, payload);
147+
148+
assertTrue(fragmenter.hasNext());
149+
final Frame frame1 = fragmenter.next();
150+
151+
assertEquals(responseData0, TestUtil.byteToString(frame1.getData()));
152+
assertEquals(responseMetadata0, TestUtil.byteToString(frame1.getMetadata()));
153+
assertEquals(FrameHeaderFlyweight.FLAGS_RESPONSE_F, (frame1.flags() & FrameHeaderFlyweight.FLAGS_RESPONSE_F));
154+
155+
assertTrue(fragmenter.hasNext());
156+
final Frame frame2 = fragmenter.next();
157+
158+
assertEquals(responseData1, TestUtil.byteToString(frame2.getData()));
159+
assertEquals(responseMetadata1, TestUtil.byteToString(frame2.getMetadata()));
160+
assertEquals(FrameHeaderFlyweight.FLAGS_RESPONSE_F, (frame2.flags() & FrameHeaderFlyweight.FLAGS_RESPONSE_F));
161+
162+
assertTrue(fragmenter.hasNext());
163+
final Frame frame3 = fragmenter.next();
164+
165+
assertEquals(responseData2, TestUtil.byteToString(frame3.getData()));
166+
assertEquals("", TestUtil.byteToString(frame3.getMetadata()));
167+
assertEquals(0, (frame3.flags() & FrameHeaderFlyweight.FLAGS_RESPONSE_F));
168+
assertFalse(fragmenter.hasNext());
169+
}
170+
171+
@Test
172+
public void shouldHandleFragmentedRequestChannelMetadataAndData()
173+
{
174+
final String requestMetadata0 = "request ";
175+
final String requestMetadata1 = "metadata";
176+
final String requestMetadata = requestMetadata0 + requestMetadata1;
177+
final String requestData0 = "request ";
178+
final String requestData1 = "data";
179+
final String requestData = requestData0 + requestData1;
180+
final PayloadFragmenter fragmenter = new PayloadFragmenter(requestMetadata0.length(), requestData0.length());
181+
final Payload payload = TestUtil.utf8EncodedPayload(requestData, requestMetadata);
182+
183+
fragmenter.resetForRequestChannel(STREAM_ID, payload, REQUEST_N);
184+
185+
assertTrue(fragmenter.hasNext());
186+
final Frame frame1 = fragmenter.next();
187+
188+
assertEquals(requestData0, TestUtil.byteToString(frame1.getData()));
189+
assertEquals(requestMetadata0, TestUtil.byteToString(frame1.getMetadata()));
190+
assertEquals(FrameHeaderFlyweight.FLAGS_REQUEST_CHANNEL_F, (frame1.flags() & FrameHeaderFlyweight.FLAGS_REQUEST_CHANNEL_F));
191+
192+
assertTrue(fragmenter.hasNext());
193+
final Frame frame2 = fragmenter.next();
194+
195+
assertEquals(requestData1, TestUtil.byteToString(frame2.getData()));
196+
assertEquals(requestMetadata1, TestUtil.byteToString(frame2.getMetadata()));
197+
assertEquals(0, (frame2.flags() & FrameHeaderFlyweight.FLAGS_REQUEST_CHANNEL_F));
198+
assertFalse(fragmenter.hasNext());
199+
}
200+
}

0 commit comments

Comments
 (0)