Skip to content

ServerProtocol with UnitTests #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 22, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
buildscript {
repositories {
repositories {
jcenter()
}

Expand All @@ -12,6 +12,9 @@ apply plugin: 'reactivesocket-project'
apply plugin: 'java'

dependencies {
compile 'io.reactivex:rxjava:1.0.13'
compile 'io.reactivex:rxjava-reactive-streams:1.0.1'
compile 'org.reactivestreams:reactive-streams:1.0.0.final'
testCompile 'junit:junit-dep:4.10'
testCompile 'org.mockito:mockito-core:1.8.5'
}
Expand Down
64 changes: 64 additions & 0 deletions src/main/java/io/reactivesocket/CancellationToken.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivesocket;

import rx.Observable;
import rx.Subscriber;

/**
* Intended to ONLY support a single Subscriber and Publisher for notification of a cancellation event, such as with takeUntil.
*/
/* package */ class CancellationToken extends Observable<Void> {

public static CancellationToken create() {
return new CancellationToken(new State());
}

private State state;

protected CancellationToken(State state) {
super(s -> {
synchronized (state) {
if (state.cancelled) {
s.onCompleted(); // always onComplete when cancelled
} else {
if (state.subscriber != null) {
throw new IllegalStateException("Only 1 Subscriber permitted on a CancellationSubject");
} else {
state.subscriber = s;
}
}
}
});
this.state = state;
}

public final void cancel() {
Subscriber<?> emitTo;
synchronized (state) {
state.cancelled = true;
emitTo = state.subscriber;
}
if (emitTo != null) {
emitTo.onCompleted();
}
}

public static class State {
private Subscriber<?> subscriber;
private boolean cancelled = false;
}
}
26 changes: 26 additions & 0 deletions src/main/java/io/reactivesocket/DuplexConnection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivesocket;

import org.reactivestreams.Publisher;

public interface DuplexConnection {

Publisher<Message> getInput();

Publisher<Void> write(Message o);

}
138 changes: 138 additions & 0 deletions src/main/java/io/reactivesocket/Message.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivesocket;

import java.nio.ByteBuffer;

public class Message {

private Message() {
}

// not final so we can reuse this object
private ByteBuffer b;
private int messageId;
private MessageType type;
private String message;

public ByteBuffer getBytes() {
return b;
}

public String getMessage() {
if (type == null) {
decode();
}
return message;
}

public int getMessageId() {
if (type == null) {
decode();
}
return messageId;
}

public MessageType getMessageType() {
if (type == null) {
decode();
}
return type;
}

/**
* Mutates this Frame to contain the given ByteBuffer
*
* @param b
*/
public void wrap(ByteBuffer b) {
this.messageId = -1;
this.type = null;
this.message = null;
this.b = b;
}

/**
* Construct a new Frame from the given ByteBuffer
*
* @param b
* @return
*/
public static Message from(ByteBuffer b) {
Message f = new Message();
f.b = b;
return f;
}

/**
* Mutates this Frame to contain the given message.
*
* @param messageId
* @param type
* @param message
*/
public void wrap(int messageId, MessageType type, String message) {
this.messageId = messageId;
this.type = type;
this.message = message;
this.b = getBytes(messageId, type, message);
}

/**
* Construct a new Frame with the given message.
*
* @param messageId
* @param type
* @param message
* @return
*/
public static Message from(int messageId, MessageType type, String message) {
Message f = new Message();
f.b = getBytes(messageId, type, message);
f.messageId = messageId;
f.type = type;
f.message = message;
return f;
}

private static ByteBuffer getBytes(int messageId, MessageType type, String message) {
// TODO replace with binary
/**
* This is NOT how we want it for real. Just representing the idea for discussion.
*/
String s = "[" + type.ordinal() + "]" + getIdString(messageId) + message;
// TODO stop allocating ... use flywheels
return ByteBuffer.wrap(s.getBytes());
}

private static String getIdString(int id) {
return "[" + id + "]|";
}

private void decode() {
// TODO replace with binary
/**
* This is NOT how we want it for real. Just representing the idea for discussion.
*/
String data = new String(b.array());
int separator = data.indexOf('|');
String prefix = data.substring(0, separator);
this.type = MessageType.values[Integer.parseInt(prefix.substring(1, data.indexOf(']')))];
this.messageId = Integer.parseInt(prefix.substring(prefix.lastIndexOf("[") + 1, prefix.length() - 1));
this.message = data.substring(separator + 1, data.length());
}

}
22 changes: 22 additions & 0 deletions src/main/java/io/reactivesocket/MessageType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivesocket;

enum MessageType {
// DO NOT REORDER OR INSERT NEW ELEMENTS. THE ORDINAL IS PART OF THE PROTOCOL
SUBSCRIBE_REQUEST_RESPONSE, SUBSCRIBE_STREAM, STREAM_REQUEST, DISPOSE, NEXT_COMPLETE, NEXT, ERROR, COMPLETE;
public static MessageType[] values = MessageType.values(); // cached for performance
}
82 changes: 82 additions & 0 deletions src/main/java/io/reactivesocket/ReactiveSocketServerProtocol.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivesocket;

import static rx.Observable.*;
import static rx.RxReactiveStreams.*;

import java.util.concurrent.ConcurrentHashMap;

import org.reactivestreams.Publisher;

import rx.functions.Func1;

public class ReactiveSocketServerProtocol {

private Func1<String, Publisher<String>> requestResponseHandler;
private Func1<String, Publisher<String>> requestStreamHandler;

ReactiveSocketServerProtocol(
Func1<String, Publisher<String>> requestResponseHandler,
Func1<String, Publisher<String>> requestStreamHandler) {
this.requestResponseHandler = requestResponseHandler;
this.requestStreamHandler = requestStreamHandler;
}

public Publisher<Void> acceptConnection(DuplexConnection ws) {
/* state of cancellation subjects during connection */
// TODO consider using the LongObjectHashMap from Agrona for perf improvement
// TODO consider alternate to PublishSubject that assumes a single subscriber and is lighter
final ConcurrentHashMap<Integer, CancellationToken> cancellationObservables = new ConcurrentHashMap<>();

return toPublisher(toObservable(ws.getInput()).flatMap(message -> {
if (message.getMessageType() == MessageType.SUBSCRIBE_REQUEST_RESPONSE) {
CancellationToken cancellationToken = CancellationToken.create();
cancellationObservables.put(message.getMessageId(), cancellationToken);

return toObservable(requestResponseHandler.call(message.getMessage()))
.single() // enforce that it is a request/response
.map(v -> Message.from(message.getMessageId(), MessageType.NEXT_COMPLETE, v))
.onErrorReturn(err -> Message.from(message.getMessageId(), MessageType.ERROR, err.getMessage()))
.flatMap(payload -> toObservable(ws.write(payload)))
.takeUntil(cancellationToken)
.finallyDo(() -> cancellationObservables.remove(message.getMessageId()));
} else if (message.getMessageType() == MessageType.SUBSCRIBE_STREAM) {
CancellationToken cancellationToken = CancellationToken.create();
cancellationObservables.put(message.getMessageId(), cancellationToken);

//@formatter:off
return toObservable(requestStreamHandler.call(message.getMessage()))
.flatMap(s -> toObservable(ws.write(Message.from(message.getMessageId(), MessageType.NEXT, s))),
err -> toObservable(ws.write(Message.from(message.getMessageId(), MessageType.ERROR, err.getMessage()))),
() -> toObservable(ws.write(Message.from(message.getMessageId(), MessageType.COMPLETE, "")))
)
.takeUntil(cancellationToken)
.finallyDo(() -> cancellationObservables.remove(message.getMessageId()));
//@formatter:on
} else if (message.getMessageType() == MessageType.DISPOSE) {
CancellationToken cancellationToken = cancellationObservables.get(message.getMessageId());
if (cancellationToken != null) {
cancellationToken.cancel();
}
return empty();
} else {
return error(new IllegalStateException("Unexpected prefix: " + message.getMessageType()));
}
}));
}

}
Loading