Skip to content

Commit e1a1d04

Browse files
committed
reduce allocations in Frame
1 parent 66e1662 commit e1a1d04

File tree

4 files changed

+138
-123
lines changed

4 files changed

+138
-123
lines changed

src/main/java/io/reactivesocket/Frame.java

Lines changed: 77 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package io.reactivesocket;
1717

18+
import uk.co.real_logic.agrona.DirectBuffer;
19+
1820
import java.nio.ByteBuffer;
1921

2022
/**
@@ -24,41 +26,54 @@
2426
*/
2527
public class Frame
2628
{
29+
private static final int INITIAL_MESSAGE_ARRAY_SIZE = 256;
2730
// TODO: make thread local to demonstrate idea
2831
private static final ThreadLocal<FrameFlyweight> FRAME_HANDLER = ThreadLocal.withInitial(FrameFlyweight::new);
2932

3033
private Frame() {
3134
}
3235

3336
// not final so we can reuse this object
34-
private ByteBuffer b;
35-
private int streamId;
36-
private FrameType type;
37-
private String message;
37+
private ByteBuffer byteBuffer;
38+
private byte[] messageArray = new byte[INITIAL_MESSAGE_ARRAY_SIZE];
39+
private FrameType frameType;
40+
41+
private long streamId;
42+
private int version;
43+
private int messageLength = 0;
3844

39-
public ByteBuffer getBytes() {
40-
return b;
45+
public ByteBuffer getByteBuffer() {
46+
return byteBuffer;
4147
}
4248

4349
public String getMessage() {
44-
if (type == null) {
50+
if (frameType == null) {
4551
decode();
4652
}
47-
return message;
53+
return new String(messageArray, 0, messageLength);
4854
}
4955

50-
public int getStreamId() {
51-
if (type == null) {
56+
public long getStreamId() {
57+
if (frameType == null) {
5258
decode();
5359
}
5460
return streamId;
5561
}
5662

5763
public FrameType getMessageType() {
58-
if (type == null) {
64+
if (frameType == null) {
65+
decode();
66+
}
67+
return frameType;
68+
}
69+
70+
public int getVersion()
71+
{
72+
if (frameType == null)
73+
{
5974
decode();
6075
}
61-
return type;
76+
return version;
6277
}
6378

6479
/**
@@ -68,9 +83,8 @@ public FrameType getMessageType() {
6883
*/
6984
public void wrap(ByteBuffer b) {
7085
this.streamId = -1;
71-
this.type = null;
72-
this.message = null;
73-
this.b = b;
86+
this.frameType = null;
87+
this.byteBuffer = b;
7488
}
7589

7690
/**
@@ -81,7 +95,7 @@ public void wrap(ByteBuffer b) {
8195
*/
8296
public static Frame from(ByteBuffer b) {
8397
Frame f = new Frame();
84-
f.b = b;
98+
f.byteBuffer = b;
8599
return f;
86100
}
87101

@@ -92,11 +106,31 @@ public static Frame from(ByteBuffer b) {
92106
* @param type
93107
* @param message
94108
*/
95-
public void wrap(int streamId, FrameType type, String message) {
109+
public void wrap(long streamId, FrameType type, String message) {
96110
this.streamId = streamId;
97-
this.type = type;
98-
this.message = message;
99-
this.b = getBytes(streamId, type, message);
111+
this.frameType = type;
112+
113+
final byte[] messageBytes = message.getBytes();
114+
ensureMessageArrayCapacity(messageBytes.length);
115+
116+
System.arraycopy(messageBytes, 0, this.messageArray, 0, messageBytes.length);
117+
this.messageLength = messageBytes.length;
118+
119+
this.byteBuffer = createByteBufferAndEncode(streamId, type, messageBytes);
120+
}
121+
122+
public void setFromDecode(final int version, final long streamId, final FrameType type)
123+
{
124+
this.version = version;
125+
this.streamId = streamId;
126+
this.frameType = type;
127+
}
128+
129+
public void setFromDecode(final DirectBuffer buffer, final int offset, final int messageLength)
130+
{
131+
ensureMessageArrayCapacity(messageLength);
132+
this.messageLength = messageLength;
133+
buffer.getBytes(offset, this.messageArray, 0, messageLength);
100134
}
101135

102136
/**
@@ -107,47 +141,54 @@ public void wrap(int streamId, FrameType type, String message) {
107141
* @param message
108142
* @return
109143
*/
110-
public static Frame from(int streamId, FrameType type, String message) {
144+
public static Frame from(long streamId, FrameType type, String message) {
111145
Frame f = new Frame();
112-
f.b = getBytes(streamId, type, message);
113146
f.streamId = streamId;
114-
f.type = type;
115-
f.message = message;
147+
f.frameType = type;
148+
149+
final byte[] messageBytes = message.getBytes();
150+
f.ensureMessageArrayCapacity(messageBytes.length);
151+
152+
f.byteBuffer = createByteBufferAndEncode(streamId, type, messageBytes);
153+
154+
System.arraycopy(messageBytes, 0, f.messageArray, 0, messageBytes.length);
155+
f.messageLength = messageBytes.length;
116156
return f;
117157
}
118158

119-
private static ByteBuffer getBytes(int messageId, FrameType type, String message) {
159+
private static ByteBuffer createByteBufferAndEncode(long streamId, FrameType type, final byte[] message) {
120160
final FrameFlyweight frameFlyweight = FRAME_HANDLER.get();
121161

122162
// TODO: allocation side effect of how this works currently with the rest of the machinery.
123-
final ByteBuffer buffer = ByteBuffer.allocate(FrameFlyweight.frameLength(message.length()));
163+
final ByteBuffer buffer = ByteBuffer.allocate(FrameFlyweight.frameLength(message.length));
124164

125-
frameFlyweight.encode(buffer, messageId, type, message.getBytes());
165+
frameFlyweight.encode(buffer, streamId, type, message);
126166
return buffer;
127167
}
128168

129169
private void decode() {
130170
final FrameFlyweight frameFlyweight = FRAME_HANDLER.get();
131171

132-
frameFlyweight.decode(b);
133-
this.type = frameFlyweight.messageType();
134-
this.streamId = (int) frameFlyweight.streamId(); // TODO: temp cast to only touch as little as possible
172+
frameFlyweight.decode(this, byteBuffer);
173+
}
135174

136-
// TODO: temp allocation to touch as little as possible
137-
final byte[] data = new byte[frameFlyweight.dataLength()];
138-
frameFlyweight.getDataBytes(data);
139-
this.message = new String(data);
175+
private void ensureMessageArrayCapacity(final int length)
176+
{
177+
if (messageArray.length < length)
178+
{
179+
messageArray = new byte[length];
180+
}
140181
}
141182

142183
@Override
143184
public String toString() {
144-
if (type == null) {
185+
if (frameType == null) {
145186
try {
146187
decode();
147188
} catch (Exception e) {
148189
e.printStackTrace();
149190
}
150191
}
151-
return "Frame => ID: " + streamId + " Type: " + type + " Data: " + message;
192+
return "Frame => ID: " + streamId + " Type: " + frameType + " Data: " + getMessage();
152193
}
153194
}

src/main/java/io/reactivesocket/FrameFlyweight.java

Lines changed: 8 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,6 @@ public class FrameFlyweight
4444
// single threaded assumed
4545
private final MutableDirectBuffer frameBuffer = new UnsafeBuffer(EMPTY_BUFFER);
4646

47-
// set by decode
48-
private long streamId;
49-
private FrameType frameType;
50-
private byte version;
51-
private int dataLength;
52-
5347
public static int frameLength(final int dataLength)
5448
{
5549
return DATA_OFFSET + dataLength;
@@ -65,47 +59,17 @@ public void encode(final ByteBuffer byteBuffer, final long streamId, final Frame
6559
frameBuffer.putBytes(DATA_OFFSET, data);
6660
}
6761

68-
/**
69-
* populate streamId, type, dataLength, etc.
70-
*/
71-
public void decode(final ByteBuffer byteBuffer)
62+
public void decode(Frame frame, final ByteBuffer byteBuffer)
7263
{
7364
frameBuffer.wrap(byteBuffer);
7465

75-
version = frameBuffer.getByte(VERSION_FIELD_OFFSET);
76-
streamId = frameBuffer.getLong(STREAM_ID_FIELD_OFFSET, ByteOrder.BIG_ENDIAN);
77-
frameType = FrameType.from(frameBuffer.getInt(TYPE_FIELD_OFFSET, ByteOrder.BIG_ENDIAN));
78-
dataLength = frameBuffer.getInt(DATA_LENGTH_OFFSET, ByteOrder.BIG_ENDIAN);
79-
}
66+
final int version = frameBuffer.getByte(VERSION_FIELD_OFFSET);
67+
final long streamId = frameBuffer.getLong(STREAM_ID_FIELD_OFFSET, ByteOrder.BIG_ENDIAN);
68+
final FrameType frameType = FrameType.from(frameBuffer.getInt(TYPE_FIELD_OFFSET, ByteOrder.BIG_ENDIAN));
69+
final int dataLength = frameBuffer.getInt(DATA_LENGTH_OFFSET, ByteOrder.BIG_ENDIAN);
8070

81-
public ByteBuffer buffer()
82-
{
83-
return frameBuffer.byteBuffer();
84-
}
85-
86-
public byte version()
87-
{
88-
return version;
89-
}
90-
91-
public FrameType messageType()
92-
{
93-
return frameType;
71+
// fill in Frame fields
72+
frame.setFromDecode(version, streamId, frameType);
73+
frame.setFromDecode(frameBuffer, DATA_OFFSET, dataLength);
9474
}
95-
96-
public long streamId()
97-
{
98-
return streamId;
99-
}
100-
101-
public int dataLength()
102-
{
103-
return dataLength;
104-
}
105-
106-
public void getDataBytes(final byte[] array)
107-
{
108-
frameBuffer.getBytes(DATA_OFFSET, array);
109-
}
110-
11175
}

src/main/java/io/reactivesocket/ReactiveSocketServerProtocol.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ public Publisher<Void> acceptConnection(DuplexConnection ws) {
5555
// TODO consider alternate to PublishSubject that assumes a single subscriber and is lighter
5656

5757
/* state of cancellation subjects during connection */
58-
final ConcurrentHashMap<Integer, CancellationToken> cancellationObservables = new ConcurrentHashMap<>();
58+
final ConcurrentHashMap<Long, CancellationToken> cancellationObservables = new ConcurrentHashMap<>();
5959
/* streams in flight that can receive REQUEST_N messages */
60-
final ConcurrentHashMap<Integer, RequestOperator<?>> inFlight = new ConcurrentHashMap<>();
60+
final ConcurrentHashMap<Long, RequestOperator<?>> inFlight = new ConcurrentHashMap<>();
6161

6262
return toPublisher(toObservable(ws.getInput()).flatMap(message -> {
6363
if (message.getMessageType() == FrameType.REQUEST_RESPONSE) {
@@ -90,8 +90,8 @@ public Publisher<Void> acceptConnection(DuplexConnection ws) {
9090
* TODO explore if there is a better way of doing this while only exposing Publisher APIs
9191
*/
9292

93-
private Observable<Void> handleRequestResponse(DuplexConnection ws, Frame requestFrame, final ConcurrentHashMap<Integer, CancellationToken> cancellationObservables) {
94-
int streamId = requestFrame.getStreamId();
93+
private Observable<Void> handleRequestResponse(DuplexConnection ws, Frame requestFrame, final ConcurrentHashMap<Long, CancellationToken> cancellationObservables) {
94+
long streamId = requestFrame.getStreamId();
9595
CancellationToken cancellationToken = CancellationToken.create();
9696
cancellationObservables.put(requestFrame.getStreamId(), cancellationToken);
9797

@@ -109,14 +109,24 @@ private Observable<Void> handleRequestResponse(DuplexConnection ws, Frame reques
109109
.finallyDo(() -> cancellationObservables.remove(streamId)))));
110110
}
111111

112-
private Observable<Void> handleRequestStream(DuplexConnection ws, Frame frame, final ConcurrentHashMap<Integer, CancellationToken> cancellationObservables, ConcurrentHashMap<Integer, RequestOperator<?>> inflight) {
112+
private Observable<Void> handleRequestStream(
113+
DuplexConnection ws,
114+
Frame frame,
115+
final ConcurrentHashMap<Long, CancellationToken> cancellationObservables,
116+
ConcurrentHashMap<Long, RequestOperator<?>> inflight)
117+
{
113118
return handleStream(ws, frame,
114119
requestHandler::handleRequestStream,
115120
cancellationObservables, inflight,
116121
() -> just(Frame.from(frame.getStreamId(), FrameType.COMPLETE, "")));
117122
}
118123

119-
private Observable<Void> handleRequestSubscription(DuplexConnection ws, Frame frame, final ConcurrentHashMap<Integer, CancellationToken> cancellationObservables, ConcurrentHashMap<Integer, RequestOperator<?>> inflight) {
124+
private Observable<Void> handleRequestSubscription(
125+
DuplexConnection ws,
126+
Frame frame,
127+
final ConcurrentHashMap<Long, CancellationToken> cancellationObservables,
128+
ConcurrentHashMap<Long, RequestOperator<?>> inflight)
129+
{
120130
return handleStream(ws, frame,
121131
requestHandler::handleRequestSubscription,
122132
cancellationObservables, inflight,
@@ -138,11 +148,11 @@ private Observable<Void> handleStream(
138148
DuplexConnection ws,
139149
Frame frame,
140150
Func1<String, Publisher<String>> messageHandler,
141-
final ConcurrentHashMap<Integer, CancellationToken> cancellationObservables,
142-
ConcurrentHashMap<Integer, RequestOperator<?>> inflight,
143-
Func0<? extends Observable<Frame>> onCompletedHandler) {
144-
145-
int streamId = frame.getStreamId();
151+
final ConcurrentHashMap<Long, CancellationToken> cancellationObservables,
152+
ConcurrentHashMap<Long, RequestOperator<?>> inflight,
153+
Func0<? extends Observable<Frame>> onCompletedHandler)
154+
{
155+
long streamId = frame.getStreamId();
146156
CancellationToken cancellationToken = CancellationToken.create();
147157
cancellationObservables.put(streamId, cancellationToken);
148158

@@ -180,7 +190,7 @@ private Observable<Void> handleFireAndForget(Frame requestFrame) {
180190
});
181191
}
182192

183-
private Observable<? extends Void> handleCancellationRequest(final ConcurrentHashMap<Integer, CancellationToken> cancellationObservables, Frame frame) {
193+
private Observable<? extends Void> handleCancellationRequest(final ConcurrentHashMap<Long, CancellationToken> cancellationObservables, Frame frame) {
184194
CancellationToken cancellationToken = cancellationObservables.get(frame.getStreamId());
185195
if (cancellationToken != null) {
186196
cancellationToken.cancel();
@@ -189,7 +199,7 @@ private Observable<? extends Void> handleCancellationRequest(final ConcurrentHas
189199
}
190200

191201
// TODO this needs further thought ... very prototypish implementation right now
192-
private Observable<? extends Void> handleRequestN(Frame frame, final ConcurrentHashMap<Integer, RequestOperator<?>> inFlight) {
202+
private Observable<? extends Void> handleRequestN(Frame frame, final ConcurrentHashMap<Long, RequestOperator<?>> inFlight) {
193203
RequestOperator<?> requestor = inFlight.get(frame.getStreamId());
194204
// TODO commented out as this isn't working yet
195205
// System.out.println("*** requestN " + requestor);

0 commit comments

Comments
 (0)