Skip to content

Commit 5b15625

Browse files
committed
Merge pull request #48 from ReactiveSocket/fragementation-reassembly
initial fragmenter and reassembler components
2 parents 2e6b899 + 98b3578 commit 5b15625

File tree

6 files changed

+393
-25
lines changed

6 files changed

+393
-25
lines changed

src/main/java/io/reactivesocket/Frame.java

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
public class Frame implements Payload
3333
{
3434
public static final ByteBuffer NULL_BYTEBUFFER = FrameHeaderFlyweight.NULL_BYTEBUFFER;
35+
public static final int DATA_MTU = 32 * 1024;
36+
public static final int METADATA_MTU = 32 * 1024;
3537

3638
/*
3739
* ThreadLocal handling in the pool itself. We don't have a per thread pool at this level.
@@ -138,6 +140,16 @@ public int length()
138140
return length;
139141
}
140142

143+
/**
144+
* Return the flags field for the frame
145+
*
146+
* @return frame flags field value
147+
*/
148+
public int flags()
149+
{
150+
return FrameHeaderFlyweight.flags(directBuffer, offset);
151+
}
152+
141153
/**
142154
* Mutates this Frame to contain the given ByteBuffer
143155
*
@@ -224,7 +236,7 @@ public void wrap(final int streamId, final FrameType type, final ByteBuffer data
224236
this.directBuffer =
225237
POOL.acquireMutableDirectBuffer(FrameHeaderFlyweight.computeFrameHeaderLength(type, 0, data.capacity()));
226238

227-
this.length = FrameHeaderFlyweight.encode(this.directBuffer, offset, streamId, type, NULL_BYTEBUFFER, data);
239+
this.length = FrameHeaderFlyweight.encode(this.directBuffer, offset, streamId, 0, type, NULL_BYTEBUFFER, data);
228240
}
229241

230242
/* TODO:
@@ -389,11 +401,11 @@ public static Frame from(int streamId, FrameType type, Payload payload, int init
389401

390402
if (type.hasInitialRequestN())
391403
{
392-
frame.length = RequestFrameFlyweight.encode(frame.directBuffer, frame.offset, streamId, type, initialRequestN, md, d);
404+
frame.length = RequestFrameFlyweight.encode(frame.directBuffer, frame.offset, streamId, 0, type, initialRequestN, md, d);
393405
}
394406
else
395407
{
396-
frame.length = RequestFrameFlyweight.encode(frame.directBuffer, frame.offset, streamId, type, md, d);
408+
frame.length = RequestFrameFlyweight.encode(frame.directBuffer, frame.offset, streamId, 0, type, md, d);
397409
}
398410

399411
return frame;
@@ -403,10 +415,19 @@ public static Frame from(int streamId, FrameType type, int flags)
403415
{
404416
final Frame frame = POOL.acquireFrame(RequestFrameFlyweight.computeFrameLength(type, 0, 0));
405417

406-
frame.length = RequestFrameFlyweight.encode(frame.directBuffer, frame.offset, streamId, type, flags);
418+
frame.length = RequestFrameFlyweight.encode(frame.directBuffer, frame.offset, streamId, flags, type, NULL_BYTEBUFFER, NULL_BYTEBUFFER);
407419
return frame;
408420
}
409421

422+
public static Frame from(int streamId, FrameType type, ByteBuffer metadata, ByteBuffer data, int initialRequestN, int flags)
423+
{
424+
final Frame frame = POOL.acquireFrame(RequestFrameFlyweight.computeFrameLength(type, metadata.capacity(), data.capacity()));
425+
426+
frame.length = RequestFrameFlyweight.encode(frame.directBuffer, frame.offset, streamId, flags, type, initialRequestN, NULL_BYTEBUFFER, NULL_BYTEBUFFER);
427+
return frame;
428+
429+
}
430+
410431
public static long initialRequestN(final Frame frame)
411432
{
412433
final FrameType type = frame.getType();
@@ -452,7 +473,16 @@ public static Frame from(int streamId, FrameType type, Payload payload)
452473
final Frame frame =
453474
POOL.acquireFrame(FrameHeaderFlyweight.computeFrameHeaderLength(type, metadata.capacity(), data.capacity()));
454475

455-
frame.length = FrameHeaderFlyweight.encode(frame.directBuffer, frame.offset, streamId, type, metadata, data);
476+
frame.length = FrameHeaderFlyweight.encode(frame.directBuffer, frame.offset, streamId, 0, type, metadata, data);
477+
return frame;
478+
}
479+
480+
public static Frame from(int streamId, FrameType type, ByteBuffer metadata, ByteBuffer data, int flags)
481+
{
482+
final Frame frame =
483+
POOL.acquireFrame(FrameHeaderFlyweight.computeFrameHeaderLength(type, metadata.capacity(), data.capacity()));
484+
485+
frame.length = FrameHeaderFlyweight.encode(frame.directBuffer, frame.offset, streamId, flags, type, metadata, data);
456486
return frame;
457487
}
458488

@@ -462,7 +492,7 @@ public static Frame from(int streamId, FrameType type)
462492
POOL.acquireFrame(FrameHeaderFlyweight.computeFrameHeaderLength(type, 0, 0));
463493

464494
frame.length = FrameHeaderFlyweight.encode(
465-
frame.directBuffer, frame.offset, streamId, type, Frame.NULL_BYTEBUFFER, Frame.NULL_BYTEBUFFER);
495+
frame.directBuffer, frame.offset, streamId, 0, type, Frame.NULL_BYTEBUFFER, Frame.NULL_BYTEBUFFER);
466496
return frame;
467497
}
468498
}
@@ -475,7 +505,7 @@ public static Frame from(int streamId)
475505
POOL.acquireFrame(FrameHeaderFlyweight.computeFrameHeaderLength(FrameType.CANCEL, 0, 0));
476506

477507
frame.length = FrameHeaderFlyweight.encode(
478-
frame.directBuffer, frame.offset, streamId, FrameType.CANCEL, Frame.NULL_BYTEBUFFER, Frame.NULL_BYTEBUFFER);
508+
frame.directBuffer, frame.offset, streamId, 0, FrameType.CANCEL, Frame.NULL_BYTEBUFFER, Frame.NULL_BYTEBUFFER);
479509
return frame;
480510
}
481511
}
@@ -490,7 +520,7 @@ public static Frame from(ByteBuffer data, boolean respond)
490520
final int flags = (respond ? FrameHeaderFlyweight.FLAGS_KEEPALIVE_R : 0);
491521

492522
frame.length = FrameHeaderFlyweight.encode(
493-
frame.directBuffer, frame.offset, flags, FrameType.KEEPALIVE, Frame.NULL_BYTEBUFFER, data);
523+
frame.directBuffer, frame.offset, 0, flags, FrameType.KEEPALIVE, Frame.NULL_BYTEBUFFER, data);
494524

495525
return frame;
496526
}

src/main/java/io/reactivesocket/internal/FrameHeaderFlyweight.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,10 @@ public class FrameHeaderFlyweight
5454

5555
public static final int FLAGS_KEEPALIVE_R = 0b0010_0000_0000_0000;
5656

57-
private static final int FLAGS_RESPONSE_F = 0b0010_0000_0000_0000;
58-
private static final int FLAGS_RESPONSE_C = 0b0001_0000_0000_0000;
57+
public static final int FLAGS_RESPONSE_F = 0b0010_0000_0000_0000;
58+
public static final int FLAGS_RESPONSE_C = 0b0001_0000_0000_0000;
59+
60+
public static final int FLAGS_REQUEST_CHANNEL_F = 0b0010_0000_0000_0000;
5961

6062
static
6163
{
@@ -144,14 +146,14 @@ public static int encode(
144146
final MutableDirectBuffer mutableDirectBuffer,
145147
final int offset,
146148
final int streamId,
149+
int flags,
147150
final FrameType frameType,
148151
final ByteBuffer metadata,
149152
final ByteBuffer data)
150153
{
151154
final int frameLength = computeFrameHeaderLength(frameType, metadata.capacity(), data.capacity());
152155

153156
final FrameType outFrameType;
154-
int flags = 0;
155157

156158
switch (frameType)
157159
{
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket.internal;
17+
18+
import io.reactivesocket.Frame;
19+
import io.reactivesocket.Payload;
20+
import uk.co.real_logic.agrona.BitUtil;
21+
import uk.co.real_logic.agrona.MutableDirectBuffer;
22+
import uk.co.real_logic.agrona.concurrent.UnsafeBuffer;
23+
24+
import java.nio.ByteBuffer;
25+
import java.util.Arrays;
26+
27+
/**
28+
* Builder for appending buffers that grows dataCapacity as necessary. Similar to Aeron's PayloadBuilder.
29+
*/
30+
public class PayloadBuilder
31+
{
32+
public static final int INITIAL_CAPACITY = Math.max(Frame.DATA_MTU, Frame.METADATA_MTU);
33+
34+
private final MutableDirectBuffer dataMutableDirectBuffer;
35+
private final MutableDirectBuffer metadataMutableDirectBuffer;
36+
37+
private byte[] dataBuffer;
38+
private byte[] metadataBuffer;
39+
private int dataLimit = 0;
40+
private int metadataLimit = 0;
41+
private int dataCapacity;
42+
private int metadataCapacity;
43+
44+
public PayloadBuilder()
45+
{
46+
dataCapacity = BitUtil.findNextPositivePowerOfTwo(INITIAL_CAPACITY);
47+
metadataCapacity = BitUtil.findNextPositivePowerOfTwo(INITIAL_CAPACITY);
48+
dataBuffer = new byte[dataCapacity];
49+
metadataBuffer = new byte[metadataCapacity];
50+
dataMutableDirectBuffer = new UnsafeBuffer(dataBuffer);
51+
metadataMutableDirectBuffer = new UnsafeBuffer(metadataBuffer);
52+
}
53+
54+
public Payload payload()
55+
{
56+
return new Payload()
57+
{
58+
public ByteBuffer getData()
59+
{
60+
return ByteBuffer.wrap(dataBuffer, 0, dataLimit);
61+
}
62+
63+
public ByteBuffer getMetadata()
64+
{
65+
return ByteBuffer.wrap(metadataBuffer, 0, metadataLimit);
66+
}
67+
};
68+
}
69+
70+
public void append(final Payload payload)
71+
{
72+
final ByteBuffer payloadData = payload.getData();
73+
final ByteBuffer payloadMetadata = payload.getMetadata();
74+
75+
ensureDataCapacity(payloadData.capacity());
76+
ensureMetadataCapacity(payloadMetadata.capacity());
77+
78+
dataMutableDirectBuffer.putBytes(dataLimit, payloadData, payloadData.capacity());
79+
dataLimit += payloadData.capacity();
80+
metadataMutableDirectBuffer.putBytes(metadataLimit, payloadMetadata, payloadMetadata.capacity());
81+
metadataLimit += payloadMetadata.capacity();
82+
}
83+
84+
private void ensureDataCapacity(final int additionalCapacity)
85+
{
86+
final int requiredCapacity = dataLimit + additionalCapacity;
87+
88+
if (requiredCapacity < 0)
89+
{
90+
final String s = String.format("Insufficient data capacity: dataLimit=%d additional=%d", dataLimit, additionalCapacity);
91+
throw new IllegalStateException(s);
92+
}
93+
94+
if (requiredCapacity > dataCapacity)
95+
{
96+
final int newCapacity = findSuitableCapacity(dataCapacity, requiredCapacity);
97+
final byte[] newBuffer = Arrays.copyOf(dataBuffer, newCapacity);
98+
99+
dataCapacity = newCapacity;
100+
dataBuffer = newBuffer;
101+
dataMutableDirectBuffer.wrap(newBuffer);
102+
}
103+
}
104+
105+
private void ensureMetadataCapacity(final int additionalCapacity)
106+
{
107+
final int requiredCapacity = metadataLimit + additionalCapacity;
108+
109+
if (requiredCapacity < 0)
110+
{
111+
final String s = String.format("Insufficient metadata capacity: metadataLimit=%d additional=%d", metadataLimit, additionalCapacity);
112+
throw new IllegalStateException(s);
113+
}
114+
115+
if (requiredCapacity > metadataCapacity)
116+
{
117+
final int newCapacity = findSuitableCapacity(metadataCapacity, requiredCapacity);
118+
final byte[] newBuffer = Arrays.copyOf(metadataBuffer, newCapacity);
119+
120+
metadataCapacity = newCapacity;
121+
metadataBuffer = newBuffer;
122+
metadataMutableDirectBuffer.wrap(newBuffer);
123+
}
124+
}
125+
126+
private static int findSuitableCapacity(int capacity, final int requiredCapacity)
127+
{
128+
do
129+
{
130+
capacity <<= 1;
131+
}
132+
while (capacity < requiredCapacity);
133+
134+
return capacity;
135+
}
136+
}

0 commit comments

Comments
 (0)