Skip to content

Commit db575e4

Browse files
committed
Added AeronReactiveSocketFactory
1 parent a9aa871 commit db575e4

File tree

2 files changed

+127
-2
lines changed

2 files changed

+127
-2
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ subprojects {
1414
}
1515

1616
dependencies {
17-
compile 'io.reactivex:rxjava:1.1.1'
17+
compile 'io.reactivex:rxjava:1.1.2'
1818
compile 'io.reactivex:rxjava-reactive-streams:1.0.1'
19-
compile 'io.reactivesocket:reactivesocket:0.0.2'
19+
compile 'io.reactivesocket:reactivesocket:0.0.4'
2020
compile 'org.hdrhistogram:HdrHistogram:2.1.7'
2121
compile 'org.slf4j:slf4j-api:1.7.12'
2222
testCompile 'junit:junit:4.12'
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package io.reactivesocket.aeron.client;
2+
3+
import io.reactivesocket.ConnectionSetupPayload;
4+
import io.reactivesocket.ReactiveSocket;
5+
import io.reactivesocket.ReactiveSocketFactory;
6+
import io.reactivesocket.internal.rx.EmptySubscription;
7+
import io.reactivesocket.rx.Completable;
8+
import org.reactivestreams.Publisher;
9+
import org.reactivestreams.Subscriber;
10+
import org.reactivestreams.Subscription;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
import uk.co.real_logic.agrona.LangUtil;
14+
15+
import java.net.*;
16+
import java.util.Enumeration;
17+
import java.util.concurrent.CountDownLatch;
18+
import java.util.concurrent.TimeUnit;
19+
import java.util.function.Consumer;
20+
21+
/**
22+
* An implementation of {@link ReactiveSocketFactory} that creates Aeron ReactiveSockets.
23+
*/
24+
public class AeronReactiveSocketFactory implements ReactiveSocketFactory {
25+
private static final Logger logger = LoggerFactory.getLogger(AeronReactiveSocketFactory.class);
26+
27+
private final ConnectionSetupPayload connectionSetupPayload;
28+
private final Consumer<Throwable> errorStream;
29+
30+
public AeronReactiveSocketFactory(ConnectionSetupPayload connectionSetupPayload, Consumer<Throwable> errorStream) {
31+
this(getIPv4InetAddress().getHostAddress(), 39790, connectionSetupPayload, errorStream);
32+
}
33+
34+
public AeronReactiveSocketFactory(String host, int port, ConnectionSetupPayload connectionSetupPayload, Consumer<Throwable> errorStream) {
35+
this.connectionSetupPayload = connectionSetupPayload;
36+
this.errorStream = errorStream;
37+
38+
try {
39+
InetSocketAddress inetSocketAddress = new InetSocketAddress(host, port);
40+
logger.info("Listen to ReactiveSocket Aeron response on host {} port {}", host, port);
41+
AeronClientDuplexConnectionFactory.getInstance().addSocketAddressToHandleResponses(inetSocketAddress);
42+
} catch (Exception e) {
43+
logger.error(e.getMessage(), e);
44+
LangUtil.rethrowUnchecked(e);
45+
}
46+
}
47+
48+
@Override
49+
public Publisher<ReactiveSocket> call(SocketAddress address, long timeout, TimeUnit timeUnit) {
50+
Publisher<AeronClientDuplexConnection> aeronClientDuplexConnection
51+
= AeronClientDuplexConnectionFactory.getInstance().createAeronClientDuplexConnection(address);
52+
53+
return (Subscriber<? super ReactiveSocket> s) -> {
54+
s.onSubscribe(EmptySubscription.INSTANCE);
55+
aeronClientDuplexConnection
56+
.subscribe(new Subscriber<AeronClientDuplexConnection>() {
57+
58+
@Override
59+
public void onSubscribe(Subscription s) {
60+
s.request(1);
61+
}
62+
63+
@Override
64+
public void onNext(AeronClientDuplexConnection connection) {
65+
ReactiveSocket reactiveSocket = ReactiveSocket.fromClientConnection(connection, connectionSetupPayload, errorStream);
66+
CountDownLatch latch = new CountDownLatch(1);
67+
reactiveSocket.start(new Completable() {
68+
@Override
69+
public void success() {
70+
latch.countDown();
71+
s.onNext(reactiveSocket);
72+
s.onComplete();
73+
}
74+
75+
@Override
76+
public void error(Throwable e) {
77+
s.onError(e);
78+
}
79+
});
80+
81+
try {
82+
latch.await(timeout, timeUnit);
83+
} catch (InterruptedException e) {
84+
logger.error(e.getMessage(), e);
85+
s.onError(e);
86+
}
87+
}
88+
89+
@Override
90+
public void onError(Throwable t) {
91+
s.onError(t);
92+
}
93+
94+
@Override
95+
public void onComplete() {
96+
}
97+
});
98+
};
99+
}
100+
101+
private static InetAddress getIPv4InetAddress() {
102+
InetAddress iaddress = null;
103+
try {
104+
String os = System.getProperty("os.name").toLowerCase();
105+
106+
if (os.contains("nix") || os.contains("nux")) {
107+
NetworkInterface ni = NetworkInterface.getByName("eth0");
108+
109+
Enumeration<InetAddress> ias = ni.getInetAddresses();
110+
111+
do {
112+
iaddress = ias.nextElement();
113+
} while (!(iaddress instanceof Inet4Address));
114+
115+
}
116+
117+
iaddress = InetAddress.getLocalHost(); // for Windows and OS X it should work well
118+
} catch (Exception e) {
119+
logger.error(e.getMessage(), e);
120+
LangUtil.rethrowUnchecked(e);
121+
}
122+
123+
return iaddress;
124+
}
125+
}

0 commit comments

Comments
 (0)