Skip to content

Commit c97ca50

Browse files
committed
Merge pull request #1 from ReactiveSocket/master
pulling from master
2 parents e9869f2 + 64cb361 commit c97ca50

File tree

13 files changed

+631
-129
lines changed

13 files changed

+631
-129
lines changed

README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# ReactiveSocket over Aeron
2+
3+
This is an implementation over Aeron.
4+
5+
## Master Build Status
6+
7+
<a href='https://travis-ci.org/ReactiveSocket/reactivesocket-aeron-rxjava/builds'><img src='https://travis-ci.org/ReactiveSocket/reactivesocket-aeron-rxjava.svg?branch=master'></a>
8+
9+
## Bugs and Feedback
10+
11+
For bugs, questions and discussions please use the [Github Issues](https://github.com/ReactiveSocket/reactivesocket-aeron-rxjava/issues).
12+
13+
14+
## LICENSE
15+
16+
Copyright 2015 Netflix, Inc.
17+
18+
Licensed under the Apache License, Version 2.0 (the "License");
19+
you may not use this file except in compliance with the License.
20+
You may obtain a copy of the License at
21+
22+
<http://www.apache.org/licenses/LICENSE-2.0>
23+
24+
Unless required by applicable law or agreed to in writing, software
25+
distributed under the License is distributed on an "AS IS" BASIS,
26+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
27+
See the License for the specific language governing permissions and
28+
limitations under the License.

gradle/wrapper/gradle-wrapper.jar

49.8 KB
Binary file not shown.
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package io.reactivesocket.aeron;
2+
3+
import io.reactivesocket.DuplexConnection;
4+
import io.reactivesocket.Frame;
5+
import org.reactivestreams.Publisher;
6+
import org.reactivestreams.Subscriber;
7+
import rx.Observable;
8+
import rx.RxReactiveStreams;
9+
import uk.co.real_logic.aeron.Publication;
10+
11+
public class AeronClientDuplexConnection implements DuplexConnection, AutoCloseable {
12+
private Publication publication;
13+
private Subscriber<? super Frame> subscriber;
14+
private Publisher<Frame> publisher;
15+
16+
public AeronClientDuplexConnection(Publication publication) {
17+
this.publication = publication;
18+
this.publisher = (Subscriber<? super Frame> s) -> subscriber = s;
19+
}
20+
21+
public Subscriber<? super Frame> getSubscriber() {
22+
return subscriber;
23+
}
24+
25+
public Publisher<Frame> getInput() {
26+
return publisher;
27+
}
28+
29+
@Override
30+
public Publisher<Void> write(Publisher<Frame> o) {
31+
final Observable<Frame> frameObservable = RxReactiveStreams.toObservable(o);
32+
final Observable<Void> voidObservable = frameObservable
33+
.lift(new OperatorPublish(publication));
34+
35+
return RxReactiveStreams.toPublisher(voidObservable);
36+
}
37+
38+
@Override
39+
public void close() throws Exception {
40+
subscriber.onComplete();
41+
publication.close();
42+
}
43+
}
44+

src/main/java/io/reactivesocket/aeron/AeronServerDuplexConnection.java

Lines changed: 15 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,69 +3,45 @@
33
import io.reactivesocket.DuplexConnection;
44
import io.reactivesocket.Frame;
55
import org.reactivestreams.Publisher;
6+
import org.reactivestreams.Subscriber;
67
import rx.Observable;
78
import rx.RxReactiveStreams;
8-
import rx.subjects.PublishSubject;
99
import uk.co.real_logic.aeron.Publication;
1010
import uk.co.real_logic.aeron.logbuffer.BufferClaim;
1111
import uk.co.real_logic.agrona.BitUtil;
1212
import uk.co.real_logic.agrona.MutableDirectBuffer;
1313

14-
import java.nio.ByteBuffer;
1514
import java.util.concurrent.TimeUnit;
1615

1716
public class AeronServerDuplexConnection implements DuplexConnection, AutoCloseable {
1817

1918
private static final ThreadLocal<BufferClaim> bufferClaims = ThreadLocal.withInitial(BufferClaim::new);
2019

2120
private Publication publication;
22-
private PublishSubject<Frame> subject;
21+
private Subscriber<? super Frame> subscriber;
22+
private Publisher<Frame> publisher;
2323

2424
public AeronServerDuplexConnection(
2525
Publication publication) {
2626
this.publication = publication;
27-
this.subject = PublishSubject.create();
27+
this.publisher = (Subscriber<? super Frame> s) -> subscriber = s;
2828
}
2929

30-
PublishSubject<Frame> getSubject() {
31-
return subject;
30+
public Subscriber<? super Frame> getSubscriber() {
31+
return subscriber;
3232
}
3333

3434
@Override
3535
public Publisher<Frame> getInput() {
36-
return RxReactiveStreams.toPublisher(subject);
36+
return publisher;
3737
}
3838

3939
public Publisher<Void> write(Publisher<Frame> o) {
40-
Observable<Void> req = RxReactiveStreams
41-
.toObservable(o)
42-
.flatMap(frame -> {
43-
final ByteBuffer byteBuffer = frame.getByteBuffer();
44-
final int length = byteBuffer.capacity() + BitUtil.SIZE_OF_INT;
45-
for (;;) {
46-
final BufferClaim bufferClaim = bufferClaims.get();
47-
final long offer = publication.tryClaim(length, bufferClaim);
48-
if (offer >= 0) {
49-
try {
50-
final MutableDirectBuffer buffer = bufferClaim.buffer();
51-
final int offset = bufferClaim.offset();
52-
buffer.putInt(offset, MessageType.FRAME.getEncodedType());
53-
buffer.putBytes(offset + BitUtil.SIZE_OF_INT, byteBuffer, 0, byteBuffer.capacity());
54-
} finally {
55-
bufferClaim.commit();
56-
}
57-
58-
break;
59-
} else if (Publication.NOT_CONNECTED == offer) {
60-
return Observable.error(new RuntimeException("not connected"));
61-
}
40+
final Observable<Frame> frameObservable = RxReactiveStreams.toObservable(o);
41+
final Observable<Void> voidObservable = frameObservable
42+
.lift(new OperatorPublish(publication));
6243

63-
}
64-
65-
return Observable.empty();
66-
});
67-
68-
return RxReactiveStreams.toPublisher(req);
44+
return RxReactiveStreams.toPublisher(voidObservable);
6945
}
7046

7147
void ackEstablishConnection(int ackSessionId) {
@@ -85,9 +61,9 @@ void ackEstablishConnection(int ackSessionId) {
8561
if (offer >= 0) {
8662
try {
8763
final MutableDirectBuffer buffer = bufferClaim.buffer();
88-
final int offeset = bufferClaim.offset();
89-
buffer.putInt(offeset, MessageType.ESTABLISH_CONNECTION_RESPONSE.getEncodedType());
90-
buffer.putInt(offeset + BitUtil.SIZE_OF_INT, ackSessionId);
64+
final int offset = bufferClaim.offset();
65+
buffer.putInt(offset, MessageType.ESTABLISH_CONNECTION_RESPONSE.getEncodedType());
66+
buffer.putInt(offset + BitUtil.SIZE_OF_INT, ackSessionId);
9167
} finally {
9268
bufferClaim.commit();
9369
}
@@ -100,7 +76,7 @@ void ackEstablishConnection(int ackSessionId) {
10076

10177
@Override
10278
public void close() throws Exception {
103-
subject.onCompleted();
79+
subscriber.onComplete();
10480
publication.close();
10581
}
10682
}

src/main/java/io/reactivesocket/aeron/Constants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.reactivesocket.aeron;
22

3-
public final class Constants {
3+
final class Constants {
44

55
private Constants() {}
66

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package io.reactivesocket.aeron;
2+
3+
import io.reactivesocket.Frame;
4+
import rx.Observable;
5+
import uk.co.real_logic.aeron.Publication;
6+
import uk.co.real_logic.aeron.logbuffer.BufferClaim;
7+
import uk.co.real_logic.agrona.BitUtil;
8+
import uk.co.real_logic.agrona.MutableDirectBuffer;
9+
import uk.co.real_logic.agrona.concurrent.UnsafeBuffer;
10+
11+
import java.nio.ByteBuffer;
12+
13+
class OperatorPublish implements Observable.Operator<Void, Frame> {
14+
15+
private static final ThreadLocal<BufferClaim> bufferClaims = ThreadLocal.withInitial(BufferClaim::new);
16+
17+
private static final ThreadLocal<UnsafeBuffer> unsafeBuffers = ThreadLocal.withInitial(() -> new UnsafeBuffer(Constants.EMTPY));
18+
19+
private Publication publication;
20+
21+
public OperatorPublish(Publication publication) {
22+
this.publication = publication;
23+
}
24+
25+
@Override
26+
public rx.Subscriber<? super Frame> call(rx.Subscriber<? super Void> child) {
27+
return new rx.Subscriber<Frame>(child) {
28+
@Override
29+
public void onStart() {
30+
request(1);
31+
}
32+
33+
@Override
34+
public void onCompleted() {
35+
child.onCompleted();
36+
}
37+
38+
@Override
39+
public void onError(Throwable e) {
40+
child.onError(e);
41+
}
42+
43+
@Override
44+
public void onNext(Frame frame) {
45+
46+
final ByteBuffer byteBuffer = frame.getByteBuffer();
47+
final int length = byteBuffer.capacity() + BitUtil.SIZE_OF_INT;
48+
49+
// If the length is less the MTU size send the message using tryClaim which does not fragment the message
50+
// If the message is larger the the MTU size send it using offer.
51+
if (length < publication.maxMessageLength()) {
52+
tryClaim(byteBuffer, length);
53+
} else {
54+
offer(byteBuffer, length);
55+
}
56+
}
57+
58+
void offer(ByteBuffer byteBuffer, int length) {
59+
final byte[] bytes = new byte[length];
60+
final UnsafeBuffer unsafeBuffer = unsafeBuffers.get();
61+
unsafeBuffer.wrap(bytes);
62+
unsafeBuffer.putInt(0, MessageType.FRAME.getEncodedType());
63+
unsafeBuffer.putBytes(BitUtil.SIZE_OF_INT, byteBuffer, byteBuffer.capacity());
64+
for (;;) {
65+
final long offer = publication.offer(unsafeBuffer);
66+
if (offer >= 0) {
67+
break;
68+
} else if (Publication.NOT_CONNECTED == offer) {
69+
child.onError(new RuntimeException("not connected"));
70+
break;
71+
}
72+
}
73+
74+
}
75+
76+
void tryClaim(ByteBuffer byteBuffer, int length) {
77+
final BufferClaim bufferClaim = bufferClaims.get();
78+
for (;;) {
79+
final long offer = publication.tryClaim(length, bufferClaim);
80+
if (offer >= 0) {
81+
try {
82+
final MutableDirectBuffer buffer = bufferClaim.buffer();
83+
final int offset = bufferClaim.offset();
84+
buffer.putInt(offset, MessageType.FRAME.getEncodedType());
85+
buffer.putBytes(offset + BitUtil.SIZE_OF_INT, byteBuffer, 0, byteBuffer.capacity());
86+
} finally {
87+
bufferClaim.commit();
88+
}
89+
90+
break;
91+
} else if (Publication.NOT_CONNECTED == offer) {
92+
child.onError(new RuntimeException("not connected"));
93+
break;
94+
}
95+
}
96+
request(1);
97+
}
98+
};
99+
}
100+
}

src/main/java/io/reactivesocket/aeron/ReactiveSocketAeronServer.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
import io.reactivesocket.Frame;
44
import io.reactivesocket.ReactiveSocket;
55
import io.reactivesocket.RequestHandler;
6+
import org.reactivestreams.Subscriber;
67
import rx.Scheduler;
78
import rx.schedulers.Schedulers;
8-
import rx.subjects.PublishSubject;
99
import uk.co.real_logic.aeron.Aeron;
1010
import uk.co.real_logic.aeron.FragmentAssembler;
1111
import uk.co.real_logic.aeron.Image;
@@ -16,7 +16,6 @@
1616
import uk.co.real_logic.agrona.DirectBuffer;
1717
import uk.co.real_logic.agrona.collections.Int2ObjectHashMap;
1818

19-
import java.io.IOException;
2019
import java.nio.ByteBuffer;
2120
import java.util.concurrent.TimeUnit;
2221

@@ -105,15 +104,13 @@ void fragmentHandler(DirectBuffer buffer, int offset, int length, Header header)
105104
MessageType type = MessageType.from(messageTypeInt);
106105

107106
if (MessageType.FRAME == type) {
108-
109107
AeronServerDuplexConnection connection = connections.get(sessionId);
110-
111108
if (connection != null) {
112-
final PublishSubject<Frame> subject = connection.getSubject();
109+
final Subscriber<? super Frame> subscriber = connection.getSubscriber();
113110
ByteBuffer bytes = ByteBuffer.allocate(length);
114111
buffer.getBytes(BitUtil.SIZE_OF_INT + offset, bytes, length);
115112
final Frame frame = Frame.from(bytes);
116-
subject.onNext(frame);
113+
subscriber.onNext(frame);
117114
}
118115
} else if (MessageType.ESTABLISH_CONNECTION_REQUEST == type) {
119116
final long start = System.nanoTime();
@@ -145,17 +142,22 @@ void newImageHandler(Image image, String channel, int streamId, int sessionId, l
145142
});
146143
System.out.println("Accepting ReactiveSocket connection");
147144
ReactiveSocket socket = ReactiveSocket.accept(connection, requestHandler);
145+
148146
socket.responderPublisher().subscribe(PROTOCOL_SUBSCRIBER);
149147
} else {
150148
System.out.println("Unsupported stream id " + streamId);
151149
}
152150
}
153151

154152
@Override
155-
public void close() throws IOException {
153+
public void close() throws Exception {
156154
running = false;
157155
worker.unsubscribe();
158156
aeron.close();
157+
158+
for (AeronServerDuplexConnection connection : connections.values()) {
159+
connection.close();
160+
}
159161
}
160162

161163
}

0 commit comments

Comments
 (0)