Skip to content

Commit b83de2e

Browse files
committed
added impl that lets you run reactivesocket client/server in the same JVM
1 parent cf65dd6 commit b83de2e

File tree

9 files changed

+623
-1
lines changed

9 files changed

+623
-1
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package io.reactivesocket.local;
2+
3+
import io.reactivesocket.DuplexConnection;
4+
import io.reactivesocket.Frame;
5+
import io.reactivesocket.rx.Completable;
6+
import io.reactivesocket.rx.Observable;
7+
import io.reactivesocket.rx.Observer;
8+
import org.reactivestreams.Publisher;
9+
import org.reactivestreams.Subscriber;
10+
import org.reactivestreams.Subscription;
11+
12+
import java.io.IOException;
13+
import java.util.concurrent.CopyOnWriteArrayList;
14+
15+
class LocalClientDuplexConnection implements DuplexConnection {
16+
private final String name;
17+
18+
private final CopyOnWriteArrayList<Observer<Frame>> subjects;
19+
20+
public LocalClientDuplexConnection(String name) {
21+
this.name = name;
22+
this.subjects = new CopyOnWriteArrayList<>();
23+
}
24+
25+
@Override
26+
public Observable<Frame> getInput() {
27+
return o -> {
28+
o.onSubscribe(() -> subjects.removeIf(s -> s == o));
29+
subjects.add(o);
30+
};
31+
}
32+
33+
@Override
34+
public void addOutput(Publisher<Frame> o, Completable callback) {
35+
36+
o
37+
.subscribe(new Subscriber<Frame>() {
38+
39+
@Override
40+
public void onSubscribe(Subscription s) {
41+
s.request(Long.MAX_VALUE);
42+
}
43+
44+
@Override
45+
public void onNext(Frame frame) {
46+
try {
47+
LocalReactiveSocketManager
48+
.getInstance()
49+
.getServerConnection(name)
50+
.write(frame);
51+
} catch (Throwable t) {
52+
onError(t);
53+
}
54+
}
55+
56+
@Override
57+
public void onError(Throwable t) {
58+
callback.error(t);
59+
}
60+
61+
@Override
62+
public void onComplete() {
63+
callback.success();
64+
}
65+
});
66+
}
67+
68+
void write(Frame frame) {
69+
subjects
70+
.forEach(o -> o.onNext(frame));
71+
}
72+
73+
@Override
74+
public void close() throws IOException {
75+
LocalReactiveSocketManager
76+
.getInstance()
77+
.removeClientConnection(name);
78+
79+
}
80+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package io.reactivesocket.local;
2+
3+
import io.reactivesocket.ConnectionSetupPayload;
4+
import io.reactivesocket.DefaultReactiveSocket;
5+
import io.reactivesocket.ReactiveSocket;
6+
import io.reactivesocket.ReactiveSocketFactory;
7+
import io.reactivesocket.internal.rx.EmptySubscription;
8+
import org.reactivestreams.Publisher;
9+
10+
public class LocalClientReactiveSocketFactory implements ReactiveSocketFactory<LocalClientReactiveSocketFactory.Config, ReactiveSocket> {
11+
public static final LocalClientReactiveSocketFactory INSTANCE = new LocalClientReactiveSocketFactory();
12+
13+
private LocalClientReactiveSocketFactory() {}
14+
15+
@Override
16+
public Publisher<ReactiveSocket> call(Config config) {
17+
return s -> {
18+
try {
19+
s.onSubscribe(EmptySubscription.INSTANCE);
20+
LocalClientDuplexConnection clientConnection = LocalReactiveSocketManager
21+
.getInstance()
22+
.getClientConnection(config.getName());
23+
ReactiveSocket reactiveSocket = DefaultReactiveSocket
24+
.fromClientConnection(clientConnection, ConnectionSetupPayload.create(config.getMetadataMimeType(), config.getDataMimeType()));
25+
26+
reactiveSocket.startAndWait();
27+
28+
s.onNext(reactiveSocket);
29+
s.onComplete();
30+
} catch (Throwable t) {
31+
s.onError(t);
32+
}
33+
};
34+
}
35+
36+
public static class Config {
37+
final String name;
38+
final String metadataMimeType;
39+
final String dataMimeType;
40+
41+
public Config(String name, String metadataMimeType, String dataMimeType) {
42+
this.name = name;
43+
this.metadataMimeType = metadataMimeType;
44+
this.dataMimeType = dataMimeType;
45+
}
46+
47+
public String getName() {
48+
return name;
49+
}
50+
51+
public String getMetadataMimeType() {
52+
return metadataMimeType;
53+
}
54+
55+
public String getDataMimeType() {
56+
return dataMimeType;
57+
}
58+
}
59+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package io.reactivesocket.local;
2+
3+
import java.util.concurrent.ConcurrentHashMap;
4+
5+
/**
6+
* Created by rroeser on 4/2/16.
7+
*/
8+
class LocalReactiveSocketManager {
9+
private static final LocalReactiveSocketManager INSTANCE = new LocalReactiveSocketManager();
10+
11+
private final ConcurrentHashMap<String, LocalServerDuplexConection> serverConnections;
12+
private final ConcurrentHashMap<String, LocalClientDuplexConnection> clientConnections;
13+
14+
private LocalReactiveSocketManager() {
15+
serverConnections = new ConcurrentHashMap<>();
16+
clientConnections = new ConcurrentHashMap<>();
17+
}
18+
19+
public static LocalReactiveSocketManager getInstance() {
20+
return INSTANCE;
21+
}
22+
23+
public LocalClientDuplexConnection getClientConnection(String name) {
24+
return clientConnections.computeIfAbsent(name, LocalClientDuplexConnection::new);
25+
}
26+
27+
public void removeClientConnection(String name) {
28+
clientConnections.remove(name);
29+
}
30+
31+
public LocalServerDuplexConection getServerConnection(String name) {
32+
return serverConnections.computeIfAbsent(name, LocalServerDuplexConection::new);
33+
}
34+
35+
public void removeServerDuplexConnection(String name) {
36+
serverConnections.remove(name);
37+
}
38+
39+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package io.reactivesocket.local;
2+
3+
import io.reactivesocket.DuplexConnection;
4+
import io.reactivesocket.Frame;
5+
import io.reactivesocket.rx.Completable;
6+
import io.reactivesocket.rx.Observable;
7+
import io.reactivesocket.rx.Observer;
8+
import org.reactivestreams.Publisher;
9+
import org.reactivestreams.Subscriber;
10+
import org.reactivestreams.Subscription;
11+
12+
import java.io.IOException;
13+
import java.util.concurrent.CopyOnWriteArrayList;
14+
15+
class LocalServerDuplexConection implements DuplexConnection {
16+
private final String name;
17+
18+
private final CopyOnWriteArrayList<Observer<Frame>> subjects;
19+
20+
public LocalServerDuplexConection(String name) {
21+
this.name = name;
22+
this.subjects = new CopyOnWriteArrayList<>();
23+
}
24+
25+
@Override
26+
public Observable<Frame> getInput() {
27+
return o -> {
28+
o.onSubscribe(() -> subjects.removeIf(s -> s == o));
29+
subjects.add(o);
30+
};
31+
}
32+
33+
@Override
34+
public void addOutput(Publisher<Frame> o, Completable callback) {
35+
o
36+
.subscribe(new Subscriber<Frame>() {
37+
38+
@Override
39+
public void onSubscribe(Subscription s) {
40+
s.request(Long.MAX_VALUE);
41+
}
42+
43+
@Override
44+
public void onNext(Frame frame) {
45+
try {
46+
LocalReactiveSocketManager
47+
.getInstance()
48+
.getClientConnection(name)
49+
.write(frame);
50+
} catch (Throwable t) {
51+
onError(t);
52+
}
53+
}
54+
55+
@Override
56+
public void onError(Throwable t) {
57+
callback.error(t);
58+
}
59+
60+
@Override
61+
public void onComplete() {
62+
callback.success();
63+
}
64+
});
65+
}
66+
67+
void write(Frame frame) {
68+
subjects
69+
.forEach(o -> o.onNext(frame));
70+
}
71+
72+
@Override
73+
public void close() throws IOException {
74+
LocalReactiveSocketManager
75+
.getInstance()
76+
.removeServerDuplexConnection(name);
77+
78+
}
79+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package io.reactivesocket.local;
2+
3+
import io.reactivesocket.ConnectionSetupHandler;
4+
import io.reactivesocket.DefaultReactiveSocket;
5+
import io.reactivesocket.ReactiveSocket;
6+
import io.reactivesocket.ReactiveSocketFactory;
7+
import io.reactivesocket.internal.rx.EmptySubscription;
8+
import org.reactivestreams.Publisher;
9+
10+
public class LocalServerReactiveSocketFactory implements ReactiveSocketFactory<LocalServerReactiveSocketFactory.Config, ReactiveSocket> {
11+
public static final LocalServerReactiveSocketFactory INSTANCE = new LocalServerReactiveSocketFactory();
12+
13+
private LocalServerReactiveSocketFactory() {}
14+
15+
@Override
16+
public Publisher<ReactiveSocket> call(Config config) {
17+
return s -> {
18+
try {
19+
s.onSubscribe(EmptySubscription.INSTANCE);
20+
LocalServerDuplexConection clientConnection = LocalReactiveSocketManager
21+
.getInstance()
22+
.getServerConnection(config.getName());
23+
ReactiveSocket reactiveSocket = DefaultReactiveSocket
24+
.fromServerConnection(clientConnection, config.getConnectionSetupHandler());
25+
26+
reactiveSocket.startAndWait();
27+
s.onNext(reactiveSocket);
28+
s.onComplete();
29+
} catch (Throwable t) {
30+
s.onError(t);
31+
}
32+
};
33+
}
34+
35+
public static class Config {
36+
final String name;
37+
final ConnectionSetupHandler connectionSetupHandler;
38+
39+
public Config(String name, ConnectionSetupHandler connectionSetupHandler) {
40+
this.name = name;
41+
this.connectionSetupHandler = connectionSetupHandler;
42+
}
43+
44+
public ConnectionSetupHandler getConnectionSetupHandler() {
45+
return connectionSetupHandler;
46+
}
47+
48+
public String getName() {
49+
return name;
50+
}
51+
}
52+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
/**
2+
* An implementation of ReactiveSocket that lets you run the client and server in the same local JVM. To create
3+
* a client use {@link io.reactivesocket.local.LocalClientReactiveSocketFactory} and to create a server use
4+
* {@link io.reactivesocket.local.LocalServerReactiveSocketFactory} factories classes.
5+
*/
6+
package io.reactivesocket.local;

0 commit comments

Comments
 (0)