Skip to content

Commit bf38734

Browse files
committed
exploratory binary encoding.
1 parent f11cf66 commit bf38734

File tree

3 files changed

+144
-11
lines changed

3 files changed

+144
-11
lines changed
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket;
17+
18+
import uk.co.real_logic.agrona.BitUtil;
19+
import uk.co.real_logic.agrona.MutableDirectBuffer;
20+
import uk.co.real_logic.agrona.concurrent.UnsafeBuffer;
21+
22+
import java.nio.ByteBuffer;
23+
import java.nio.ByteOrder;
24+
25+
/**
26+
* Per connection frame handling.
27+
* Holds codecs and DirectBuffers for wrapping
28+
*/
29+
public class FrameHandler
30+
{
31+
/**
32+
* Not the real frame layout, just an iteration on the ASCII version
33+
*/
34+
private static final int VERSION_FIELD_OFFSET = 0;
35+
private static final int STREAM_ID_FIELD_OFFSET = VERSION_FIELD_OFFSET + BitUtil.SIZE_OF_INT;
36+
private static final int TYPE_FIELD_OFFSET = STREAM_ID_FIELD_OFFSET + BitUtil.SIZE_OF_LONG;
37+
private static final int DATA_LENGTH_OFFSET = TYPE_FIELD_OFFSET + BitUtil.SIZE_OF_INT;
38+
private static final int DATA_OFFSET = DATA_LENGTH_OFFSET + BitUtil.SIZE_OF_INT;
39+
40+
private static final byte CURRENT_VERSION = 0;
41+
42+
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
43+
44+
// single threaded assumed
45+
private final MutableDirectBuffer frameBuffer = new UnsafeBuffer(EMPTY_BUFFER);
46+
47+
// set by decode
48+
private long streamId;
49+
private MessageType messageType;
50+
private byte version;
51+
private int dataLength;
52+
53+
public static int frameLength(final int dataLength)
54+
{
55+
return DATA_OFFSET + dataLength;
56+
}
57+
58+
public void encode(final ByteBuffer byteBuffer, final long streamId, final MessageType type, final byte[] data)
59+
{
60+
frameBuffer.wrap(byteBuffer);
61+
frameBuffer.putByte(VERSION_FIELD_OFFSET, CURRENT_VERSION);
62+
frameBuffer.putLong(STREAM_ID_FIELD_OFFSET, streamId, ByteOrder.BIG_ENDIAN);
63+
frameBuffer.putInt(TYPE_FIELD_OFFSET, type.getMessageId(), ByteOrder.BIG_ENDIAN);
64+
frameBuffer.putInt(DATA_LENGTH_OFFSET, data.length, ByteOrder.BIG_ENDIAN);
65+
frameBuffer.putBytes(DATA_OFFSET, data);
66+
}
67+
68+
/**
69+
* populate streamId, type, dataLength, etc.
70+
*/
71+
public void decode(final ByteBuffer byteBuffer)
72+
{
73+
frameBuffer.wrap(byteBuffer);
74+
75+
version = frameBuffer.getByte(VERSION_FIELD_OFFSET);
76+
streamId = frameBuffer.getLong(STREAM_ID_FIELD_OFFSET, ByteOrder.BIG_ENDIAN);
77+
messageType = MessageType.from(frameBuffer.getInt(TYPE_FIELD_OFFSET, ByteOrder.BIG_ENDIAN));
78+
dataLength = frameBuffer.getInt(DATA_LENGTH_OFFSET, ByteOrder.BIG_ENDIAN);
79+
}
80+
81+
public ByteBuffer buffer()
82+
{
83+
return frameBuffer.byteBuffer();
84+
}
85+
86+
public byte version()
87+
{
88+
return version;
89+
}
90+
91+
public MessageType messageType()
92+
{
93+
return messageType;
94+
}
95+
96+
public long streamId()
97+
{
98+
return streamId;
99+
}
100+
101+
public int dataLength()
102+
{
103+
return dataLength;
104+
}
105+
106+
public void getDataBytes(final byte[] array)
107+
{
108+
frameBuffer.getBytes(DATA_OFFSET, array);
109+
}
110+
111+
}

src/main/java/io/reactivesocket/Message.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
*/
2525
public class Message {
2626

27+
// TODO: make thread local to demonstrate idea
28+
private static final ThreadLocal<FrameHandler> FRAME_HANDLER = ThreadLocal.withInitial(FrameHandler::new);
29+
2730
private Message() {
2831
}
2932

@@ -118,9 +121,17 @@ private static ByteBuffer getBytes(int messageId, MessageType type, String messa
118121
/**
119122
* This is NOT how we want it for real. Just representing the idea for discussion.
120123
*/
121-
String s = "[" + type.getMessageId() + "]" + getIdString(messageId) + message;
124+
// String s = "[" + type.getMessageId() + "]" + getIdString(messageId) + message;
122125
// TODO stop allocating ... use flywheels
123-
return ByteBuffer.wrap(s.getBytes());
126+
// return ByteBuffer.wrap(s.getBytes());
127+
128+
final FrameHandler frameHandler = FRAME_HANDLER.get();
129+
130+
// TODO: allocation side effect of how this works currently with the rest of the machinery.
131+
final ByteBuffer buffer = ByteBuffer.allocate(FrameHandler.frameLength(message.length()));
132+
133+
frameHandler.encode(buffer, messageId, type, message.getBytes());
134+
return buffer;
124135
}
125136

126137
private static String getIdString(int id) {
@@ -132,14 +143,25 @@ private void decode() {
132143
/**
133144
* This is NOT how we want it for real. Just representing the idea for discussion.
134145
*/
135-
byte[] copy = new byte[b.limit()];
136-
b.get(copy);
137-
String data = new String(copy);
138-
int separator = data.indexOf('|');
139-
String prefix = data.substring(0, separator);
140-
this.type = MessageType.from(Integer.parseInt(prefix.substring(1, data.indexOf(']'))));
141-
this.streamId = Integer.parseInt(prefix.substring(prefix.lastIndexOf("[") + 1, prefix.length() - 1));
142-
this.message = data.substring(separator + 1, data.length());
146+
// byte[] copy = new byte[b.limit()];
147+
// b.get(copy);
148+
// String data = new String(copy);
149+
// int separator = data.indexOf('|');
150+
// String prefix = data.substring(0, separator);
151+
// this.type = MessageType.from(Integer.parseInt(prefix.substring(1, data.indexOf(']'))));
152+
// this.streamId = Integer.parseInt(prefix.substring(prefix.lastIndexOf("[") + 1, prefix.length() - 1));
153+
// this.message = data.substring(separator + 1, data.length());
154+
155+
final FrameHandler frameHandler = FRAME_HANDLER.get();
156+
157+
frameHandler.decode(b);
158+
this.type = frameHandler.messageType();
159+
this.streamId = (int)frameHandler.streamId(); // TODO: temp cast to only touch as little as possible
160+
161+
// TODO: temp allocation to touch as little as possible
162+
final byte[] data = new byte[frameHandler.dataLength()];
163+
frameHandler.getDataBytes(data);
164+
this.message = new String(data);
143165
}
144166

145167
@Override

src/main/java/io/reactivesocket/MessageType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
*/
2121
public enum MessageType {
2222

23-
SETUP(0x01),
23+
SETUP(0x01),
2424
// Messages from Requestor
2525
REQUEST_RESPONSE(0x11),
2626
FIRE_AND_FORGET(0x12),

0 commit comments

Comments
 (0)