Skip to content

Commit e9869f2

Browse files
committed
receives ping sends pong
1 parent 659eaa1 commit e9869f2

File tree

4 files changed

+125
-69
lines changed

4 files changed

+125
-69
lines changed

src/main/java/io/reactivesocket/aeron/AeronServerDuplexConnection.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public Publisher<Frame> getInput() {
3939
public Publisher<Void> write(Publisher<Frame> o) {
4040
Observable<Void> req = RxReactiveStreams
4141
.toObservable(o)
42-
.map(frame -> {
42+
.flatMap(frame -> {
4343
final ByteBuffer byteBuffer = frame.getByteBuffer();
4444
final int length = byteBuffer.capacity() + BitUtil.SIZE_OF_INT;
4545
for (;;) {
@@ -57,12 +57,12 @@ public Publisher<Void> write(Publisher<Frame> o) {
5757

5858
break;
5959
} else if (Publication.NOT_CONNECTED == offer) {
60-
throw new RuntimeException("not connected");
60+
return Observable.error(new RuntimeException("not connected"));
6161
}
6262

6363
}
6464

65-
return null;
65+
return Observable.empty();
6666
});
6767

6868
return RxReactiveStreams.toPublisher(req);
@@ -73,7 +73,7 @@ void ackEstablishConnection(int ackSessionId) {
7373
final int sessionId = publication.sessionId();
7474
final BufferClaim bufferClaim = bufferClaims.get();
7575

76-
System.out.print("Acking establish connection for session id => " + ackSessionId);
76+
System.out.println("Acking establish connection for session id => " + ackSessionId);
7777

7878
for (;;) {
7979
final long current = System.nanoTime();

src/main/java/io/reactivesocket/aeron/ReactiveSocketAeronServer.java

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.reactivesocket.aeron;
22

33
import io.reactivesocket.Frame;
4-
import io.reactivesocket.ReactiveSocketServerProtocol;
4+
import io.reactivesocket.ReactiveSocket;
55
import io.reactivesocket.RequestHandler;
66
import rx.Scheduler;
77
import rx.schedulers.Schedulers;
@@ -25,8 +25,6 @@
2525

2626
public class ReactiveSocketAeronServer implements AutoCloseable {
2727

28-
private final ReactiveSocketServerProtocol rsServerProtocol;
29-
3028
private final Aeron aeron;
3129

3230
private final int port;
@@ -39,9 +37,34 @@ public class ReactiveSocketAeronServer implements AutoCloseable {
3937

4038
private volatile boolean running = true;
4139

40+
private final RequestHandler requestHandler;
41+
42+
private static final org.reactivestreams.Subscriber<Void> PROTOCOL_SUBSCRIBER = new org.reactivestreams.Subscriber<Void>() {
43+
@Override
44+
public void onSubscribe(org.reactivestreams.Subscription s) {
45+
s.request(Long.MAX_VALUE);
46+
}
47+
48+
@Override
49+
public void onNext(Void t) {
50+
51+
}
52+
53+
@Override
54+
public void onError(Throwable t) {
55+
t.printStackTrace();
56+
}
57+
58+
@Override
59+
public void onComplete() {
60+
61+
}
62+
};
63+
4264
private ReactiveSocketAeronServer(int port, RequestHandler requestHandler) {
4365
this.port = port;
4466
this.connections = new Int2ObjectHashMap<>();
67+
this.requestHandler = requestHandler;
4568

4669
final Aeron.Context ctx = new Aeron.Context();
4770
ctx.newImageHandler(this::newImageHandler);
@@ -56,7 +79,6 @@ private ReactiveSocketAeronServer(int port, RequestHandler requestHandler) {
5679

5780
poll(fragmentAssembler);
5881

59-
rsServerProtocol = ReactiveSocketServerProtocol.create(requestHandler);
6082
}
6183

6284
public static ReactiveSocketAeronServer create(int port, RequestHandler requestHandler) {
@@ -88,8 +110,8 @@ void fragmentHandler(DirectBuffer buffer, int offset, int length, Header header)
88110

89111
if (connection != null) {
90112
final PublishSubject<Frame> subject = connection.getSubject();
91-
ByteBuffer bytes = ByteBuffer.allocate(buffer.capacity());
92-
buffer.getBytes(BitUtil.SIZE_OF_INT + offset, bytes, buffer.capacity());
113+
ByteBuffer bytes = ByteBuffer.allocate(length);
114+
buffer.getBytes(BitUtil.SIZE_OF_INT + offset, bytes, length);
93115
final Frame frame = Frame.from(bytes);
94116
subject.onNext(frame);
95117
}
@@ -122,17 +144,18 @@ void newImageHandler(Image image, String channel, int streamId, int sessionId, l
122144
return new AeronServerDuplexConnection(publication);
123145
});
124146
System.out.println("Accepting ReactiveSocket connection");
125-
rsServerProtocol.acceptConnection(connection);
147+
ReactiveSocket socket = ReactiveSocket.accept(connection, requestHandler);
148+
socket.responderPublisher().subscribe(PROTOCOL_SUBSCRIBER);
126149
} else {
127150
System.out.println("Unsupported stream id " + streamId);
128151
}
129152
}
130153

131-
132154
@Override
133155
public void close() throws IOException {
134156
running = false;
135157
worker.unsubscribe();
136158
aeron.close();
137159
}
160+
138161
}

src/main/java/io/reactivesocket/aeron/ReactivesocketAeronClient.java

Lines changed: 61 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import io.reactivesocket.DuplexConnection;
44
import io.reactivesocket.Frame;
5-
import io.reactivesocket.ReactiveSocketClientProtocol;
5+
import io.reactivesocket.ReactiveSocket;
66
import org.reactivestreams.Publisher;
77
import rx.Observable;
88
import rx.RxReactiveStreams;
@@ -39,10 +39,12 @@ public class ReactivesocketAeronClient {
3939

4040
private static final Int2ObjectHashMap<CountDownLatch> establishConnectionLatches = new Int2ObjectHashMap<>();
4141

42-
private final ReactiveSocketClientProtocol rsClientProtocol;
42+
private ReactiveSocket rsClientProtocol;
4343

4444
private final Aeron aeron;
4545

46+
private final Publication publication;
47+
4648
private volatile boolean running = true;
4749

4850
private final int port;
@@ -57,12 +59,8 @@ private ReactivesocketAeronClient(String host, int port) {
5759

5860
System.out.println("Creating a publication to channel => " + channel);
5961

60-
final Publication publication = aeron.addPublication(channel, SERVER_STREAM_ID);
61-
62+
publication = aeron.addPublication(channel, SERVER_STREAM_ID);
6263
final int sessionId = publication.sessionId();
63-
64-
subjects.computeIfAbsent(sessionId, (_p) -> PublishSubject.create());
65-
6664
subscriptions.computeIfAbsent(port, (_p) -> {
6765
Subscription subscription = aeron.addSubscription(channel, CLIENT_STREAM_ID);
6866

@@ -75,44 +73,6 @@ private ReactivesocketAeronClient(String host, int port) {
7573

7674
establishConnection(publication, sessionId);
7775

78-
this.rsClientProtocol =
79-
ReactiveSocketClientProtocol.create(new DuplexConnection() {
80-
81-
public Publisher<Frame> getInput() {
82-
PublishSubject publishSubject = subjects.get(sessionId);
83-
return RxReactiveStreams.toPublisher(publishSubject);
84-
}
85-
86-
@Override
87-
public Publisher<Void> write(Publisher<Frame> o) {
88-
Observable<Void> req = RxReactiveStreams
89-
.toObservable(o)
90-
.map(frame -> {
91-
final ByteBuffer frameBuffer = frame.getByteBuffer();
92-
final int frameBufferLength = frameBuffer.capacity();
93-
final UnsafeBuffer buffer = buffers.get();
94-
final byte[] bytes = new byte[frameBufferLength + BitUtil.SIZE_OF_INT];
95-
96-
buffer.wrap(bytes);
97-
buffer.putInt(0, MessageType.FRAME.getEncodedType());
98-
buffer.putBytes(BitUtil.SIZE_OF_INT, frameBuffer, frameBufferLength);
99-
100-
for (;;) {
101-
final long offer = publication.offer(buffer);
102-
103-
if (offer >= 0) {
104-
break;
105-
} else if (Publication.NOT_CONNECTED == offer) {
106-
throw new RuntimeException("not connected");
107-
}
108-
}
109-
110-
return null;
111-
});
112-
113-
return RxReactiveStreams.toPublisher(req);
114-
}
115-
});
11676
}
11777

11878
public static ReactivesocketAeronClient create(String host, int port) {
@@ -128,15 +88,58 @@ void fragmentHandler(DirectBuffer buffer, int offset, int length, Header header)
12888
MessageType messageType = MessageType.from(messageTypeInt);
12989
if (messageType == MessageType.FRAME) {
13090
final PublishSubject<Frame> subject = subjects.get(header.sessionId());
131-
ByteBuffer bytes = ByteBuffer.allocate(buffer.capacity());
132-
buffer.getBytes(BitUtil.SIZE_OF_INT, bytes, buffer.capacity());
91+
ByteBuffer bytes = ByteBuffer.allocate(length);
92+
buffer.getBytes(BitUtil.SIZE_OF_INT + offset, bytes, length);
13393
final Frame frame = Frame.from(bytes);
13494
subject.onNext(frame);
13595
} else if (messageType == MessageType.ESTABLISH_CONNECTION_RESPONSE) {
13696
int ackSessionId = buffer.getInt(offset + BitUtil.SIZE_OF_INT);
13797
System.out.println(String.format("Received establish connection ack for session id => %d", ackSessionId));
98+
99+
subjects.computeIfAbsent(header.sessionId(), (_p) -> PublishSubject.create());
100+
101+
this.rsClientProtocol =
102+
ReactiveSocket.connect(new DuplexConnection() {
103+
104+
public Publisher<Frame> getInput() {
105+
PublishSubject publishSubject = subjects.get(header.sessionId());
106+
return RxReactiveStreams.toPublisher(publishSubject);
107+
}
108+
109+
@Override
110+
public Publisher<Void> write(Publisher<Frame> o) {
111+
Observable<Void> req = RxReactiveStreams
112+
.toObservable(o)
113+
.flatMap(frame -> {
114+
final ByteBuffer frameBuffer = frame.getByteBuffer();
115+
final int frameBufferLength = frameBuffer.capacity();
116+
final UnsafeBuffer buffer = buffers.get();
117+
final byte[] bytes = new byte[frameBufferLength + BitUtil.SIZE_OF_INT];
118+
119+
buffer.wrap(bytes);
120+
buffer.putInt(0, MessageType.FRAME.getEncodedType());
121+
buffer.putBytes(BitUtil.SIZE_OF_INT, frameBuffer, frameBufferLength);
122+
123+
for (; ; ) {
124+
final long offer = publication.offer(buffer);
125+
126+
if (offer >= 0) {
127+
break;
128+
} else if (Publication.NOT_CONNECTED == offer) {
129+
Observable.error(new RuntimeException("not connected"));
130+
}
131+
}
132+
133+
return Observable.empty();
134+
});
135+
136+
return RxReactiveStreams.toPublisher(req);
137+
}
138+
});
139+
138140
CountDownLatch latch = establishConnectionLatches.get(ackSessionId);
139141
latch.countDown();
142+
140143
} else {
141144
System.out.println("Unknow message type => " + messageTypeInt);
142145
}
@@ -189,20 +192,23 @@ void establishConnection(final Publication publication, final int sessionId) {
189192

190193
}
191194

192-
public Publisher<String> requestResponse(String payload) {
193-
return rsClientProtocol.requestResponse(payload);
195+
public Publisher<String> requestResponse(String data, String metadata) {
196+
return rsClientProtocol.requestResponse(data, metadata);
194197
}
195198

196-
public Publisher<String> requestStream(String payload) {
197-
return rsClientProtocol.requestStream(payload);
199+
public Publisher<Void> fireAndForget(String data, String metadata) {
200+
return rsClientProtocol.fireAndForget(data, metadata);
198201
}
199202

200-
public Publisher<Void> fireAndForget(String payload) {
201-
return rsClientProtocol.fireAndForget(payload);
203+
public Publisher<String> requestStream(String data, String metadata) {
204+
return rsClientProtocol.requestStream(data, metadata);
202205
}
203206

204-
public Publisher<String> requestSubscription(String payload) {
205-
return rsClientProtocol.requestSubscription(payload);
207+
public Publisher<String> requestSubscription(String data, String metadata) {
208+
return rsClientProtocol.requestSubscription(data, metadata);
206209
}
207210

211+
public Publisher<Void> responderPublisher() {
212+
return rsClientProtocol.responderPublisher();
213+
}
208214
}

src/test/java/io/reactivesocket/aeron/ReactiveSocketAeronTest.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33
import io.reactivesocket.RequestHandler;
44
import org.junit.Test;
55
import org.reactivestreams.Publisher;
6+
import org.reactivestreams.Subscriber;
7+
import org.reactivestreams.Subscription;
68
import rx.Observable;
79
import rx.RxReactiveStreams;
810
import uk.co.real_logic.aeron.driver.MediaDriver;
911

12+
import java.util.concurrent.CountDownLatch;
13+
1014
/**
1115
* Created by rroeser on 8/14/15.
1216
*/
@@ -45,10 +49,33 @@ public Publisher<Void> handleFireAndForget(String request) {
4549
}
4650
});
4751

52+
CountDownLatch latch = new CountDownLatch(1);
53+
4854
ReactivesocketAeronClient client = ReactivesocketAeronClient.create("localhost");
49-
//Publisher<String> ping = client.requestResponse("ping");
50-
//RxReactiveStreams.toObservable(ping).doOnError(Throwable::printStackTrace).forEach(a -> System.out.println("pong from the server => " + a));
55+
client.requestResponse("ping", "ping metadata").subscribe(new Subscriber<String>() {
56+
@Override
57+
public void onSubscribe(Subscription s) {
58+
s.request(Long.MAX_VALUE);
59+
System.out.println("here we go");
60+
}
61+
62+
@Override
63+
public void onNext(String s) {
64+
System.out.println(s);
65+
}
66+
67+
@Override
68+
public void onError(Throwable t) {
69+
t.printStackTrace();
70+
}
71+
72+
@Override
73+
public void onComplete() {
74+
latch.countDown();
75+
}
76+
});
5177

78+
latch.await();
5279
}
5380

5481

0 commit comments

Comments
 (0)