Skip to content

Commit 0804807

Browse files
committed
Added ReactiveSocketFactory implementations for netty and jsr-356
1 parent db575e4 commit 0804807

File tree

12 files changed

+322
-150
lines changed

12 files changed

+322
-150
lines changed

reactivesocket-aeron/src/main/java/io/reactivesocket/aeron/client/AeronReactiveSocketFactory.java

Lines changed: 37 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,18 @@
33
import io.reactivesocket.ConnectionSetupPayload;
44
import io.reactivesocket.ReactiveSocket;
55
import io.reactivesocket.ReactiveSocketFactory;
6-
import io.reactivesocket.internal.rx.EmptySubscription;
76
import io.reactivesocket.rx.Completable;
87
import org.reactivestreams.Publisher;
98
import org.reactivestreams.Subscriber;
109
import org.reactivestreams.Subscription;
1110
import org.slf4j.Logger;
1211
import org.slf4j.LoggerFactory;
12+
import rx.Observable;
13+
import rx.RxReactiveStreams;
1314
import uk.co.real_logic.agrona.LangUtil;
1415

1516
import java.net.*;
1617
import java.util.Enumeration;
17-
import java.util.concurrent.CountDownLatch;
1818
import java.util.concurrent.TimeUnit;
1919
import java.util.function.Consumer;
2020

@@ -47,55 +47,45 @@ public AeronReactiveSocketFactory(String host, int port, ConnectionSetupPayload
4747

4848
@Override
4949
public Publisher<ReactiveSocket> call(SocketAddress address, long timeout, TimeUnit timeUnit) {
50-
Publisher<AeronClientDuplexConnection> aeronClientDuplexConnection
50+
Publisher<AeronClientDuplexConnection> connection
5151
= AeronClientDuplexConnectionFactory.getInstance().createAeronClientDuplexConnection(address);
5252

53-
return (Subscriber<? super ReactiveSocket> s) -> {
54-
s.onSubscribe(EmptySubscription.INSTANCE);
55-
aeronClientDuplexConnection
56-
.subscribe(new Subscriber<AeronClientDuplexConnection>() {
57-
58-
@Override
59-
public void onSubscribe(Subscription s) {
60-
s.request(1);
61-
}
62-
63-
@Override
64-
public void onNext(AeronClientDuplexConnection connection) {
65-
ReactiveSocket reactiveSocket = ReactiveSocket.fromClientConnection(connection, connectionSetupPayload, errorStream);
66-
CountDownLatch latch = new CountDownLatch(1);
67-
reactiveSocket.start(new Completable() {
68-
@Override
69-
public void success() {
70-
latch.countDown();
71-
s.onNext(reactiveSocket);
72-
s.onComplete();
73-
}
74-
75-
@Override
76-
public void error(Throwable e) {
77-
s.onError(e);
78-
}
79-
});
80-
81-
try {
82-
latch.await(timeout, timeUnit);
83-
} catch (InterruptedException e) {
84-
logger.error(e.getMessage(), e);
53+
Observable<ReactiveSocket> result = Observable.create(s ->
54+
connection.subscribe(new Subscriber<AeronClientDuplexConnection>() {
55+
@Override
56+
public void onSubscribe(Subscription s) {
57+
s.request(1);
58+
}
59+
60+
@Override
61+
public void onNext(AeronClientDuplexConnection connection) {
62+
ReactiveSocket reactiveSocket = ReactiveSocket.fromClientConnection(connection, connectionSetupPayload, errorStream);
63+
reactiveSocket.start(new Completable() {
64+
@Override
65+
public void success() {
66+
s.onNext(reactiveSocket);
67+
s.onCompleted();
68+
}
69+
70+
@Override
71+
public void error(Throwable e) {
8572
s.onError(e);
8673
}
87-
}
88-
89-
@Override
90-
public void onError(Throwable t) {
91-
s.onError(t);
92-
}
93-
94-
@Override
95-
public void onComplete() {
96-
}
97-
});
98-
};
74+
});
75+
}
76+
77+
@Override
78+
public void onError(Throwable t) {
79+
s.onError(t);
80+
}
81+
82+
@Override
83+
public void onComplete() {
84+
}
85+
})
86+
);
87+
88+
return RxReactiveStreams.toPublisher(result.timeout(timeout, timeUnit));
9989
}
10090

10191
private static InetAddress getIPv4InetAddress() {

reactivesocket-jsr-356/build.gradle

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,5 @@
1-
buildscript {
2-
repositories { maven { url 'http://repo.spring.io/plugins-release' } }
3-
dependencies { classpath 'org.springframework.build.gradle:propdeps-plugin:0.0.7' }
4-
}
5-
6-
apply plugin: 'propdeps'
7-
81
dependencies {
9-
provided 'javax.websocket:javax.websocket-api:1.1'
10-
testCompile 'org.glassfish.tyrus:tyrus-client:1.12'
2+
compile 'org.glassfish.tyrus:tyrus-client:1.12'
113
testCompile 'org.glassfish.tyrus:tyrus-server:1.12'
124
testCompile 'org.glassfish.tyrus:tyrus-container-grizzly-server:1.12'
135
}

reactivesocket-jsr-356/src/main/java/io/reactivesocket/javax/websocket/WebSocketDuplexConnection.java

Lines changed: 40 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -19,76 +19,64 @@
1919
import io.reactivesocket.Frame;
2020
import io.reactivesocket.rx.Completable;
2121
import io.reactivesocket.rx.Observable;
22-
import io.reactivesocket.rx.Observer;
2322
import org.reactivestreams.Publisher;
2423
import rx.RxReactiveStreams;
25-
import rx.Subscriber;
24+
import rx.Subscription;
25+
import rx.subscriptions.BooleanSubscription;
2626

27-
import javax.websocket.*;
27+
import javax.websocket.Session;
2828
import java.io.IOException;
29-
import java.util.concurrent.CopyOnWriteArrayList;
3029

3130
public class WebSocketDuplexConnection implements DuplexConnection {
32-
private Session session;
31+
private final Session session;
32+
private final rx.Observable<Frame> input;
3333

34-
private CopyOnWriteArrayList<Observer<Frame>> observers;
35-
36-
private WebSocketDuplexConnection(Session session, rx.Observable<Frame> input) {
34+
public WebSocketDuplexConnection(Session session, rx.Observable<Frame> input) {
3735
this.session = session;
38-
this.observers = new CopyOnWriteArrayList<>();
39-
input.subscribe(new Subscriber<Frame>() {
40-
@Override
41-
public void onNext(Frame frame) {
42-
observers.forEach(o -> o.onNext(frame));
43-
}
44-
45-
@Override
46-
public void onError(Throwable e) {
47-
observers.forEach(o -> o.onError(e));
48-
}
49-
50-
@Override
51-
public void onCompleted() {
52-
observers.forEach(Observer::onComplete);
53-
}
54-
});
55-
}
56-
57-
public static WebSocketDuplexConnection create(Session session, rx.Observable<Frame> input) {
58-
return new WebSocketDuplexConnection(session, input);
36+
this.input = input;
5937
}
6038

6139
@Override
6240
public Observable<Frame> getInput() {
63-
return new Observable<Frame>() {
64-
@Override
65-
public void subscribe(Observer<Frame> o) {
66-
observers.add(o);
67-
68-
o.onSubscribe(() ->
69-
observers.removeIf(s -> s == o)
70-
);
71-
}
41+
return o -> {
42+
Subscription subscription = input.subscribe(o::onNext, o::onError, o::onComplete);
43+
o.onSubscribe(subscription::unsubscribe);
7244
};
7345
}
7446

7547
@Override
7648
public void addOutput(Publisher<Frame> o, Completable callback) {
77-
rx.Observable<Void> sent = RxReactiveStreams.toObservable(o).concatMap(frame ->
78-
rx.Observable.create(subscriber -> {
79-
session.getAsyncRemote().sendBinary(frame.getByteBuffer(), result -> {
80-
if (result.isOK()) {
81-
subscriber.onCompleted();
82-
} else {
83-
subscriber.onError(result.getException());
84-
}
85-
});
86-
})
87-
);
49+
rx.Completable sent = rx.Completable.concat(RxReactiveStreams.toObservable(o).map(frame ->
50+
rx.Completable.create(s -> {
51+
BooleanSubscription bs = new BooleanSubscription();
52+
s.onSubscribe(bs);
53+
session.getAsyncRemote().sendBinary(frame.getByteBuffer(), result -> {
54+
if (!bs.isUnsubscribed()) {
55+
if (result.isOK()) {
56+
s.onCompleted();
57+
} else {
58+
s.onError(result.getException());
59+
}
60+
}
61+
});
62+
})
63+
));
64+
65+
sent.subscribe(new rx.Completable.CompletableSubscriber() {
66+
@Override
67+
public void onCompleted() {
68+
callback.success();
69+
}
8870

89-
sent.doOnCompleted(callback::success)
90-
.doOnError(callback::error)
91-
.subscribe();
71+
@Override
72+
public void onError(Throwable e) {
73+
callback.error(e);
74+
}
75+
76+
@Override
77+
public void onSubscribe(Subscription s) {
78+
}
79+
});
9280
}
9381

9482
@Override

reactivesocket-jsr-356/src/main/java/io/reactivesocket/javax/websocket/client/ReactiveSocketWebSocketClient.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,68 @@
1616
package io.reactivesocket.javax.websocket.client;
1717

1818
import io.reactivesocket.Frame;
19+
import io.reactivesocket.javax.websocket.WebSocketDuplexConnection;
20+
import org.glassfish.tyrus.client.ClientManager;
21+
import org.glassfish.tyrus.client.ClientProperties;
22+
import org.reactivestreams.Publisher;
23+
import org.reactivestreams.Subscriber;
1924
import rx.Observable;
2025
import rx.subjects.PublishSubject;
2126

2227
import javax.websocket.*;
28+
import java.net.InetSocketAddress;
29+
import java.net.SocketAddress;
30+
import java.net.URI;
31+
import java.net.URISyntaxException;
2332
import java.nio.ByteBuffer;
2433

2534
public class ReactiveSocketWebSocketClient extends Endpoint {
26-
private final PublishSubject<Frame> input;
35+
private final PublishSubject<Frame> input = PublishSubject.create();
36+
private final Subscriber<? super WebSocketDuplexConnection> subscriber;
2737

28-
public ReactiveSocketWebSocketClient() {
29-
this.input = PublishSubject.create();
38+
public ReactiveSocketWebSocketClient(Subscriber<? super WebSocketDuplexConnection> subscriber) {
39+
this.subscriber = subscriber;
3040
}
3141

3242
public Observable<Frame> getInput() {
3343
return input;
3444
}
3545

46+
public static Publisher<WebSocketDuplexConnection> create(SocketAddress socketAddress, String path, ClientManager clientManager) {
47+
if (socketAddress instanceof InetSocketAddress) {
48+
InetSocketAddress address = (InetSocketAddress)socketAddress;
49+
try {
50+
return create(new URI("ws", null, address.getHostName(), address.getPort(), path, null, null), clientManager);
51+
} catch (URISyntaxException e) {
52+
throw new IllegalArgumentException(e.getMessage(), e);
53+
}
54+
} else {
55+
throw new IllegalArgumentException("unknown socket address type => " + socketAddress.getClass());
56+
}
57+
}
58+
59+
public static Publisher<WebSocketDuplexConnection> create(URI uri, ClientManager clientManager) {
60+
return s -> {
61+
try {
62+
clientManager.getProperties().put(ClientProperties.RECONNECT_HANDLER, new ClientManager.ReconnectHandler() {
63+
@Override
64+
public boolean onConnectFailure(Exception exception) {
65+
s.onError(exception);
66+
return false;
67+
}
68+
});
69+
ReactiveSocketWebSocketClient endpoint = new ReactiveSocketWebSocketClient(s);
70+
clientManager.asyncConnectToServer(endpoint, null, uri);
71+
} catch (DeploymentException e) {
72+
s.onError(e);
73+
}
74+
};
75+
}
76+
3677
@Override
3778
public void onOpen(Session session, EndpointConfig config) {
79+
subscriber.onNext(new WebSocketDuplexConnection(session, input));
80+
subscriber.onComplete();
3881
session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
3982
@Override
4083
public void onMessage(ByteBuffer message) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package io.reactivesocket.javax.websocket.client;
2+
3+
import io.reactivesocket.ConnectionSetupPayload;
4+
import io.reactivesocket.ReactiveSocket;
5+
import io.reactivesocket.ReactiveSocketFactory;
6+
import io.reactivesocket.javax.websocket.WebSocketDuplexConnection;
7+
import io.reactivesocket.rx.Completable;
8+
import org.glassfish.tyrus.client.ClientManager;
9+
import org.reactivestreams.Publisher;
10+
import org.reactivestreams.Subscriber;
11+
import org.reactivestreams.Subscription;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
import rx.Observable;
15+
import rx.RxReactiveStreams;
16+
17+
import java.net.SocketAddress;
18+
import java.util.concurrent.TimeUnit;
19+
import java.util.function.Consumer;
20+
21+
/**
22+
* An implementation of {@link ReactiveSocketFactory} that creates JSR-356 WebSocket ReactiveSockets.
23+
*/
24+
public class WebSocketReactiveSocketFactory implements ReactiveSocketFactory {
25+
private static final Logger logger = LoggerFactory.getLogger(WebSocketReactiveSocketFactory.class);
26+
27+
private final ConnectionSetupPayload connectionSetupPayload;
28+
private final Consumer<Throwable> errorStream;
29+
private final String path;
30+
private final ClientManager clientManager;
31+
32+
public WebSocketReactiveSocketFactory(String path, ClientManager clientManager, ConnectionSetupPayload connectionSetupPayload, Consumer<Throwable> errorStream) {
33+
this.connectionSetupPayload = connectionSetupPayload;
34+
this.errorStream = errorStream;
35+
this.path = path;
36+
this.clientManager = clientManager;
37+
}
38+
39+
@Override
40+
public Publisher<ReactiveSocket> call(SocketAddress address, long timeout, TimeUnit timeUnit) {
41+
Publisher<WebSocketDuplexConnection> connection
42+
= ReactiveSocketWebSocketClient.create(address, path, clientManager);
43+
44+
Observable<ReactiveSocket> result = Observable.create(s ->
45+
connection.subscribe(new Subscriber<WebSocketDuplexConnection>() {
46+
@Override
47+
public void onSubscribe(Subscription s) {
48+
s.request(1);
49+
}
50+
51+
@Override
52+
public void onNext(WebSocketDuplexConnection connection) {
53+
ReactiveSocket reactiveSocket = ReactiveSocket.fromClientConnection(connection, connectionSetupPayload, errorStream);
54+
reactiveSocket.start(new Completable() {
55+
@Override
56+
public void success() {
57+
s.onNext(reactiveSocket);
58+
s.onCompleted();
59+
}
60+
61+
@Override
62+
public void error(Throwable e) {
63+
s.onError(e);
64+
}
65+
});
66+
}
67+
68+
@Override
69+
public void onError(Throwable t) {
70+
s.onError(t);
71+
}
72+
73+
@Override
74+
public void onComplete() {
75+
}
76+
})
77+
);
78+
79+
return RxReactiveStreams.toPublisher(result.timeout(timeout, timeUnit));
80+
}
81+
}

0 commit comments

Comments
 (0)