|
15 | 15 | */
|
16 | 16 | package io.reactivesocket;
|
17 | 17 |
|
18 |
| -import uk.co.real_logic.agrona.DirectBuffer; |
19 |
| - |
20 | 18 | import java.nio.ByteBuffer;
|
21 | 19 |
|
22 | 20 | /**
|
|
26 | 24 | */
|
27 | 25 | public class Frame
|
28 | 26 | {
|
29 |
| - private static final int INITIAL_MESSAGE_ARRAY_SIZE = 256; |
30 |
| - // TODO: make thread local to demonstrate idea |
| 27 | + // thread local as it needs to be single-threaded access |
31 | 28 | private static final ThreadLocal<FrameFlyweight> FRAME_HANDLER = ThreadLocal.withInitial(FrameFlyweight::new);
|
32 | 29 |
|
33 |
| - private Frame() { |
34 |
| - } |
35 |
| - |
36 | 30 | // not final so we can reuse this object
|
37 | 31 | private ByteBuffer byteBuffer;
|
38 |
| - private byte[] messageArray = new byte[INITIAL_MESSAGE_ARRAY_SIZE]; |
39 |
| - private FrameType frameType; |
40 | 32 |
|
41 |
| - private long streamId; |
42 |
| - private int version; |
43 |
| - private int flags; |
44 |
| - private int messageLength = 0; |
| 33 | + private Frame() { |
| 34 | + } |
45 | 35 |
|
| 36 | + /** |
| 37 | + * Return underlying {@link ByteBuffer} for frame |
| 38 | + * |
| 39 | + * @return underlying {@link ByteBuffer} for frame |
| 40 | + */ |
46 | 41 | public ByteBuffer getByteBuffer() {
|
47 | 42 | return byteBuffer;
|
48 | 43 | }
|
49 | 44 |
|
50 |
| - public String getMessage() { |
51 |
| - if (frameType == null) { |
52 |
| - decode(); |
53 |
| - } |
54 |
| - return new String(messageArray, 0, messageLength); |
| 45 | + /** |
| 46 | + * Return frame data as a String |
| 47 | + * |
| 48 | + * @return frame data as {@link System} |
| 49 | + */ |
| 50 | + public String getData() { |
| 51 | + final FrameFlyweight frameFlyweight = FRAME_HANDLER.get(); |
| 52 | + |
| 53 | + return frameFlyweight.framePayload(byteBuffer, 0); |
55 | 54 | }
|
56 | 55 |
|
| 56 | + /** |
| 57 | + * Return frame stream identifier |
| 58 | + * |
| 59 | + * @return frame stream identifier |
| 60 | + */ |
57 | 61 | public long getStreamId() {
|
58 |
| - if (frameType == null) { |
59 |
| - decode(); |
60 |
| - } |
61 |
| - return streamId; |
| 62 | + final FrameFlyweight frameFlyweight = FRAME_HANDLER.get(); |
| 63 | + |
| 64 | + return frameFlyweight.streamId(byteBuffer); |
62 | 65 | }
|
63 | 66 |
|
64 |
| - public FrameType getMessageType() { |
65 |
| - if (frameType == null) { |
66 |
| - decode(); |
67 |
| - } |
68 |
| - return frameType; |
| 67 | + /** |
| 68 | + * Return frame {@link FrameType} |
| 69 | + * |
| 70 | + * @return frame type |
| 71 | + */ |
| 72 | + public FrameType getType() { |
| 73 | + final FrameFlyweight frameFlyweight = FRAME_HANDLER.get(); |
| 74 | + |
| 75 | + return frameFlyweight.frameType(byteBuffer); |
69 | 76 | }
|
70 | 77 |
|
| 78 | + /** |
| 79 | + * Return frame version |
| 80 | + * |
| 81 | + * @return frame version |
| 82 | + */ |
71 | 83 | public int getVersion()
|
72 | 84 | {
|
73 |
| - if (frameType == null) |
74 |
| - { |
75 |
| - decode(); |
76 |
| - } |
77 |
| - return version; |
| 85 | + final FrameFlyweight frameFlyweight = FRAME_HANDLER.get(); |
| 86 | + |
| 87 | + return frameFlyweight.version(byteBuffer); |
78 | 88 | }
|
79 | 89 |
|
| 90 | + /** |
| 91 | + * Return frame flags |
| 92 | + * |
| 93 | + * @return frame flags |
| 94 | + */ |
80 | 95 | public int getFlags()
|
81 | 96 | {
|
82 |
| - if (frameType == null) |
83 |
| - { |
84 |
| - decode(); |
85 |
| - } |
86 |
| - return flags; |
| 97 | + final FrameFlyweight frameFlyweight = FRAME_HANDLER.get(); |
| 98 | + |
| 99 | + return frameFlyweight.flags(byteBuffer); |
87 | 100 | }
|
88 | 101 |
|
89 | 102 | /**
|
90 | 103 | * Mutates this Frame to contain the given ByteBuffer
|
91 | 104 | *
|
92 |
| - * @param b |
| 105 | + * @param byteBuffer to wrap |
93 | 106 | */
|
94 |
| - public void wrap(final ByteBuffer b) { |
95 |
| - this.streamId = -1; |
96 |
| - this.frameType = null; |
97 |
| - this.byteBuffer = b; |
| 107 | + public void wrap(final ByteBuffer byteBuffer) { |
| 108 | + this.byteBuffer = byteBuffer; |
98 | 109 | }
|
99 | 110 |
|
100 | 111 | /**
|
101 | 112 | * Construct a new Frame from the given ByteBuffer
|
102 | 113 | *
|
103 |
| - * @param b |
104 |
| - * @return |
| 114 | + * @param byteBuffer to wrap |
| 115 | + * @return new {@link Frame} |
105 | 116 | */
|
106 |
| - public static Frame from(final ByteBuffer b) { |
| 117 | + public static Frame from(final ByteBuffer byteBuffer) { |
107 | 118 | Frame f = new Frame();
|
108 |
| - f.byteBuffer = b; |
| 119 | + f.byteBuffer = byteBuffer; |
109 | 120 | return f;
|
110 | 121 | }
|
111 | 122 |
|
112 | 123 | /**
|
113 |
| - * Mutates this Frame to contain the given message. |
114 |
| - * |
115 |
| - * @param streamId |
116 |
| - * @param type |
117 |
| - * @param message |
| 124 | + * Mutates this Frame to contain the given parameters. |
| 125 | + * |
| 126 | + * NOTE: allocates a new {@link ByteBuffer} |
| 127 | + * |
| 128 | + * @param streamId to include in frame |
| 129 | + * @param type to include in frame |
| 130 | + * @param message to include in frame |
118 | 131 | */
|
119 | 132 | public void wrap(final long streamId, final FrameType type, final String message) {
|
120 |
| - this.streamId = streamId; |
121 |
| - this.frameType = type; |
122 |
| - |
123 |
| - final byte[] messageBytes = message.getBytes(); |
124 |
| - ensureMessageArrayCapacity(messageBytes.length); |
125 |
| - |
126 |
| - System.arraycopy(messageBytes, 0, this.messageArray, 0, messageBytes.length); |
127 |
| - this.messageLength = messageBytes.length; |
128 |
| - |
129 |
| - this.byteBuffer = createByteBufferAndEncode(streamId, type, messageBytes); |
130 |
| - } |
131 |
| - |
132 |
| - public void setFromDecode(final int version, final long streamId, final FrameType type, final int flags) |
133 |
| - { |
134 |
| - this.version = version; |
135 |
| - this.streamId = streamId; |
136 |
| - this.frameType = type; |
137 |
| - this.flags = flags; |
138 |
| - } |
139 |
| - |
140 |
| - public void setFromDecode(final DirectBuffer buffer, final int offset, final int messageLength) |
141 |
| - { |
142 |
| - ensureMessageArrayCapacity(messageLength); |
143 |
| - this.messageLength = messageLength; |
144 |
| - buffer.getBytes(offset, this.messageArray, 0, messageLength); |
| 133 | + this.byteBuffer = createByteBufferAndEncode(streamId, type, message); |
145 | 134 | }
|
146 | 135 |
|
147 | 136 | /**
|
148 |
| - * Construct a new Frame with the given message. |
| 137 | + * Construct a new Frame with the given parameters. |
149 | 138 | *
|
150 |
| - * @param streamId |
151 |
| - * @param type |
152 |
| - * @param message |
153 |
| - * @return |
| 139 | + * @param streamId to include in frame |
| 140 | + * @param type to include in frame |
| 141 | + * @param message to include in frame |
| 142 | + * @return new {@link Frame} |
154 | 143 | */
|
155 | 144 | public static Frame from(long streamId, FrameType type, String message) {
|
156 | 145 | Frame f = new Frame();
|
157 |
| - f.streamId = streamId; |
158 |
| - f.frameType = type; |
159 |
| - |
160 |
| - final byte[] messageBytes = message.getBytes(); |
161 |
| - f.ensureMessageArrayCapacity(messageBytes.length); |
162 |
| - |
163 |
| - f.byteBuffer = createByteBufferAndEncode(streamId, type, messageBytes); |
164 |
| - |
165 |
| - System.arraycopy(messageBytes, 0, f.messageArray, 0, messageBytes.length); |
166 |
| - f.messageLength = messageBytes.length; |
| 146 | + f.byteBuffer = createByteBufferAndEncode(streamId, type, message); |
167 | 147 | return f;
|
168 | 148 | }
|
169 | 149 |
|
170 |
| - private static ByteBuffer createByteBufferAndEncode(long streamId, FrameType type, final byte[] message) { |
| 150 | + private static ByteBuffer createByteBufferAndEncode(long streamId, FrameType type, final String message) { |
171 | 151 | final FrameFlyweight frameFlyweight = FRAME_HANDLER.get();
|
172 | 152 |
|
173 | 153 | // TODO: allocation side effect of how this works currently with the rest of the machinery.
|
174 |
| - final ByteBuffer buffer = ByteBuffer.allocate(FrameFlyweight.computeFrameLength(message.length)); |
| 154 | + final ByteBuffer buffer = ByteBuffer.allocate(FrameFlyweight.computeFrameLength(message.length())); |
175 | 155 |
|
176 | 156 | frameFlyweight.encode(buffer, streamId, type, message);
|
177 | 157 | return buffer;
|
178 | 158 | }
|
179 | 159 |
|
180 |
| - private void decode() { |
| 160 | + @Override |
| 161 | + public String toString() { |
181 | 162 | final FrameFlyweight frameFlyweight = FRAME_HANDLER.get();
|
| 163 | + FrameType type = FrameType.SETUP; |
| 164 | + String payload = ""; |
| 165 | + long streamId = -1; |
182 | 166 |
|
183 |
| - frameFlyweight.decode(this, byteBuffer, 0); |
184 |
| - } |
185 |
| - |
186 |
| - private void ensureMessageArrayCapacity(final int length) |
187 |
| - { |
188 |
| - if (messageArray.length < length) |
| 167 | + try |
189 | 168 | {
|
190 |
| - messageArray = new byte[length]; |
191 |
| - } |
192 |
| - } |
193 |
| - |
194 |
| - @Override |
195 |
| - public String toString() { |
196 |
| - if (frameType == null) { |
197 |
| - try { |
198 |
| - decode(); |
199 |
| - } catch (Exception e) { |
200 |
| - e.printStackTrace(); |
201 |
| - } |
| 169 | + type = frameFlyweight.frameType(byteBuffer); |
| 170 | + payload = frameFlyweight.framePayload(byteBuffer, 0); |
| 171 | + streamId = frameFlyweight.streamId(byteBuffer); |
| 172 | + } catch (Exception e) { |
| 173 | + e.printStackTrace(); |
202 | 174 | }
|
203 |
| - return "Frame => ID: " + streamId + " Type: " + frameType + " Data: " + getMessage(); |
| 175 | + return "Frame => ID: " + streamId + " Type: " + type + " Data: " + payload; |
204 | 176 | }
|
205 | 177 | }
|
0 commit comments