Skip to content

Commit d8f4bf9

Browse files
committed
initial commit
first commit
1 parent ef8c4bd commit d8f4bf9

File tree

6 files changed

+308
-28
lines changed

6 files changed

+308
-28
lines changed

build.gradle

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,25 @@
11
buildscript {
2-
repositories {
3-
jcenter()
4-
}
2+
repositories {
3+
jcenter()
4+
}
55

6-
dependencies { classpath 'io.reactivesocket:gradle-nebula-plugin-reactivesocket:1.0.0' }
6+
dependencies { classpath 'io.reactivesocket:gradle-nebula-plugin-reactivesocket:1.0.0' }
77
}
88

99
description = 'ReactiveSocket: stream oriented messaging passing with Reactive Stream semantics.'
1010

1111
apply plugin: 'reactivesocket-project'
1212
apply plugin: 'java'
1313

14+
repositories {
15+
maven { url 'https://oss.jfrog.org/libs-snapshot' }
16+
}
17+
1418
dependencies {
15-
compile 'io.reactivex:rxjava:1.0.13'
16-
compile 'io.reactivex:rxjava-reactive-streams:1.0.1'
17-
compile 'org.reactivestreams:reactive-streams:1.0.0.final'
19+
compile 'io.reactivex:rxjava:1.0.13'
20+
compile 'io.reactivesocket:reactivesocket:0.0.1-SNAPSHOT'
1821
compile 'uk.co.real-logic:Agrona:0.4.2'
22+
compile 'uk.co.real-logic:aeron-all:0.1.2'
1923
testCompile 'junit:junit-dep:4.10'
2024
testCompile 'org.mockito:mockito-core:1.8.5'
2125
}
@@ -29,4 +33,4 @@ nebulaRelease {
2933

3034
if (project.hasProperty('release.useLastTag')) {
3135
tasks.prepare.enabled = false
32-
}
36+
}

settings.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
rootProject.name='reactivesocket'
1+
rootProject.name='reactivesocket-aeron-rxjava'
Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,75 @@
11
package io.reactivesocket.aeron;
22

3-
/**
4-
* Created by rroeser on 8/13/15.
5-
*/
6-
public class AeronServerDuplexConnection {
3+
import io.reactivesocket.DuplexConnection;
4+
import io.reactivesocket.Frame;
5+
import org.reactivestreams.Publisher;
6+
import rx.Observable;
7+
import rx.RxReactiveStreams;
8+
import rx.subjects.PublishSubject;
9+
import uk.co.real_logic.aeron.Publication;
10+
import uk.co.real_logic.aeron.logbuffer.BufferClaim;
11+
import uk.co.real_logic.agrona.MutableDirectBuffer;
12+
13+
import java.nio.ByteBuffer;
14+
15+
public class AeronServerDuplexConnection implements DuplexConnection {
16+
private static final byte[] EMTPY = new byte[0];
17+
18+
private static final ThreadLocal<BufferClaim> bufferClaims = ThreadLocal.withInitial(BufferClaim::new);
19+
20+
private Publication publication;
21+
private PublishSubject<Frame> subject;
22+
23+
private int aeronStreamId;
24+
private int aeronSessionId;
25+
26+
public AeronServerDuplexConnection(
27+
Publication publication,
28+
int aeronStreamId,
29+
int aeronSessionId) {
30+
this.publication = publication;
31+
this.subject = PublishSubject.create();
32+
this.aeronStreamId = aeronStreamId;
33+
this.aeronSessionId = aeronSessionId;
34+
}
35+
36+
PublishSubject<Frame> getSubject() {
37+
return subject;
38+
}
39+
40+
@Override
41+
public Publisher<Frame> getInput() {
42+
return RxReactiveStreams.toPublisher(subject);
43+
}
44+
45+
public Publisher<Void> write(Publisher<Frame> o) {
46+
Observable<Void> req = RxReactiveStreams
47+
.toObservable(o)
48+
.map(frame -> {
49+
final ByteBuffer byteBuffer = frame.getByteBuffer();
50+
51+
for (;;) {
52+
final BufferClaim bufferClaim = bufferClaims.get();
53+
final long offer = publication.tryClaim(byteBuffer.capacity(), bufferClaim);
54+
if (offer >= 0) {
55+
try {
56+
final MutableDirectBuffer buffer = bufferClaim.buffer();
57+
final int offset = bufferClaim.offset();
58+
buffer.putBytes(offset, byteBuffer, 0, byteBuffer.capacity());
59+
} finally {
60+
bufferClaim.commit();
61+
}
62+
63+
break;
64+
} else if (Publication.NOT_CONNECTED == offer) {
65+
throw new RuntimeException("not connected");
66+
}
67+
68+
}
69+
70+
return null;
71+
});
72+
73+
return RxReactiveStreams.toPublisher(req);
74+
}
775
}
Lines changed: 83 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,114 @@
11
package io.reactivesocket.aeron;
22

3+
import io.reactivesocket.Frame;
34
import io.reactivesocket.ReactiveSocketServerProtocol;
45
import io.reactivesocket.RequestHandler;
6+
import rx.Scheduler;
7+
import rx.schedulers.Schedulers;
8+
import rx.subjects.PublishSubject;
59
import uk.co.real_logic.aeron.Aeron;
10+
import uk.co.real_logic.aeron.FragmentAssembler;
611
import uk.co.real_logic.aeron.Image;
12+
import uk.co.real_logic.aeron.Publication;
13+
import uk.co.real_logic.aeron.Subscription;
14+
import uk.co.real_logic.aeron.logbuffer.Header;
15+
import uk.co.real_logic.agrona.DirectBuffer;
16+
import uk.co.real_logic.agrona.collections.Int2ObjectHashMap;
17+
18+
import java.io.Closeable;
19+
import java.io.IOException;
20+
import java.nio.ByteBuffer;
21+
22+
public class ReactiveSocketAeronServer implements Closeable {
723

8-
/**
9-
* Created by rroeser on 8/13/15.
10-
*/
11-
public class ReactiveSocketAeronServer {
1224
private final ReactiveSocketServerProtocol rsServerProtocol;
1325

1426
private final Aeron aeron;
1527

1628
private final int SERVER_STREAM_ID = 1;
1729

30+
private final int CLIENT_STREAM_ID = 2;
31+
1832
private final int port;
1933

20-
public ReactiveSocketAeronServer(int port, RequestHandler requestHandler) {
21-
this.port = port;
34+
private final Int2ObjectHashMap<AeronServerDuplexConnection> connections;
2235

23-
rsServerProtocol = ReactiveSocketServerProtocol.create(requestHandler);
36+
private final Scheduler.Worker worker;
37+
38+
private final Subscription subscription;
39+
40+
private volatile boolean running = true;
41+
42+
private ReactiveSocketAeronServer(int port, RequestHandler requestHandler) {
43+
this.port = port;
44+
this.connections = new Int2ObjectHashMap<>();
2445

2546
final Aeron.Context ctx = new Aeron.Context();
2647
ctx.newImageHandler(this::newImageHandler);
2748

2849
aeron = Aeron.connect(ctx);
2950

30-
aeron.addSubscription("udp://localhost:" + port, SERVER_STREAM_ID);
51+
subscription = aeron.addSubscription("udp://localhost:" + port, SERVER_STREAM_ID);
52+
53+
final FragmentAssembler fragmentAssembler = new FragmentAssembler(this::fragmentHandler);
54+
55+
worker = Schedulers.computation().createWorker();
56+
57+
poll(fragmentAssembler);
58+
59+
rsServerProtocol = ReactiveSocketServerProtocol.create(requestHandler);
60+
}
61+
62+
public static ReactiveSocketAeronServer create(int port, RequestHandler requestHandler) {
63+
return new ReactiveSocketAeronServer(port, requestHandler);
64+
}
65+
66+
public static ReactiveSocketAeronServer create(RequestHandler requestHandler) {
67+
return new ReactiveSocketAeronServer(39790, requestHandler);
68+
}
69+
70+
void poll(FragmentAssembler fragmentAssembler) {
71+
if (running) {
72+
worker.schedule(() -> {
73+
subscription.poll(fragmentAssembler, Integer.MAX_VALUE);
74+
poll(fragmentAssembler);
75+
});
76+
}
77+
}
78+
79+
void fragmentHandler(DirectBuffer buffer, int offset, int length, Header header) {
80+
final int sessionId = header.sessionId();
81+
AeronServerDuplexConnection connection = connections.get(sessionId);
82+
83+
if (connection != null) {
84+
final PublishSubject<Frame> subject = connection.getSubject();
85+
ByteBuffer bytes = ByteBuffer.allocate(buffer.capacity());
86+
buffer.getBytes(0, bytes, buffer.capacity());
87+
final Frame frame = Frame.from(bytes);
88+
subject.onNext(frame);
89+
} else {
90+
System.out.println("No connection found for session id " + sessionId);
91+
}
3192
}
3293

3394
void newImageHandler(Image image, String channel, int streamId, int sessionId, long joiningPosition, String sourceIdentity) {
3495
if (SERVER_STREAM_ID == streamId) {
3596

97+
final AeronServerDuplexConnection connection = connections.computeIfAbsent(sessionId, (_s) -> {
98+
final String responseChannel = "udp://" + sourceIdentity.substring(0, sourceIdentity.indexOf(':')) + ":" + port;
99+
Publication publication = aeron.addPublication(responseChannel, CLIENT_STREAM_ID);
100+
return new AeronServerDuplexConnection(publication, streamId, sessionId);
101+
});
102+
rsServerProtocol.acceptConnection(connection);
36103
} else {
37-
System.out.println("");
104+
System.out.println("Unsupported stream id " + streamId);
38105
}
39106
}
107+
108+
@Override
109+
public void close() throws IOException {
110+
running = false;
111+
worker.unsubscribe();
112+
aeron.close();
113+
}
40114
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,147 @@
11
package io.reactivesocket.aeron;
22

3+
import io.reactivesocket.DuplexConnection;
4+
import io.reactivesocket.Frame;
5+
import io.reactivesocket.ReactiveSocketClientProtocol;
6+
import org.reactivestreams.Publisher;
7+
import rx.Observable;
8+
import rx.RxReactiveStreams;
9+
import rx.Scheduler;
10+
import rx.schedulers.Schedulers;
11+
import rx.subjects.PublishSubject;
12+
import uk.co.real_logic.aeron.Aeron;
13+
import uk.co.real_logic.aeron.FragmentAssembler;
14+
import uk.co.real_logic.aeron.Publication;
15+
import uk.co.real_logic.aeron.Subscription;
16+
import uk.co.real_logic.aeron.logbuffer.Header;
17+
import uk.co.real_logic.agrona.DirectBuffer;
18+
import uk.co.real_logic.agrona.collections.Int2ObjectHashMap;
19+
import uk.co.real_logic.agrona.concurrent.UnsafeBuffer;
20+
21+
import java.nio.ByteBuffer;
22+
323
/**
424
* Created by rroeser on 8/13/15.
525
*/
626
public class ReactivesocketAeronClient {
27+
private static final byte[] EMTPY = new byte[0];
28+
29+
private static final ThreadLocal<UnsafeBuffer> buffers = ThreadLocal.withInitial(() -> new UnsafeBuffer(EMTPY));
30+
31+
private static final Int2ObjectHashMap<Subscription> subscriptions = new Int2ObjectHashMap<>();
32+
33+
private static final Int2ObjectHashMap<PublishSubject<Frame>> subjects = new Int2ObjectHashMap<>();
34+
35+
private static final int SERVER_STREAM_ID = 1;
36+
37+
private static final int CLIENT_STREAM_ID = 2;
38+
39+
private final ReactiveSocketClientProtocol rsClientProtocol;
40+
41+
private final Aeron aeron;
42+
43+
private volatile boolean running = true;
44+
45+
private final int port;
46+
47+
private ReactivesocketAeronClient(String host, int port) {
48+
this.port = port;
49+
50+
final Aeron.Context ctx = new Aeron.Context();
51+
aeron = Aeron.connect(ctx);
52+
53+
final String channel = "udp://" + host + ":" + port;
54+
55+
final Publication publication = aeron.addPublication(channel, SERVER_STREAM_ID);
56+
57+
final int sessionId = publication.sessionId();
58+
59+
subjects.computeIfAbsent(sessionId, (_p) -> PublishSubject.create());
60+
61+
subscriptions.computeIfAbsent(port, (_p) -> {
62+
Subscription subscription = aeron.addSubscription(channel, CLIENT_STREAM_ID);
63+
64+
final FragmentAssembler fragmentAssembler = new FragmentAssembler(this::fragmentHandler);
65+
66+
poll(fragmentAssembler, subscription, Schedulers.computation().createWorker());
67+
68+
return subscription;
69+
});
70+
71+
this.rsClientProtocol =
72+
ReactiveSocketClientProtocol.create(new DuplexConnection() {
73+
74+
public Publisher<Frame> getInput() {
75+
PublishSubject publishSubject = subjects.get(port);
76+
return RxReactiveStreams.toPublisher(publishSubject);
77+
}
78+
79+
@Override
80+
public Publisher<Void> write(Publisher<Frame> o) {
81+
Observable<Void> req = RxReactiveStreams
82+
.toObservable(o)
83+
.map(frame -> {
84+
final UnsafeBuffer buffer = buffers.get();
85+
ByteBuffer byteBuffer = frame.getByteBuffer();
86+
buffer.wrap(byteBuffer);
87+
88+
for (;;) {
89+
final long offer = publication.offer(buffer);
90+
91+
if (offer >= 0) {
92+
break;
93+
} else if (Publication.NOT_CONNECTED == offer) {
94+
throw new RuntimeException("not connected");
95+
}
96+
}
97+
98+
return null;
99+
});
100+
101+
return RxReactiveStreams.toPublisher(req);
102+
}
103+
});
104+
}
105+
106+
public static ReactivesocketAeronClient create(String host, int port) {
107+
return new ReactivesocketAeronClient(host, port);
108+
}
109+
110+
public static ReactivesocketAeronClient create(String host) {
111+
return new ReactivesocketAeronClient(host, 39790);
112+
}
113+
114+
void fragmentHandler(DirectBuffer buffer, int offset, int length, Header header) {
115+
final PublishSubject<Frame> subject = subjects.get(header.sessionId());
116+
ByteBuffer bytes = ByteBuffer.allocate(buffer.capacity());
117+
buffer.getBytes(0, bytes, buffer.capacity());
118+
final Frame frame = Frame.from(bytes);
119+
subject.onNext(frame);
120+
}
121+
122+
void poll(FragmentAssembler fragmentAssembler, Subscription subscription, Scheduler.Worker worker) {
123+
if (running) {
124+
worker.schedule(() -> {
125+
subscription.poll(fragmentAssembler, Integer.MAX_VALUE);
126+
poll(fragmentAssembler, subscription, worker);
127+
});
128+
}
129+
}
130+
131+
public Publisher<String> requestResponse(String payload) {
132+
return rsClientProtocol.requestResponse(payload);
133+
}
134+
135+
public Publisher<String> requestStream(String payload) {
136+
return rsClientProtocol.requestStream(payload);
137+
}
138+
139+
public Publisher<Void> fireAndForget(String payload) {
140+
return rsClientProtocol.fireAndForget(payload);
141+
}
142+
143+
public Publisher<String> requestSubscription(String payload) {
144+
return rsClientProtocol.requestSubscription(payload);
145+
}
146+
7147
}

0 commit comments

Comments
 (0)