Skip to content

Commit 713f009

Browse files
committed
added code to wait until a connection is established
1 parent d8f4bf9 commit 713f009

File tree

6 files changed

+251
-44
lines changed

6 files changed

+251
-44
lines changed

src/main/java/io/reactivesocket/aeron/AeronServerDuplexConnection.java

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,23 @@
88
import rx.subjects.PublishSubject;
99
import uk.co.real_logic.aeron.Publication;
1010
import uk.co.real_logic.aeron.logbuffer.BufferClaim;
11+
import uk.co.real_logic.agrona.BitUtil;
1112
import uk.co.real_logic.agrona.MutableDirectBuffer;
1213

1314
import java.nio.ByteBuffer;
15+
import java.util.concurrent.TimeUnit;
1416

15-
public class AeronServerDuplexConnection implements DuplexConnection {
16-
private static final byte[] EMTPY = new byte[0];
17+
public class AeronServerDuplexConnection implements DuplexConnection, AutoCloseable {
1718

1819
private static final ThreadLocal<BufferClaim> bufferClaims = ThreadLocal.withInitial(BufferClaim::new);
1920

2021
private Publication publication;
2122
private PublishSubject<Frame> subject;
2223

23-
private int aeronStreamId;
24-
private int aeronSessionId;
25-
2624
public AeronServerDuplexConnection(
27-
Publication publication,
28-
int aeronStreamId,
29-
int aeronSessionId) {
25+
Publication publication) {
3026
this.publication = publication;
3127
this.subject = PublishSubject.create();
32-
this.aeronStreamId = aeronStreamId;
33-
this.aeronSessionId = aeronSessionId;
3428
}
3529

3630
PublishSubject<Frame> getSubject() {
@@ -47,15 +41,16 @@ public Publisher<Void> write(Publisher<Frame> o) {
4741
.toObservable(o)
4842
.map(frame -> {
4943
final ByteBuffer byteBuffer = frame.getByteBuffer();
50-
44+
final int length = byteBuffer.capacity() + BitUtil.SIZE_OF_INT;
5145
for (;;) {
5246
final BufferClaim bufferClaim = bufferClaims.get();
53-
final long offer = publication.tryClaim(byteBuffer.capacity(), bufferClaim);
47+
final long offer = publication.tryClaim(length, bufferClaim);
5448
if (offer >= 0) {
5549
try {
5650
final MutableDirectBuffer buffer = bufferClaim.buffer();
5751
final int offset = bufferClaim.offset();
58-
buffer.putBytes(offset, byteBuffer, 0, byteBuffer.capacity());
52+
buffer.putInt(offset, MessageType.FRAME.getEncodedType());
53+
buffer.putBytes(offset + BitUtil.SIZE_OF_INT, byteBuffer, 0, byteBuffer.capacity());
5954
} finally {
6055
bufferClaim.commit();
6156
}
@@ -72,4 +67,37 @@ public Publisher<Void> write(Publisher<Frame> o) {
7267

7368
return RxReactiveStreams.toPublisher(req);
7469
}
70+
71+
void establishConnection() {
72+
final long start = System.nanoTime();
73+
final int sessionId = publication.sessionId();
74+
final BufferClaim bufferClaim = bufferClaims.get();
75+
76+
for (;;) {
77+
final long current = System.nanoTime();
78+
if (current - start > TimeUnit.SECONDS.toNanos(30)) {
79+
throw new RuntimeException("Timed out waiting to establish connection for session id => " + sessionId);
80+
}
81+
82+
final long offer = publication.tryClaim(BitUtil.SIZE_OF_INT, bufferClaim);
83+
if (offer >= 0) {
84+
try {
85+
final MutableDirectBuffer buffer = bufferClaim.buffer();
86+
final int offeset = bufferClaim.offset();
87+
buffer.putInt(offeset, MessageType.ESTABLISH_CONNECTION_RESPONSE.getEncodedType());
88+
} finally {
89+
bufferClaim.commit();
90+
}
91+
92+
break;
93+
}
94+
95+
}
96+
}
97+
98+
@Override
99+
public void close() throws Exception {
100+
subject.onCompleted();
101+
publication.close();
102+
}
75103
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.reactivesocket.aeron;
2+
3+
public final class Constants {
4+
5+
private Constants() {}
6+
7+
public static final int SERVER_STREAM_ID = 1;
8+
9+
public static final int CLIENT_STREAM_ID = 2;
10+
11+
public static final byte[] EMTPY = new byte[0];
12+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package io.reactivesocket.aeron;
2+
3+
/**
4+
* Type of message being sent.
5+
*/
6+
enum MessageType {
7+
ESTABLISH_CONNECTION_REQUEST(0x01),
8+
ESTABLISH_CONNECTION_RESPONSE(0x02),
9+
FRAME(0x03);
10+
11+
private static MessageType[] typesById;
12+
13+
/**
14+
* Index types by id for indexed lookup.
15+
*/
16+
static {
17+
int max = 0;
18+
19+
for (MessageType t : values()) {
20+
max = Math.max(t.id, max);
21+
}
22+
23+
typesById = new MessageType[max + 1];
24+
25+
for (MessageType t : values()) {
26+
typesById[t.id] = t;
27+
}
28+
}
29+
30+
private final int id;
31+
32+
MessageType(int id) {
33+
this.id = id;
34+
}
35+
36+
public int getEncodedType() {
37+
return id;
38+
}
39+
40+
public static MessageType from(int id) {
41+
return typesById[id];
42+
}
43+
}

src/main/java/io/reactivesocket/aeron/ReactiveSocketAeronServer.java

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,22 @@
1212
import uk.co.real_logic.aeron.Publication;
1313
import uk.co.real_logic.aeron.Subscription;
1414
import uk.co.real_logic.aeron.logbuffer.Header;
15+
import uk.co.real_logic.agrona.BitUtil;
1516
import uk.co.real_logic.agrona.DirectBuffer;
1617
import uk.co.real_logic.agrona.collections.Int2ObjectHashMap;
1718

18-
import java.io.Closeable;
1919
import java.io.IOException;
2020
import java.nio.ByteBuffer;
2121

22-
public class ReactiveSocketAeronServer implements Closeable {
22+
import static io.reactivesocket.aeron.Constants.CLIENT_STREAM_ID;
23+
import static io.reactivesocket.aeron.Constants.SERVER_STREAM_ID;
24+
25+
public class ReactiveSocketAeronServer implements AutoCloseable {
2326

2427
private final ReactiveSocketServerProtocol rsServerProtocol;
2528

2629
private final Aeron aeron;
2730

28-
private final int SERVER_STREAM_ID = 1;
29-
30-
private final int CLIENT_STREAM_ID = 2;
31-
3231
private final int port;
3332

3433
private final Int2ObjectHashMap<AeronServerDuplexConnection> connections;
@@ -78,33 +77,44 @@ void poll(FragmentAssembler fragmentAssembler) {
7877

7978
void fragmentHandler(DirectBuffer buffer, int offset, int length, Header header) {
8079
final int sessionId = header.sessionId();
81-
AeronServerDuplexConnection connection = connections.get(sessionId);
82-
83-
if (connection != null) {
84-
final PublishSubject<Frame> subject = connection.getSubject();
85-
ByteBuffer bytes = ByteBuffer.allocate(buffer.capacity());
86-
buffer.getBytes(0, bytes, buffer.capacity());
87-
final Frame frame = Frame.from(bytes);
88-
subject.onNext(frame);
89-
} else {
90-
System.out.println("No connection found for session id " + sessionId);
80+
81+
int messageTypeInt = buffer.getInt(0);
82+
MessageType type = MessageType.from(messageTypeInt);
83+
84+
if (MessageType.FRAME == type) {
85+
86+
AeronServerDuplexConnection connection = connections.get(sessionId);
87+
88+
if (connection != null) {
89+
final PublishSubject<Frame> subject = connection.getSubject();
90+
ByteBuffer bytes = ByteBuffer.allocate(buffer.capacity());
91+
buffer.getBytes(BitUtil.SIZE_OF_INT, bytes, buffer.capacity());
92+
final Frame frame = Frame.from(bytes);
93+
subject.onNext(frame);
94+
}
95+
} else if (MessageType.ESTABLISH_CONNECTION_REQUEST == type) {
96+
AeronServerDuplexConnection connection = connections.get(sessionId);
97+
connection.establishConnection();
9198
}
99+
92100
}
93101

94102
void newImageHandler(Image image, String channel, int streamId, int sessionId, long joiningPosition, String sourceIdentity) {
103+
System.out.println(String.format("Handling new image for session id => %d and stream id => %d", streamId, sessionId));
95104
if (SERVER_STREAM_ID == streamId) {
96-
97105
final AeronServerDuplexConnection connection = connections.computeIfAbsent(sessionId, (_s) -> {
98106
final String responseChannel = "udp://" + sourceIdentity.substring(0, sourceIdentity.indexOf(':')) + ":" + port;
99107
Publication publication = aeron.addPublication(responseChannel, CLIENT_STREAM_ID);
100-
return new AeronServerDuplexConnection(publication, streamId, sessionId);
108+
System.out.println(String.format("Creating new connection for responseChannel => %s, streamId => %d, and sessionId => %d", responseChannel, streamId, sessionId));
109+
return new AeronServerDuplexConnection(publication);
101110
});
102111
rsServerProtocol.acceptConnection(connection);
103112
} else {
104113
System.out.println("Unsupported stream id " + streamId);
105114
}
106115
}
107116

117+
108118
@Override
109119
public void close() throws IOException {
110120
running = false;

src/main/java/io/reactivesocket/aeron/ReactivesocketAeronClient.java

Lines changed: 72 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,30 @@
1414
import uk.co.real_logic.aeron.Publication;
1515
import uk.co.real_logic.aeron.Subscription;
1616
import uk.co.real_logic.aeron.logbuffer.Header;
17+
import uk.co.real_logic.agrona.BitUtil;
1718
import uk.co.real_logic.agrona.DirectBuffer;
1819
import uk.co.real_logic.agrona.collections.Int2ObjectHashMap;
1920
import uk.co.real_logic.agrona.concurrent.UnsafeBuffer;
2021

2122
import java.nio.ByteBuffer;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.TimeUnit;
25+
26+
import static io.reactivesocket.aeron.Constants.CLIENT_STREAM_ID;
27+
import static io.reactivesocket.aeron.Constants.EMTPY;
28+
import static io.reactivesocket.aeron.Constants.SERVER_STREAM_ID;
2229

2330
/**
2431
* Created by rroeser on 8/13/15.
2532
*/
2633
public class ReactivesocketAeronClient {
27-
private static final byte[] EMTPY = new byte[0];
28-
2934
private static final ThreadLocal<UnsafeBuffer> buffers = ThreadLocal.withInitial(() -> new UnsafeBuffer(EMTPY));
3035

3136
private static final Int2ObjectHashMap<Subscription> subscriptions = new Int2ObjectHashMap<>();
3237

3338
private static final Int2ObjectHashMap<PublishSubject<Frame>> subjects = new Int2ObjectHashMap<>();
3439

35-
private static final int SERVER_STREAM_ID = 1;
36-
37-
private static final int CLIENT_STREAM_ID = 2;
40+
private static final Int2ObjectHashMap<CountDownLatch> establishConnectionLatches = new Int2ObjectHashMap<>();
3841

3942
private final ReactiveSocketClientProtocol rsClientProtocol;
4043

@@ -52,6 +55,8 @@ private ReactivesocketAeronClient(String host, int port) {
5255

5356
final String channel = "udp://" + host + ":" + port;
5457

58+
System.out.println("Creating a publication to channel => " + channel);
59+
5560
final Publication publication = aeron.addPublication(channel, SERVER_STREAM_ID);
5661

5762
final int sessionId = publication.sessionId();
@@ -68,11 +73,13 @@ private ReactivesocketAeronClient(String host, int port) {
6873
return subscription;
6974
});
7075

76+
establishConnection(publication, sessionId);
77+
7178
this.rsClientProtocol =
7279
ReactiveSocketClientProtocol.create(new DuplexConnection() {
7380

7481
public Publisher<Frame> getInput() {
75-
PublishSubject publishSubject = subjects.get(port);
82+
PublishSubject publishSubject = subjects.get(sessionId);
7683
return RxReactiveStreams.toPublisher(publishSubject);
7784
}
7885

@@ -81,9 +88,14 @@ public Publisher<Void> write(Publisher<Frame> o) {
8188
Observable<Void> req = RxReactiveStreams
8289
.toObservable(o)
8390
.map(frame -> {
91+
final ByteBuffer frameBuffer = frame.getByteBuffer();
92+
final int frameBufferLength = frameBuffer.capacity();
8493
final UnsafeBuffer buffer = buffers.get();
85-
ByteBuffer byteBuffer = frame.getByteBuffer();
86-
buffer.wrap(byteBuffer);
94+
final byte[] bytes = new byte[frameBufferLength + BitUtil.SIZE_OF_INT];
95+
96+
buffer.wrap(bytes);
97+
buffer.putInt(0, MessageType.FRAME.getEncodedType());
98+
buffer.putBytes(BitUtil.SIZE_OF_INT, frameBuffer, frameBufferLength);
8799

88100
for (;;) {
89101
final long offer = publication.offer(buffer);
@@ -112,11 +124,20 @@ public static ReactivesocketAeronClient create(String host) {
112124
}
113125

114126
void fragmentHandler(DirectBuffer buffer, int offset, int length, Header header) {
115-
final PublishSubject<Frame> subject = subjects.get(header.sessionId());
116-
ByteBuffer bytes = ByteBuffer.allocate(buffer.capacity());
117-
buffer.getBytes(0, bytes, buffer.capacity());
118-
final Frame frame = Frame.from(bytes);
119-
subject.onNext(frame);
127+
int messageTypeInt = buffer.getInt(0);
128+
MessageType messageType = MessageType.from(messageTypeInt);
129+
if (messageType == MessageType.FRAME) {
130+
final PublishSubject<Frame> subject = subjects.get(header.sessionId());
131+
ByteBuffer bytes = ByteBuffer.allocate(buffer.capacity());
132+
buffer.getBytes(BitUtil.SIZE_OF_INT, bytes, buffer.capacity());
133+
final Frame frame = Frame.from(bytes);
134+
subject.onNext(frame);
135+
} else if (messageType == MessageType.ESTABLISH_CONNECTION_RESPONSE) {
136+
CountDownLatch latch = establishConnectionLatches.get(header.sessionId());
137+
latch.countDown();
138+
} else {
139+
System.out.println("Unknow message type => " + messageTypeInt);
140+
}
120141
}
121142

122143
void poll(FragmentAssembler fragmentAssembler, Subscription subscription, Scheduler.Worker worker) {
@@ -128,6 +149,44 @@ void poll(FragmentAssembler fragmentAssembler, Subscription subscription, Schedu
128149
}
129150
}
130151

152+
/**
153+
* Establishes a connection between the client and server. Waits for 30 seconds before throwing a exception.
154+
*/
155+
void establishConnection(final Publication publication, final int sessionId) {
156+
try {
157+
UnsafeBuffer buffer = buffers.get();
158+
buffer.wrap(new byte[BitUtil.SIZE_OF_INT]);
159+
buffer.putInt(0, MessageType.ESTABLISH_CONNECTION_REQUEST.getEncodedType());
160+
161+
CountDownLatch latch = new CountDownLatch(1);
162+
establishConnectionLatches.put(sessionId, latch);
163+
164+
long offer = -1;
165+
final long start = System.nanoTime();
166+
for (;;) {
167+
final long current = System.nanoTime();
168+
if (current - start > TimeUnit.SECONDS.toNanos(30)) {
169+
throw new RuntimeException("Timed out waiting to establish connection for session id => " + sessionId);
170+
}
171+
172+
if (offer < 0) {
173+
offer = publication.offer(buffer);
174+
}
175+
176+
if (latch.getCount() > 0) {
177+
break;
178+
}
179+
}
180+
181+
System.out.println(String.format("Connection established for channel => %s, stream id => %d",
182+
publication.channel(),
183+
publication.sessionId()));
184+
} finally {
185+
establishConnectionLatches.remove(sessionId);
186+
}
187+
188+
}
189+
131190
public Publisher<String> requestResponse(String payload) {
132191
return rsClientProtocol.requestResponse(payload);
133192
}

0 commit comments

Comments
 (0)