Skip to content

Commit 7e241e0

Browse files
committed
protocol layout implementation iteration
1 parent e1a1d04 commit 7e241e0

File tree

3 files changed

+136
-30
lines changed

3 files changed

+136
-30
lines changed

src/main/java/io/reactivesocket/Frame.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ private Frame() {
4040

4141
private long streamId;
4242
private int version;
43+
private int flags;
4344
private int messageLength = 0;
4445

4546
public ByteBuffer getByteBuffer() {
@@ -76,12 +77,21 @@ public int getVersion()
7677
return version;
7778
}
7879

80+
public int getFlags()
81+
{
82+
if (frameType == null)
83+
{
84+
decode();
85+
}
86+
return flags;
87+
}
88+
7989
/**
8090
* Mutates this Frame to contain the given ByteBuffer
8191
*
8292
* @param b
8393
*/
84-
public void wrap(ByteBuffer b) {
94+
public void wrap(final ByteBuffer b) {
8595
this.streamId = -1;
8696
this.frameType = null;
8797
this.byteBuffer = b;
@@ -93,7 +103,7 @@ public void wrap(ByteBuffer b) {
93103
* @param b
94104
* @return
95105
*/
96-
public static Frame from(ByteBuffer b) {
106+
public static Frame from(final ByteBuffer b) {
97107
Frame f = new Frame();
98108
f.byteBuffer = b;
99109
return f;
@@ -106,7 +116,7 @@ public static Frame from(ByteBuffer b) {
106116
* @param type
107117
* @param message
108118
*/
109-
public void wrap(long streamId, FrameType type, String message) {
119+
public void wrap(final long streamId, final FrameType type, final String message) {
110120
this.streamId = streamId;
111121
this.frameType = type;
112122

@@ -119,11 +129,12 @@ public void wrap(long streamId, FrameType type, String message) {
119129
this.byteBuffer = createByteBufferAndEncode(streamId, type, messageBytes);
120130
}
121131

122-
public void setFromDecode(final int version, final long streamId, final FrameType type)
132+
public void setFromDecode(final int version, final long streamId, final FrameType type, final int flags)
123133
{
124134
this.version = version;
125135
this.streamId = streamId;
126136
this.frameType = type;
137+
this.flags = flags;
127138
}
128139

129140
public void setFromDecode(final DirectBuffer buffer, final int offset, final int messageLength)
@@ -160,7 +171,7 @@ private static ByteBuffer createByteBufferAndEncode(long streamId, FrameType typ
160171
final FrameFlyweight frameFlyweight = FRAME_HANDLER.get();
161172

162173
// TODO: allocation side effect of how this works currently with the rest of the machinery.
163-
final ByteBuffer buffer = ByteBuffer.allocate(FrameFlyweight.frameLength(message.length));
174+
final ByteBuffer buffer = ByteBuffer.allocate(FrameFlyweight.computeFrameLength(message.length));
164175

165176
frameFlyweight.encode(buffer, streamId, type, message);
166177
return buffer;
@@ -169,7 +180,7 @@ private static ByteBuffer createByteBufferAndEncode(long streamId, FrameType typ
169180
private void decode() {
170181
final FrameFlyweight frameFlyweight = FRAME_HANDLER.get();
171182

172-
frameFlyweight.decode(this, byteBuffer);
183+
frameFlyweight.decode(this, byteBuffer, 0);
173184
}
174185

175186
private void ensureMessageArrayCapacity(final int length)

src/main/java/io/reactivesocket/FrameFlyweight.java

Lines changed: 109 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,53 +23,145 @@
2323
import java.nio.ByteOrder;
2424

2525
/**
26-
* Per connection frame handling.
27-
* Holds codecs and DirectBuffers for wrapping
26+
* Per connection frame flyweight.
27+
* Holds codecs and DirectBuffer for wrapping
2828
*/
2929
public class FrameFlyweight
3030
{
3131
/**
32-
* Not the real frame layout, just an iteration on the ASCII version
32+
* Not the latest frame layout, but close
33+
* Does not include
34+
* - (initial) request N for REQUEST_STREAM and REQUEST_SUB and REQUEST
35+
* - fragmentation / reassembly
36+
* - encode should remove Type param and have it as part of method name (1 encode per type)
3337
*/
34-
private static final int VERSION_FIELD_OFFSET = 0;
35-
private static final int STREAM_ID_FIELD_OFFSET = VERSION_FIELD_OFFSET + BitUtil.SIZE_OF_INT;
36-
private static final int TYPE_FIELD_OFFSET = STREAM_ID_FIELD_OFFSET + BitUtil.SIZE_OF_LONG;
37-
private static final int DATA_LENGTH_OFFSET = TYPE_FIELD_OFFSET + BitUtil.SIZE_OF_INT;
38-
private static final int DATA_OFFSET = DATA_LENGTH_OFFSET + BitUtil.SIZE_OF_INT;
38+
private static final boolean INCLUDE_FRAME_LENGTH = true;
39+
40+
private static final int FRAME_LENGTH_FIELD_OFFSET;
41+
private static final int VERSION_FIELD_OFFSET;
42+
private static final int FLAGS_FIELD_OFFSET;
43+
private static final int TYPE_FIELD_OFFSET;
44+
private static final int STREAM_ID_FIELD_OFFSET;
45+
private static final int DATA_OFFSET;
3946

4047
private static final byte CURRENT_VERSION = 0;
4148

49+
private static final int FLAGS_I = 0b1000_000;
50+
private static final int FLAGS_B = 0b0100_000;
51+
private static final int FLAGS_E = 0b0010_000;
52+
private static final int FLAGS_C = 0b0001_000;
53+
54+
static
55+
{
56+
if (INCLUDE_FRAME_LENGTH)
57+
{
58+
FRAME_LENGTH_FIELD_OFFSET = 0;
59+
}
60+
else
61+
{
62+
FRAME_LENGTH_FIELD_OFFSET = - BitUtil.SIZE_OF_INT;
63+
}
64+
65+
VERSION_FIELD_OFFSET = FRAME_LENGTH_FIELD_OFFSET + BitUtil.SIZE_OF_INT;
66+
FLAGS_FIELD_OFFSET = VERSION_FIELD_OFFSET + BitUtil.SIZE_OF_BYTE;
67+
TYPE_FIELD_OFFSET = FLAGS_FIELD_OFFSET + BitUtil.SIZE_OF_BYTE;
68+
STREAM_ID_FIELD_OFFSET = TYPE_FIELD_OFFSET + BitUtil.SIZE_OF_SHORT;
69+
DATA_OFFSET = STREAM_ID_FIELD_OFFSET + BitUtil.SIZE_OF_LONG;
70+
}
71+
4272
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
4373

4474
// single threaded assumed
4575
private final MutableDirectBuffer frameBuffer = new UnsafeBuffer(EMPTY_BUFFER);
4676

47-
public static int frameLength(final int dataLength)
77+
public static int computeFrameLength(final int dataLength)
4878
{
4979
return DATA_OFFSET + dataLength;
5080
}
5181

52-
public void encode(final ByteBuffer byteBuffer, final long streamId, final FrameType type, final byte[] data)
82+
public int encode(final ByteBuffer byteBuffer, final long streamId, final FrameType type, final byte[] data)
5383
{
84+
final int frameLength = computeFrameLength(data.length);
85+
5486
frameBuffer.wrap(byteBuffer);
87+
88+
if (INCLUDE_FRAME_LENGTH)
89+
{
90+
frameBuffer.putInt(FRAME_LENGTH_FIELD_OFFSET, frameLength, ByteOrder.BIG_ENDIAN);
91+
}
92+
93+
final FrameType outFrameType;
94+
int flags = 0;
95+
96+
switch (type)
97+
{
98+
case COMPLETE:
99+
outFrameType = FrameType.RESPONSE;
100+
flags |= FLAGS_C;
101+
break;
102+
case NEXT:
103+
outFrameType = FrameType.RESPONSE;
104+
break;
105+
default:
106+
outFrameType = type;
107+
break;
108+
}
109+
55110
frameBuffer.putByte(VERSION_FIELD_OFFSET, CURRENT_VERSION);
111+
frameBuffer.putByte(FLAGS_FIELD_OFFSET, (byte) flags);
112+
frameBuffer.putShort(TYPE_FIELD_OFFSET, (short)outFrameType.getEncodedType(), ByteOrder.BIG_ENDIAN);
56113
frameBuffer.putLong(STREAM_ID_FIELD_OFFSET, streamId, ByteOrder.BIG_ENDIAN);
57-
frameBuffer.putInt(TYPE_FIELD_OFFSET, type.getMessageId(), ByteOrder.BIG_ENDIAN);
58-
frameBuffer.putInt(DATA_LENGTH_OFFSET, data.length, ByteOrder.BIG_ENDIAN);
59114
frameBuffer.putBytes(DATA_OFFSET, data);
115+
116+
return frameLength;
60117
}
61118

62-
public void decode(Frame frame, final ByteBuffer byteBuffer)
119+
public void decode(Frame frame, final ByteBuffer byteBuffer, final int length)
63120
{
64121
frameBuffer.wrap(byteBuffer);
65122

123+
int frameLength = length;
124+
125+
if (INCLUDE_FRAME_LENGTH)
126+
{
127+
frameLength = frameBuffer.getInt(FRAME_LENGTH_FIELD_OFFSET, ByteOrder.BIG_ENDIAN);
128+
}
129+
66130
final int version = frameBuffer.getByte(VERSION_FIELD_OFFSET);
131+
final int flags = frameBuffer.getByte(FLAGS_FIELD_OFFSET);
132+
FrameType frameType = FrameType.from(frameBuffer.getShort(TYPE_FIELD_OFFSET, ByteOrder.BIG_ENDIAN));
133+
67134
final long streamId = frameBuffer.getLong(STREAM_ID_FIELD_OFFSET, ByteOrder.BIG_ENDIAN);
68-
final FrameType frameType = FrameType.from(frameBuffer.getInt(TYPE_FIELD_OFFSET, ByteOrder.BIG_ENDIAN));
69-
final int dataLength = frameBuffer.getInt(DATA_LENGTH_OFFSET, ByteOrder.BIG_ENDIAN);
135+
136+
final int dataLength = frameLength - DATA_OFFSET;
137+
int dataOffset = DATA_OFFSET;
138+
139+
switch (frameType)
140+
{
141+
case RESPONSE:
142+
if (FLAGS_C == (flags & FLAGS_C))
143+
{
144+
frameType = FrameType.COMPLETE;
145+
}
146+
else
147+
{
148+
frameType = FrameType.NEXT;
149+
}
150+
break;
151+
152+
case REQUEST_N:
153+
// TODO: grab N value
154+
break;
155+
case REQUEST_STREAM:
156+
// TODO: grab N value, and move DATA_OFFSET value
157+
break;
158+
case REQUEST_SUBSCRIPTION:
159+
// TODO: grab N value, and move DATA_OFFSET value
160+
break;
161+
}
70162

71163
// fill in Frame fields
72-
frame.setFromDecode(version, streamId, frameType);
73-
frame.setFromDecode(frameBuffer, DATA_OFFSET, dataLength);
164+
frame.setFromDecode(version, streamId, frameType, flags);
165+
frame.setFromDecode(frameBuffer, dataOffset, dataLength);
74166
}
75167
}

src/main/java/io/reactivesocket/FrameType.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,26 @@ public enum FrameType
2929
REQUEST_N(0x15),
3030
CANCEL(0x16),
3131
// Messages from Responder
32+
RESPONSE(0x20),
33+
ERROR(0x21),
34+
// synthetic types from Responder for use by the rest of the machinery
3235
NEXT(0x22),
33-
COMPLETE(0x23),
34-
ERROR(0x24);
35-
36+
COMPLETE(0x23);
37+
3638
private static FrameType[] typesById;
3739

3840
/**
3941
* Index types by id for indexed lookup.
4042
*/
4143
static {
4244
int max = 0;
45+
4346
for (FrameType t : values()) {
44-
if (t.id > max) {
45-
max = t.id;
46-
}
47+
max = Math.max(t.id, max);
4748
}
49+
4850
typesById = new FrameType[max + 1];
51+
4952
for (FrameType t : values()) {
5053
typesById[t.id] = t;
5154
}
@@ -57,7 +60,7 @@ public enum FrameType
5760
this.id = id;
5861
}
5962

60-
public int getMessageId() {
63+
public int getEncodedType() {
6164
return id;
6265
}
6366

0 commit comments

Comments
 (0)