Skip to content

Commit 626c947

Browse files
Merge pull request #1 from ReactiveSocket/server-protocol
ServerProtocol with UnitTests
2 parents c3344d1 + 3356657 commit 626c947

File tree

9 files changed

+712
-1
lines changed

9 files changed

+712
-1
lines changed

build.gradle

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
buildscript {
2-
repositories {
2+
repositories {
33
jcenter()
44
}
55

@@ -12,6 +12,9 @@ apply plugin: 'reactivesocket-project'
1212
apply plugin: 'java'
1313

1414
dependencies {
15+
compile 'io.reactivex:rxjava:1.0.13'
16+
compile 'io.reactivex:rxjava-reactive-streams:1.0.1'
17+
compile 'org.reactivestreams:reactive-streams:1.0.0.final'
1518
testCompile 'junit:junit-dep:4.10'
1619
testCompile 'org.mockito:mockito-core:1.8.5'
1720
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket;
17+
18+
import rx.Observable;
19+
import rx.Subscriber;
20+
21+
/**
22+
* Intended to ONLY support a single Subscriber and Publisher for notification of a cancellation event, such as with takeUntil.
23+
*/
24+
/* package */ class CancellationToken extends Observable<Void> {
25+
26+
public static CancellationToken create() {
27+
return new CancellationToken(new State());
28+
}
29+
30+
private State state;
31+
32+
protected CancellationToken(State state) {
33+
super(s -> {
34+
synchronized (state) {
35+
if (state.cancelled) {
36+
s.onCompleted(); // always onComplete when cancelled
37+
} else {
38+
if (state.subscriber != null) {
39+
throw new IllegalStateException("Only 1 Subscriber permitted on a CancellationSubject");
40+
} else {
41+
state.subscriber = s;
42+
}
43+
}
44+
}
45+
});
46+
this.state = state;
47+
}
48+
49+
public final void cancel() {
50+
Subscriber<?> emitTo;
51+
synchronized (state) {
52+
state.cancelled = true;
53+
emitTo = state.subscriber;
54+
}
55+
if (emitTo != null) {
56+
emitTo.onCompleted();
57+
}
58+
}
59+
60+
public static class State {
61+
private Subscriber<?> subscriber;
62+
private boolean cancelled = false;
63+
}
64+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket;
17+
18+
import org.reactivestreams.Publisher;
19+
20+
public interface DuplexConnection {
21+
22+
Publisher<Message> getInput();
23+
24+
Publisher<Void> write(Message o);
25+
26+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket;
17+
18+
import java.nio.ByteBuffer;
19+
20+
public class Message {
21+
22+
private Message() {
23+
}
24+
25+
// not final so we can reuse this object
26+
private ByteBuffer b;
27+
private int messageId;
28+
private MessageType type;
29+
private String message;
30+
31+
public ByteBuffer getBytes() {
32+
return b;
33+
}
34+
35+
public String getMessage() {
36+
if (type == null) {
37+
decode();
38+
}
39+
return message;
40+
}
41+
42+
public int getMessageId() {
43+
if (type == null) {
44+
decode();
45+
}
46+
return messageId;
47+
}
48+
49+
public MessageType getMessageType() {
50+
if (type == null) {
51+
decode();
52+
}
53+
return type;
54+
}
55+
56+
/**
57+
* Mutates this Frame to contain the given ByteBuffer
58+
*
59+
* @param b
60+
*/
61+
public void wrap(ByteBuffer b) {
62+
this.messageId = -1;
63+
this.type = null;
64+
this.message = null;
65+
this.b = b;
66+
}
67+
68+
/**
69+
* Construct a new Frame from the given ByteBuffer
70+
*
71+
* @param b
72+
* @return
73+
*/
74+
public static Message from(ByteBuffer b) {
75+
Message f = new Message();
76+
f.b = b;
77+
return f;
78+
}
79+
80+
/**
81+
* Mutates this Frame to contain the given message.
82+
*
83+
* @param messageId
84+
* @param type
85+
* @param message
86+
*/
87+
public void wrap(int messageId, MessageType type, String message) {
88+
this.messageId = messageId;
89+
this.type = type;
90+
this.message = message;
91+
this.b = getBytes(messageId, type, message);
92+
}
93+
94+
/**
95+
* Construct a new Frame with the given message.
96+
*
97+
* @param messageId
98+
* @param type
99+
* @param message
100+
* @return
101+
*/
102+
public static Message from(int messageId, MessageType type, String message) {
103+
Message f = new Message();
104+
f.b = getBytes(messageId, type, message);
105+
f.messageId = messageId;
106+
f.type = type;
107+
f.message = message;
108+
return f;
109+
}
110+
111+
private static ByteBuffer getBytes(int messageId, MessageType type, String message) {
112+
// TODO replace with binary
113+
/**
114+
* This is NOT how we want it for real. Just representing the idea for discussion.
115+
*/
116+
String s = "[" + type.ordinal() + "]" + getIdString(messageId) + message;
117+
// TODO stop allocating ... use flywheels
118+
return ByteBuffer.wrap(s.getBytes());
119+
}
120+
121+
private static String getIdString(int id) {
122+
return "[" + id + "]|";
123+
}
124+
125+
private void decode() {
126+
// TODO replace with binary
127+
/**
128+
* This is NOT how we want it for real. Just representing the idea for discussion.
129+
*/
130+
String data = new String(b.array());
131+
int separator = data.indexOf('|');
132+
String prefix = data.substring(0, separator);
133+
this.type = MessageType.values[Integer.parseInt(prefix.substring(1, data.indexOf(']')))];
134+
this.messageId = Integer.parseInt(prefix.substring(prefix.lastIndexOf("[") + 1, prefix.length() - 1));
135+
this.message = data.substring(separator + 1, data.length());
136+
}
137+
138+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket;
17+
18+
enum MessageType {
19+
// DO NOT REORDER OR INSERT NEW ELEMENTS. THE ORDINAL IS PART OF THE PROTOCOL
20+
SUBSCRIBE_REQUEST_RESPONSE, SUBSCRIBE_STREAM, STREAM_REQUEST, DISPOSE, NEXT_COMPLETE, NEXT, ERROR, COMPLETE;
21+
public static MessageType[] values = MessageType.values(); // cached for performance
22+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket;
17+
18+
import static rx.Observable.*;
19+
import static rx.RxReactiveStreams.*;
20+
21+
import java.util.concurrent.ConcurrentHashMap;
22+
23+
import org.reactivestreams.Publisher;
24+
25+
import rx.functions.Func1;
26+
27+
public class ReactiveSocketServerProtocol {
28+
29+
private Func1<String, Publisher<String>> requestResponseHandler;
30+
private Func1<String, Publisher<String>> requestStreamHandler;
31+
32+
ReactiveSocketServerProtocol(
33+
Func1<String, Publisher<String>> requestResponseHandler,
34+
Func1<String, Publisher<String>> requestStreamHandler) {
35+
this.requestResponseHandler = requestResponseHandler;
36+
this.requestStreamHandler = requestStreamHandler;
37+
}
38+
39+
public Publisher<Void> acceptConnection(DuplexConnection ws) {
40+
/* state of cancellation subjects during connection */
41+
// TODO consider using the LongObjectHashMap from Agrona for perf improvement
42+
// TODO consider alternate to PublishSubject that assumes a single subscriber and is lighter
43+
final ConcurrentHashMap<Integer, CancellationToken> cancellationObservables = new ConcurrentHashMap<>();
44+
45+
return toPublisher(toObservable(ws.getInput()).flatMap(message -> {
46+
if (message.getMessageType() == MessageType.SUBSCRIBE_REQUEST_RESPONSE) {
47+
CancellationToken cancellationToken = CancellationToken.create();
48+
cancellationObservables.put(message.getMessageId(), cancellationToken);
49+
50+
return toObservable(requestResponseHandler.call(message.getMessage()))
51+
.single() // enforce that it is a request/response
52+
.map(v -> Message.from(message.getMessageId(), MessageType.NEXT_COMPLETE, v))
53+
.onErrorReturn(err -> Message.from(message.getMessageId(), MessageType.ERROR, err.getMessage()))
54+
.flatMap(payload -> toObservable(ws.write(payload)))
55+
.takeUntil(cancellationToken)
56+
.finallyDo(() -> cancellationObservables.remove(message.getMessageId()));
57+
} else if (message.getMessageType() == MessageType.SUBSCRIBE_STREAM) {
58+
CancellationToken cancellationToken = CancellationToken.create();
59+
cancellationObservables.put(message.getMessageId(), cancellationToken);
60+
61+
//@formatter:off
62+
return toObservable(requestStreamHandler.call(message.getMessage()))
63+
.flatMap(s -> toObservable(ws.write(Message.from(message.getMessageId(), MessageType.NEXT, s))),
64+
err -> toObservable(ws.write(Message.from(message.getMessageId(), MessageType.ERROR, err.getMessage()))),
65+
() -> toObservable(ws.write(Message.from(message.getMessageId(), MessageType.COMPLETE, "")))
66+
)
67+
.takeUntil(cancellationToken)
68+
.finallyDo(() -> cancellationObservables.remove(message.getMessageId()));
69+
//@formatter:on
70+
} else if (message.getMessageType() == MessageType.DISPOSE) {
71+
CancellationToken cancellationToken = cancellationObservables.get(message.getMessageId());
72+
if (cancellationToken != null) {
73+
cancellationToken.cancel();
74+
}
75+
return empty();
76+
} else {
77+
return error(new IllegalStateException("Unexpected prefix: " + message.getMessageType()));
78+
}
79+
}));
80+
}
81+
82+
}

0 commit comments

Comments
 (0)