Skip to content

Commit 79aeaba

Browse files
Merge pull request #2 from ReactiveSocket/api-changes
API Changes and Fixes
2 parents d1c11aa + 36124e6 commit 79aeaba

File tree

3 files changed

+30
-9
lines changed

3 files changed

+30
-9
lines changed

src/main/java/io/reactivesocket/Message.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,26 @@ private void decode() {
127127
/**
128128
* This is NOT how we want it for real. Just representing the idea for discussion.
129129
*/
130-
String data = new String(b.array());
130+
byte[] copy = new byte[b.limit()];
131+
b.get(copy);
132+
String data = new String(copy);
133+
System.out.println("RAW: " + data);
131134
int separator = data.indexOf('|');
132135
String prefix = data.substring(0, separator);
133136
this.type = MessageType.values[Integer.parseInt(prefix.substring(1, data.indexOf(']')))];
134137
this.messageId = Integer.parseInt(prefix.substring(prefix.lastIndexOf("[") + 1, prefix.length() - 1));
135138
this.message = data.substring(separator + 1, data.length());
136139
}
137140

141+
@Override
142+
public String toString() {
143+
if (type == null) {
144+
try {
145+
decode();
146+
} catch (Exception e) {
147+
e.printStackTrace();
148+
}
149+
}
150+
return "Message => ID: " + messageId + " Type: " + type + " Data: " + message;
151+
}
138152
}

src/main/java/io/reactivesocket/ReactiveSocketServerProtocol.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,27 @@ public class ReactiveSocketServerProtocol {
2929
private Func1<String, Publisher<String>> requestResponseHandler;
3030
private Func1<String, Publisher<String>> requestStreamHandler;
3131

32-
ReactiveSocketServerProtocol(
32+
private ReactiveSocketServerProtocol(
3333
Func1<String, Publisher<String>> requestResponseHandler,
3434
Func1<String, Publisher<String>> requestStreamHandler) {
3535
this.requestResponseHandler = requestResponseHandler;
3636
this.requestStreamHandler = requestStreamHandler;
3737
}
3838

39+
public static ReactiveSocketServerProtocol create(
40+
Func1<String, Publisher<String>> requestResponseHandler,
41+
Func1<String, Publisher<String>> requestStreamHandler) {
42+
return new ReactiveSocketServerProtocol(requestResponseHandler, requestStreamHandler);
43+
}
44+
3945
public Publisher<Void> acceptConnection(DuplexConnection ws) {
4046
/* state of cancellation subjects during connection */
4147
// TODO consider using the LongObjectHashMap from Agrona for perf improvement
4248
// TODO consider alternate to PublishSubject that assumes a single subscriber and is lighter
4349
final ConcurrentHashMap<Integer, CancellationToken> cancellationObservables = new ConcurrentHashMap<>();
4450

4551
return toPublisher(toObservable(ws.getInput()).flatMap(message -> {
52+
System.out.println("message: " + message);
4653
if (message.getMessageType() == MessageType.SUBSCRIBE_REQUEST_RESPONSE) {
4754
CancellationToken cancellationToken = CancellationToken.create();
4855
cancellationObservables.put(message.getMessageId(), cancellationToken);

src/test/java/io/reactivesocket/ReactiveSocketServerProtocolTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class ReactiveSocketServerProtocolTest {
3535

3636
@Test
3737
public void testRequestResponseSuccess() {
38-
ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol(
38+
ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create(
3939
request -> toPublisher(just(request + " world")),
4040
null);
4141

@@ -54,7 +54,7 @@ public void testRequestResponseSuccess() {
5454

5555
@Test
5656
public void testRequestResponseError() {
57-
ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol(
57+
ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create(
5858
request -> toPublisher(error(new Exception("Request Not Found"))),
5959
null);
6060

@@ -78,7 +78,7 @@ public void testRequestResponseCancel() {
7878
.cast(String.class)
7979
.doOnUnsubscribe(() -> unsubscribed.set(true));
8080

81-
ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol(
81+
ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create(
8282
request -> toPublisher(delayed),
8383
null);
8484

@@ -97,7 +97,7 @@ public void testRequestResponseCancel() {
9797

9898
@Test
9999
public void testRequestStreamSuccess() {
100-
ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol(
100+
ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create(
101101
null,
102102
request -> toPublisher(range(Integer.parseInt(request), 10).map(i -> i + "!")));
103103

@@ -126,7 +126,7 @@ public void testRequestStreamSuccess() {
126126

127127
@Test
128128
public void testRequestStreamError() {
129-
ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol(
129+
ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create(
130130
null,
131131
request -> toPublisher(range(Integer.parseInt(request), 3)
132132
.map(i -> i + "!")
@@ -158,7 +158,7 @@ public void testRequestStreamError() {
158158
@Test
159159
public void testRequestStreamCancel() {
160160
TestScheduler ts = Schedulers.test();
161-
ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol(
161+
ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create(
162162
null,
163163
request -> toPublisher(interval(1000, TimeUnit.MILLISECONDS, ts).map(i -> i + "!")));
164164

@@ -196,7 +196,7 @@ public void testRequestStreamCancel() {
196196
@Test
197197
public void testMultiplexedStreams() {
198198
TestScheduler ts = Schedulers.test();
199-
ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol(
199+
ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create(
200200
null,
201201
request -> toPublisher(interval(1000, TimeUnit.MILLISECONDS, ts).map(i -> i + "_" + request)));
202202

0 commit comments

Comments
 (0)