Skip to content

Commit 66e1662

Browse files
committed
naming change for clarity
1 parent bf38734 commit 66e1662

10 files changed

+231
-256
lines changed

src/main/java/io/reactivesocket/DuplexConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
public interface DuplexConnection {
2424
// TODO should we call this 'Connection'? 'SocketConnection'? 'ReactiveSocketConnection'?
2525

26-
Publisher<Message> getInput();
26+
Publisher<Frame> getInput();
2727

28-
Publisher<Void> write(Publisher<Message> o);
28+
Publisher<Void> write(Publisher<Frame> o);
2929

3030
}

src/main/java/io/reactivesocket/Message.java renamed to src/main/java/io/reactivesocket/Frame.java

Lines changed: 24 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,22 @@
1818
import java.nio.ByteBuffer;
1919

2020
/**
21-
* Represents a Message sent over a {@link DuplexConnection}.
21+
* Represents a Frame sent over a {@link DuplexConnection}.
2222
* <p>
2323
* This provides encoding, decoding and field accessors.
2424
*/
25-
public class Message {
26-
25+
public class Frame
26+
{
2727
// TODO: make thread local to demonstrate idea
28-
private static final ThreadLocal<FrameHandler> FRAME_HANDLER = ThreadLocal.withInitial(FrameHandler::new);
28+
private static final ThreadLocal<FrameFlyweight> FRAME_HANDLER = ThreadLocal.withInitial(FrameFlyweight::new);
2929

30-
private Message() {
30+
private Frame() {
3131
}
3232

3333
// not final so we can reuse this object
3434
private ByteBuffer b;
3535
private int streamId;
36-
private MessageType type;
36+
private FrameType type;
3737
private String message;
3838

3939
public ByteBuffer getBytes() {
@@ -54,7 +54,7 @@ public int getStreamId() {
5454
return streamId;
5555
}
5656

57-
public MessageType getMessageType() {
57+
public FrameType getMessageType() {
5858
if (type == null) {
5959
decode();
6060
}
@@ -79,8 +79,8 @@ public void wrap(ByteBuffer b) {
7979
* @param b
8080
* @return
8181
*/
82-
public static Message from(ByteBuffer b) {
83-
Message f = new Message();
82+
public static Frame from(ByteBuffer b) {
83+
Frame f = new Frame();
8484
f.b = b;
8585
return f;
8686
}
@@ -92,7 +92,7 @@ public static Message from(ByteBuffer b) {
9292
* @param type
9393
* @param message
9494
*/
95-
public void wrap(int streamId, MessageType type, String message) {
95+
public void wrap(int streamId, FrameType type, String message) {
9696
this.streamId = streamId;
9797
this.type = type;
9898
this.message = message;
@@ -107,60 +107,35 @@ public void wrap(int streamId, MessageType type, String message) {
107107
* @param message
108108
* @return
109109
*/
110-
public static Message from(int streamId, MessageType type, String message) {
111-
Message f = new Message();
110+
public static Frame from(int streamId, FrameType type, String message) {
111+
Frame f = new Frame();
112112
f.b = getBytes(streamId, type, message);
113113
f.streamId = streamId;
114114
f.type = type;
115115
f.message = message;
116116
return f;
117117
}
118118

119-
private static ByteBuffer getBytes(int messageId, MessageType type, String message) {
120-
// TODO replace with binary
121-
/**
122-
* This is NOT how we want it for real. Just representing the idea for discussion.
123-
*/
124-
// String s = "[" + type.getMessageId() + "]" + getIdString(messageId) + message;
125-
// TODO stop allocating ... use flywheels
126-
// return ByteBuffer.wrap(s.getBytes());
127-
128-
final FrameHandler frameHandler = FRAME_HANDLER.get();
119+
private static ByteBuffer getBytes(int messageId, FrameType type, String message) {
120+
final FrameFlyweight frameFlyweight = FRAME_HANDLER.get();
129121

130122
// TODO: allocation side effect of how this works currently with the rest of the machinery.
131-
final ByteBuffer buffer = ByteBuffer.allocate(FrameHandler.frameLength(message.length()));
123+
final ByteBuffer buffer = ByteBuffer.allocate(FrameFlyweight.frameLength(message.length()));
132124

133-
frameHandler.encode(buffer, messageId, type, message.getBytes());
125+
frameFlyweight.encode(buffer, messageId, type, message.getBytes());
134126
return buffer;
135127
}
136128

137-
private static String getIdString(int id) {
138-
return "[" + id + "]|";
139-
}
140-
141129
private void decode() {
142-
// TODO replace with binary
143-
/**
144-
* This is NOT how we want it for real. Just representing the idea for discussion.
145-
*/
146-
// byte[] copy = new byte[b.limit()];
147-
// b.get(copy);
148-
// String data = new String(copy);
149-
// int separator = data.indexOf('|');
150-
// String prefix = data.substring(0, separator);
151-
// this.type = MessageType.from(Integer.parseInt(prefix.substring(1, data.indexOf(']'))));
152-
// this.streamId = Integer.parseInt(prefix.substring(prefix.lastIndexOf("[") + 1, prefix.length() - 1));
153-
// this.message = data.substring(separator + 1, data.length());
154-
155-
final FrameHandler frameHandler = FRAME_HANDLER.get();
156-
157-
frameHandler.decode(b);
158-
this.type = frameHandler.messageType();
159-
this.streamId = (int)frameHandler.streamId(); // TODO: temp cast to only touch as little as possible
130+
final FrameFlyweight frameFlyweight = FRAME_HANDLER.get();
131+
132+
frameFlyweight.decode(b);
133+
this.type = frameFlyweight.messageType();
134+
this.streamId = (int) frameFlyweight.streamId(); // TODO: temp cast to only touch as little as possible
160135

161136
// TODO: temp allocation to touch as little as possible
162-
final byte[] data = new byte[frameHandler.dataLength()];
163-
frameHandler.getDataBytes(data);
137+
final byte[] data = new byte[frameFlyweight.dataLength()];
138+
frameFlyweight.getDataBytes(data);
164139
this.message = new String(data);
165140
}
166141

@@ -173,6 +148,6 @@ public String toString() {
173148
e.printStackTrace();
174149
}
175150
}
176-
return "Message => ID: " + streamId + " Type: " + type + " Data: " + message;
151+
return "Frame => ID: " + streamId + " Type: " + type + " Data: " + message;
177152
}
178153
}

src/main/java/io/reactivesocket/FrameHandler.java renamed to src/main/java/io/reactivesocket/FrameFlyweight.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* Per connection frame handling.
2727
* Holds codecs and DirectBuffers for wrapping
2828
*/
29-
public class FrameHandler
29+
public class FrameFlyweight
3030
{
3131
/**
3232
* Not the real frame layout, just an iteration on the ASCII version
@@ -46,7 +46,7 @@ public class FrameHandler
4646

4747
// set by decode
4848
private long streamId;
49-
private MessageType messageType;
49+
private FrameType frameType;
5050
private byte version;
5151
private int dataLength;
5252

@@ -55,7 +55,7 @@ public static int frameLength(final int dataLength)
5555
return DATA_OFFSET + dataLength;
5656
}
5757

58-
public void encode(final ByteBuffer byteBuffer, final long streamId, final MessageType type, final byte[] data)
58+
public void encode(final ByteBuffer byteBuffer, final long streamId, final FrameType type, final byte[] data)
5959
{
6060
frameBuffer.wrap(byteBuffer);
6161
frameBuffer.putByte(VERSION_FIELD_OFFSET, CURRENT_VERSION);
@@ -74,7 +74,7 @@ public void decode(final ByteBuffer byteBuffer)
7474

7575
version = frameBuffer.getByte(VERSION_FIELD_OFFSET);
7676
streamId = frameBuffer.getLong(STREAM_ID_FIELD_OFFSET, ByteOrder.BIG_ENDIAN);
77-
messageType = MessageType.from(frameBuffer.getInt(TYPE_FIELD_OFFSET, ByteOrder.BIG_ENDIAN));
77+
frameType = FrameType.from(frameBuffer.getInt(TYPE_FIELD_OFFSET, ByteOrder.BIG_ENDIAN));
7878
dataLength = frameBuffer.getInt(DATA_LENGTH_OFFSET, ByteOrder.BIG_ENDIAN);
7979
}
8080

@@ -88,9 +88,9 @@ public byte version()
8888
return version;
8989
}
9090

91-
public MessageType messageType()
91+
public FrameType messageType()
9292
{
93-
return messageType;
93+
return frameType;
9494
}
9595

9696
public long streamId()

src/main/java/io/reactivesocket/MessageType.java renamed to src/main/java/io/reactivesocket/FrameType.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@
1616
package io.reactivesocket;
1717

1818
/**
19-
* Types of {@link Message} that can be sent.
19+
* Types of {@link Frame} that can be sent.
2020
*/
21-
public enum MessageType {
22-
21+
public enum FrameType
22+
{
2323
SETUP(0x01),
2424
// Messages from Requestor
2525
REQUEST_RESPONSE(0x11),
@@ -33,35 +33,35 @@ public enum MessageType {
3333
COMPLETE(0x23),
3434
ERROR(0x24);
3535

36-
private static MessageType[] typesById;
36+
private static FrameType[] typesById;
3737

3838
/**
3939
* Index types by id for indexed lookup.
4040
*/
4141
static {
4242
int max = 0;
43-
for (MessageType t : values()) {
43+
for (FrameType t : values()) {
4444
if (t.id > max) {
4545
max = t.id;
4646
}
4747
}
48-
typesById = new MessageType[max + 1];
49-
for (MessageType t : values()) {
48+
typesById = new FrameType[max + 1];
49+
for (FrameType t : values()) {
5050
typesById[t.id] = t;
5151
}
5252
}
5353

5454
private final int id;
5555

56-
private MessageType(int id) {
56+
FrameType(int id) {
5757
this.id = id;
5858
}
5959

6060
public int getMessageId() {
6161
return id;
6262
}
6363

64-
public static MessageType from(int id) {
64+
public static FrameType from(int id) {
6565
return typesById[id];
6666
}
6767
}

src/main/java/io/reactivesocket/ReactiveSocketClientProtocol.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,13 @@ public class ReactiveSocketClientProtocol {
4949
* a Map each containing a single Subject for lookup+delivery is better
5050
* for doing demux ... currently chosen multicast+filter
5151
*/
52-
private final Observable<Message> multicastedInputStream;
52+
private final Observable<Frame> multicastedInputStream;
5353
private int streamCount = 0;// 0 is reserved for setup, all normal messages are >= 1
5454

5555
private ReactiveSocketClientProtocol(DuplexConnection connection) {
5656
this.connection = connection;
5757
// multicast input stream to any requestor (with backpressure support via RxJava .publish() operator)
58-
ConnectableObservable<Message> published = toObservable(connection.getInput()).publish();
58+
ConnectableObservable<Frame> published = toObservable(connection.getInput()).publish();
5959
multicastedInputStream = published;
6060
// start listening ... ignoring returned subscription as we don't control connection lifecycle here
6161
published.connect();
@@ -72,7 +72,7 @@ public static ReactiveSocketClientProtocol create(DuplexConnection connection) {
7272
* @return
7373
*/
7474
public Publisher<String> requestResponse(String payload) {
75-
return startStream(Message.from(nextStreamId(), MessageType.REQUEST_RESPONSE, payload));
75+
return startStream(Frame.from(nextStreamId(), FrameType.REQUEST_RESPONSE, payload));
7676
}
7777

7878
/**
@@ -83,7 +83,7 @@ public Publisher<String> requestResponse(String payload) {
8383
* @return
8484
*/
8585
public Publisher<String> requestStream(String payload) {
86-
return startStream(Message.from(nextStreamId(), MessageType.REQUEST_STREAM, payload));
86+
return startStream(Frame.from(nextStreamId(), FrameType.REQUEST_STREAM, payload));
8787
}
8888

8989
/**
@@ -97,7 +97,7 @@ public Publisher<String> requestStream(String payload) {
9797
* @return
9898
*/
9999
public Publisher<Void> fireAndForget(String payload) {
100-
return connection.write(toPublisher(just(Message.from(nextStreamId(), MessageType.FIRE_AND_FORGET, payload))));
100+
return connection.write(toPublisher(just(Frame.from(nextStreamId(), FrameType.FIRE_AND_FORGET, payload))));
101101
}
102102

103103
/**
@@ -108,14 +108,14 @@ public Publisher<Void> fireAndForget(String payload) {
108108
* @return
109109
*/
110110
public Publisher<String> requestSubscription(String payload) {
111-
return startStream(Message.from(nextStreamId(), MessageType.REQUEST_SUBSCRIPTION, payload));
111+
return startStream(Frame.from(nextStreamId(), FrameType.REQUEST_SUBSCRIPTION, payload));
112112
}
113113

114-
private Publisher<String> startStream(Message requestMessage) {
114+
private Publisher<String> startStream(Frame requestFrame) {
115115
return toPublisher(Observable.create(child -> {
116116

117117
// TODO replace this with a UnicastSubject without the overhead of multicast support
118-
PublishSubject<Observable<Message>> writer = PublishSubject.create();
118+
PublishSubject<Observable<Frame>> writer = PublishSubject.create();
119119
Observable<Void> written = toObservable(connection.write(toPublisher(Observable.merge(writer))));
120120

121121
child.setProducer(new Producer() {
@@ -128,43 +128,43 @@ public void request(long n) {
128128
start();
129129
}
130130
// send REQUEST_N over network for streaming request types
131-
if (requestMessage.getMessageType() == MessageType.REQUEST_STREAM || requestMessage.getMessageType() == MessageType.REQUEST_SUBSCRIPTION) {
132-
writer.onNext(just(Message.from(requestMessage.getStreamId(), MessageType.REQUEST_N, String.valueOf(n))));
131+
if (requestFrame.getMessageType() == FrameType.REQUEST_STREAM || requestFrame.getMessageType() == FrameType.REQUEST_SUBSCRIPTION) {
132+
writer.onNext(just(Frame.from(requestFrame.getStreamId(), FrameType.REQUEST_N, String.valueOf(n))));
133133
}
134134
}
135135

136136
private void start() {
137137
// wire up the response handler before emitting request
138-
Observable<Message> input = multicastedInputStream
139-
.filter(m -> m.getStreamId() == requestMessage.getStreamId());
138+
Observable<Frame> input = multicastedInputStream
139+
.filter(m -> m.getStreamId() == requestFrame.getStreamId());
140140

141141
// combine input and output so errors and unsubscription are composed, then subscribe
142142
rx.Subscription subscription = Observable
143-
.merge(input, written.cast(Message.class))
144-
.takeUntil(m -> (m.getMessageType() == MessageType.COMPLETE
145-
|| m.getMessageType() == MessageType.ERROR))
143+
.merge(input, written.cast(Frame.class))
144+
.takeUntil(m -> (m.getMessageType() == FrameType.COMPLETE
145+
|| m.getMessageType() == FrameType.ERROR))
146146
.flatMap(m -> {
147147
// convert ERROR/COMPLETE messages into terminal events
148-
if (m.getMessageType() == MessageType.ERROR) {
148+
if (m.getMessageType() == FrameType.ERROR) {
149149
return error(new Exception(m.getMessage()));
150-
} else if (m.getMessageType() == MessageType.COMPLETE) {
150+
} else if (m.getMessageType() == FrameType.COMPLETE) {
151151
return empty();// unsubscribe handled in takeUntil above
152-
} else if (m.getMessageType() == MessageType.NEXT) {
152+
} else if (m.getMessageType() == FrameType.NEXT) {
153153
return just(m.getMessage());
154154
} else {
155-
return error(new Exception("Unexpected MessageType: " + m.getMessageType()));
155+
return error(new Exception("Unexpected FrameType: " + m.getMessageType()));
156156
}
157157
}).subscribe(Subscribers.from(child));// only propagate Observer methods, backpressure is via Producer above
158158

159159
// if the child unsubscribes, we need to send a CANCEL message if we're not terminated
160160
child.add(Subscriptions.create(() -> {
161-
writer.onNext(just(Message.from(requestMessage.getStreamId(), MessageType.CANCEL, "")));
161+
writer.onNext(just(Frame.from(requestFrame.getStreamId(), FrameType.CANCEL, "")));
162162
// after sending the CANCEL we then tear down this stream
163163
subscription.unsubscribe();
164164
}));
165165

166166
// send the request to start everything
167-
writer.onNext(just(requestMessage));
167+
writer.onNext(just(requestFrame));
168168
}
169169

170170
});

0 commit comments

Comments
 (0)