Skip to content

API Changes and Fixes #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 22, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion src/main/java/io/reactivesocket/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,26 @@ private void decode() {
/**
* This is NOT how we want it for real. Just representing the idea for discussion.
*/
String data = new String(b.array());
byte[] copy = new byte[b.limit()];
b.get(copy);
String data = new String(copy);
System.out.println("RAW: " + data);
int separator = data.indexOf('|');
String prefix = data.substring(0, separator);
this.type = MessageType.values[Integer.parseInt(prefix.substring(1, data.indexOf(']')))];
this.messageId = Integer.parseInt(prefix.substring(prefix.lastIndexOf("[") + 1, prefix.length() - 1));
this.message = data.substring(separator + 1, data.length());
}

@Override
public String toString() {
if (type == null) {
try {
decode();
} catch (Exception e) {
e.printStackTrace();
}
}
return "Message => ID: " + messageId + " Type: " + type + " Data: " + message;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,27 @@ public class ReactiveSocketServerProtocol {
private Func1<String, Publisher<String>> requestResponseHandler;
private Func1<String, Publisher<String>> requestStreamHandler;

ReactiveSocketServerProtocol(
private ReactiveSocketServerProtocol(
Func1<String, Publisher<String>> requestResponseHandler,
Func1<String, Publisher<String>> requestStreamHandler) {
this.requestResponseHandler = requestResponseHandler;
this.requestStreamHandler = requestStreamHandler;
}

public static ReactiveSocketServerProtocol create(
Func1<String, Publisher<String>> requestResponseHandler,
Func1<String, Publisher<String>> requestStreamHandler) {
return new ReactiveSocketServerProtocol(requestResponseHandler, requestStreamHandler);
}

public Publisher<Void> acceptConnection(DuplexConnection ws) {
/* state of cancellation subjects during connection */
// TODO consider using the LongObjectHashMap from Agrona for perf improvement
// TODO consider alternate to PublishSubject that assumes a single subscriber and is lighter
final ConcurrentHashMap<Integer, CancellationToken> cancellationObservables = new ConcurrentHashMap<>();

return toPublisher(toObservable(ws.getInput()).flatMap(message -> {
System.out.println("message: " + message);
if (message.getMessageType() == MessageType.SUBSCRIBE_REQUEST_RESPONSE) {
CancellationToken cancellationToken = CancellationToken.create();
cancellationObservables.put(message.getMessageId(), cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class ReactiveSocketServerProtocolTest {

@Test
public void testRequestResponseSuccess() {
ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol(
ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create(
request -> toPublisher(just(request + " world")),
null);

Expand All @@ -54,7 +54,7 @@ public void testRequestResponseSuccess() {

@Test
public void testRequestResponseError() {
ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol(
ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create(
request -> toPublisher(error(new Exception("Request Not Found"))),
null);

Expand All @@ -78,7 +78,7 @@ public void testRequestResponseCancel() {
.cast(String.class)
.doOnUnsubscribe(() -> unsubscribed.set(true));

ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol(
ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create(
request -> toPublisher(delayed),
null);

Expand All @@ -97,7 +97,7 @@ public void testRequestResponseCancel() {

@Test
public void testRequestStreamSuccess() {
ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol(
ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create(
null,
request -> toPublisher(range(Integer.parseInt(request), 10).map(i -> i + "!")));

Expand Down Expand Up @@ -126,7 +126,7 @@ public void testRequestStreamSuccess() {

@Test
public void testRequestStreamError() {
ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol(
ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create(
null,
request -> toPublisher(range(Integer.parseInt(request), 3)
.map(i -> i + "!")
Expand Down Expand Up @@ -158,7 +158,7 @@ public void testRequestStreamError() {
@Test
public void testRequestStreamCancel() {
TestScheduler ts = Schedulers.test();
ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol(
ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create(
null,
request -> toPublisher(interval(1000, TimeUnit.MILLISECONDS, ts).map(i -> i + "!")));

Expand Down Expand Up @@ -196,7 +196,7 @@ public void testRequestStreamCancel() {
@Test
public void testMultiplexedStreams() {
TestScheduler ts = Schedulers.test();
ReactiveSocketServerProtocol p = new ReactiveSocketServerProtocol(
ReactiveSocketServerProtocol p = ReactiveSocketServerProtocol.create(
null,
request -> toPublisher(interval(1000, TimeUnit.MILLISECONDS, ts).map(i -> i + "_" + request)));

Expand Down