Skip to content

Commit 659eaa1

Browse files
committed
fixed code that establishes a connection:
1 parent 713f009 commit 659eaa1

File tree

3 files changed

+31
-12
lines changed

3 files changed

+31
-12
lines changed

src/main/java/io/reactivesocket/aeron/AeronServerDuplexConnection.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,23 +68,26 @@ public Publisher<Void> write(Publisher<Frame> o) {
6868
return RxReactiveStreams.toPublisher(req);
6969
}
7070

71-
void establishConnection() {
71+
void ackEstablishConnection(int ackSessionId) {
7272
final long start = System.nanoTime();
7373
final int sessionId = publication.sessionId();
7474
final BufferClaim bufferClaim = bufferClaims.get();
7575

76+
System.out.print("Acking establish connection for session id => " + ackSessionId);
77+
7678
for (;;) {
7779
final long current = System.nanoTime();
7880
if (current - start > TimeUnit.SECONDS.toNanos(30)) {
7981
throw new RuntimeException("Timed out waiting to establish connection for session id => " + sessionId);
8082
}
8183

82-
final long offer = publication.tryClaim(BitUtil.SIZE_OF_INT, bufferClaim);
84+
final long offer = publication.tryClaim(2 * BitUtil.SIZE_OF_INT, bufferClaim);
8385
if (offer >= 0) {
8486
try {
8587
final MutableDirectBuffer buffer = bufferClaim.buffer();
8688
final int offeset = bufferClaim.offset();
8789
buffer.putInt(offeset, MessageType.ESTABLISH_CONNECTION_RESPONSE.getEncodedType());
90+
buffer.putInt(offeset + BitUtil.SIZE_OF_INT, ackSessionId);
8891
} finally {
8992
bufferClaim.commit();
9093
}

src/main/java/io/reactivesocket/aeron/ReactiveSocketAeronServer.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.io.IOException;
2020
import java.nio.ByteBuffer;
21+
import java.util.concurrent.TimeUnit;
2122

2223
import static io.reactivesocket.aeron.Constants.CLIENT_STREAM_ID;
2324
import static io.reactivesocket.aeron.Constants.SERVER_STREAM_ID;
@@ -30,7 +31,7 @@ public class ReactiveSocketAeronServer implements AutoCloseable {
3031

3132
private final int port;
3233

33-
private final Int2ObjectHashMap<AeronServerDuplexConnection> connections;
34+
private volatile Int2ObjectHashMap<AeronServerDuplexConnection> connections;
3435

3536
private final Scheduler.Worker worker;
3637

@@ -78,7 +79,7 @@ void poll(FragmentAssembler fragmentAssembler) {
7879
void fragmentHandler(DirectBuffer buffer, int offset, int length, Header header) {
7980
final int sessionId = header.sessionId();
8081

81-
int messageTypeInt = buffer.getInt(0);
82+
int messageTypeInt = buffer.getInt(offset);
8283
MessageType type = MessageType.from(messageTypeInt);
8384

8485
if (MessageType.FRAME == type) {
@@ -88,26 +89,39 @@ void fragmentHandler(DirectBuffer buffer, int offset, int length, Header header)
8889
if (connection != null) {
8990
final PublishSubject<Frame> subject = connection.getSubject();
9091
ByteBuffer bytes = ByteBuffer.allocate(buffer.capacity());
91-
buffer.getBytes(BitUtil.SIZE_OF_INT, bytes, buffer.capacity());
92+
buffer.getBytes(BitUtil.SIZE_OF_INT + offset, bytes, buffer.capacity());
9293
final Frame frame = Frame.from(bytes);
9394
subject.onNext(frame);
9495
}
9596
} else if (MessageType.ESTABLISH_CONNECTION_REQUEST == type) {
96-
AeronServerDuplexConnection connection = connections.get(sessionId);
97-
connection.establishConnection();
97+
final long start = System.nanoTime();
98+
AeronServerDuplexConnection connection = null;
99+
System.out.println("Looking a connection to ack establish connection for session id => " + sessionId);
100+
while (connection == null) {
101+
final long current = System.nanoTime();
102+
103+
if (current - start > TimeUnit.SECONDS.toNanos(30)) {
104+
throw new RuntimeException("unable to find connection to ack establish connection for session id => " + sessionId);
105+
}
106+
107+
connection = connections.get(sessionId);
108+
}
109+
System.out.println("Found a connection to ack establish connection for session id => " + sessionId);
110+
connection.ackEstablishConnection(sessionId);
98111
}
99112

100113
}
101114

102115
void newImageHandler(Image image, String channel, int streamId, int sessionId, long joiningPosition, String sourceIdentity) {
103-
System.out.println(String.format("Handling new image for session id => %d and stream id => %d", streamId, sessionId));
104116
if (SERVER_STREAM_ID == streamId) {
117+
System.out.println(String.format("Handling new image for session id => %d and stream id => %d", streamId, sessionId));
105118
final AeronServerDuplexConnection connection = connections.computeIfAbsent(sessionId, (_s) -> {
106119
final String responseChannel = "udp://" + sourceIdentity.substring(0, sourceIdentity.indexOf(':')) + ":" + port;
107120
Publication publication = aeron.addPublication(responseChannel, CLIENT_STREAM_ID);
108121
System.out.println(String.format("Creating new connection for responseChannel => %s, streamId => %d, and sessionId => %d", responseChannel, streamId, sessionId));
109122
return new AeronServerDuplexConnection(publication);
110123
});
124+
System.out.println("Accepting ReactiveSocket connection");
111125
rsServerProtocol.acceptConnection(connection);
112126
} else {
113127
System.out.println("Unsupported stream id " + streamId);

src/main/java/io/reactivesocket/aeron/ReactivesocketAeronClient.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public static ReactivesocketAeronClient create(String host) {
124124
}
125125

126126
void fragmentHandler(DirectBuffer buffer, int offset, int length, Header header) {
127-
int messageTypeInt = buffer.getInt(0);
127+
int messageTypeInt = buffer.getInt(offset);
128128
MessageType messageType = MessageType.from(messageTypeInt);
129129
if (messageType == MessageType.FRAME) {
130130
final PublishSubject<Frame> subject = subjects.get(header.sessionId());
@@ -133,7 +133,9 @@ void fragmentHandler(DirectBuffer buffer, int offset, int length, Header header)
133133
final Frame frame = Frame.from(bytes);
134134
subject.onNext(frame);
135135
} else if (messageType == MessageType.ESTABLISH_CONNECTION_RESPONSE) {
136-
CountDownLatch latch = establishConnectionLatches.get(header.sessionId());
136+
int ackSessionId = buffer.getInt(offset + BitUtil.SIZE_OF_INT);
137+
System.out.println(String.format("Received establish connection ack for session id => %d", ackSessionId));
138+
CountDownLatch latch = establishConnectionLatches.get(ackSessionId);
137139
latch.countDown();
138140
} else {
139141
System.out.println("Unknow message type => " + messageTypeInt);
@@ -150,7 +152,7 @@ void poll(FragmentAssembler fragmentAssembler, Subscription subscription, Schedu
150152
}
151153

152154
/**
153-
* Establishes a connection between the client and server. Waits for 30 seconds before throwing a exception.
155+
* Establishes a connection between the client and server. Waits for 39 seconds before throwing a exception.
154156
*/
155157
void establishConnection(final Publication publication, final int sessionId) {
156158
try {
@@ -173,7 +175,7 @@ void establishConnection(final Publication publication, final int sessionId) {
173175
offer = publication.offer(buffer);
174176
}
175177

176-
if (latch.getCount() > 0) {
178+
if (latch.getCount() == 0) {
177179
break;
178180
}
179181
}

0 commit comments

Comments
 (0)