Skip to content

Commit 3cd3549

Browse files
committed
Merge pull request #1 from robertroeser/master
updated to use use reactivesocket 0.0.5, and added ability to schedul…
2 parents 60b90a5 + cf65dd6 commit 3cd3549

File tree

33 files changed

+310
-125
lines changed

33 files changed

+310
-125
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ subprojects {
1616
dependencies {
1717
compile 'io.reactivex:rxjava:1.1.2'
1818
compile 'io.reactivex:rxjava-reactive-streams:1.0.1'
19-
compile 'io.reactivesocket:reactivesocket:0.0.4'
19+
compile 'io.reactivesocket:reactivesocket:0.0.5'
2020
compile 'org.hdrhistogram:HdrHistogram:2.1.7'
2121
compile 'org.slf4j:slf4j-api:1.7.12'
2222
testCompile 'junit:junit:4.12'

reactivesocket-aeron/build.gradle

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
11
dependencies {
2-
compile 'uk.co.real-logic:Agrona:0.4.8'
3-
compile 'uk.co.real-logic:aeron-all:0.2.2'
2+
compile 'io.aeron:aeron-all:0.9.5'
43
}

reactivesocket-aeron/src/examples/java/io/reactivesocket/aeron/example/MediaDriver.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@
1515
*/
1616
package io.reactivesocket.aeron.example;
1717

18-
import uk.co.real_logic.aeron.driver.ThreadingMode;
19-
import uk.co.real_logic.agrona.concurrent.BackoffIdleStrategy;
20-
import uk.co.real_logic.agrona.concurrent.NoOpIdleStrategy;
18+
import io.aeron.driver.ThreadingMode;
19+
import org.agrona.concurrent.BackoffIdleStrategy;
2120

2221
public class MediaDriver {
2322
public static void main(String... args) {
@@ -31,14 +30,14 @@ public static void main(String... args) {
3130

3231
System.out.println("ThreadingMode => " + threadingMode);
3332

34-
final uk.co.real_logic.aeron.driver.MediaDriver.Context ctx = new uk.co.real_logic.aeron.driver.MediaDriver.Context()
33+
final io.aeron.driver.MediaDriver.Context ctx = new io.aeron.driver.MediaDriver.Context()
3534
.threadingMode(threadingMode)
3635
.dirsDeleteOnStart(true)
3736
.conductorIdleStrategy(new BackoffIdleStrategy(1, 1, 100, 1000))
38-
.receiverIdleStrategy(new NoOpIdleStrategy())
39-
.senderIdleStrategy(new NoOpIdleStrategy());
37+
.receiverIdleStrategy(new BackoffIdleStrategy(1, 1, 100, 1000))
38+
.senderIdleStrategy(new BackoffIdleStrategy(1, 1, 100, 1000));
4039

41-
final uk.co.real_logic.aeron.driver.MediaDriver ignored = uk.co.real_logic.aeron.driver.MediaDriver.launch(ctx);
40+
final io.aeron.driver.MediaDriver ignored = io.aeron.driver.MediaDriver.launch(ctx);
4241

4342
}
4443
}

reactivesocket-aeron/src/examples/java/io/reactivesocket/aeron/example/fireandforget/Fire.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818

1919
import io.reactivesocket.ConnectionSetupPayload;
20+
import io.reactivesocket.DefaultReactiveSocket;
2021
import io.reactivesocket.Payload;
2122
import io.reactivesocket.ReactiveSocket;
2223
import io.reactivesocket.aeron.client.AeronClientDuplexConnection;
@@ -61,7 +62,7 @@ public static void main(String... args) throws Exception {
6162
AeronClientDuplexConnection connection = RxReactiveStreams.toObservable(udpConnection).toBlocking().single();
6263
System.out.println("Created duplex connection");
6364

64-
ReactiveSocket reactiveSocket = ReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS));
65+
ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS));
6566
reactiveSocket.startAndWait();
6667

6768
CountDownLatch latch = new CountDownLatch(Integer.MAX_VALUE);

reactivesocket-aeron/src/examples/java/io/reactivesocket/aeron/example/requestreply/Ping.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.reactivesocket.aeron.example.requestreply;
1717

1818
import io.reactivesocket.ConnectionSetupPayload;
19+
import io.reactivesocket.DefaultReactiveSocket;
1920
import io.reactivesocket.Payload;
2021
import io.reactivesocket.ReactiveSocket;
2122
import io.reactivesocket.aeron.client.AeronClientDuplexConnection;
@@ -63,7 +64,7 @@ public static void main(String... args) throws Exception {
6364
AeronClientDuplexConnection connection = RxReactiveStreams.toObservable(udpConnection).toBlocking().single();
6465
System.out.println("Created duplex connection");
6566

66-
ReactiveSocket reactiveSocket = ReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS));
67+
ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS));
6768
reactiveSocket.startAndWait();
6869

6970
CountDownLatch latch = new CountDownLatch(Integer.MAX_VALUE);
@@ -83,7 +84,7 @@ public static void main(String... args) throws Exception {
8384
System.out.println("---- PING/ PONG HISTO ----");
8485

8586

86-
}, 10, 10, TimeUnit.SECONDS);
87+
}, 1, 1, TimeUnit.SECONDS);
8788

8889
Observable
8990
.range(1, Integer.MAX_VALUE)
@@ -112,7 +113,7 @@ public ByteBuffer getMetadata() {
112113
long diff = System.nanoTime() - start;
113114
histogram.recordValue(diff);
114115
});
115-
})
116+
}, 16)
116117
.subscribe(new Subscriber<Payload>() {
117118
@Override
118119
public void onCompleted() {

reactivesocket-aeron/src/main/java/io/reactivesocket/aeron/client/AeronClientDuplexConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,18 @@
1515
*/
1616
package io.reactivesocket.aeron.client;
1717

18+
import io.aeron.Publication;
1819
import io.reactivesocket.DuplexConnection;
1920
import io.reactivesocket.Frame;
2021
import io.reactivesocket.aeron.internal.Loggable;
2122
import io.reactivesocket.rx.Completable;
2223
import io.reactivesocket.rx.Disposable;
2324
import io.reactivesocket.rx.Observable;
2425
import io.reactivesocket.rx.Observer;
26+
import org.agrona.concurrent.AbstractConcurrentArrayQueue;
2527
import org.reactivestreams.Publisher;
2628
import org.reactivestreams.Subscriber;
2729
import org.reactivestreams.Subscription;
28-
import uk.co.real_logic.aeron.Publication;
29-
import uk.co.real_logic.agrona.concurrent.AbstractConcurrentArrayQueue;
3030

3131
import java.io.IOException;
3232
import java.util.concurrent.CopyOnWriteArrayList;

reactivesocket-aeron/src/main/java/io/reactivesocket/aeron/client/AeronClientDuplexConnectionFactory.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@
2323
import io.reactivesocket.rx.Observer;
2424
import org.reactivestreams.Publisher;
2525
import org.reactivestreams.Subscriber;
26-
import uk.co.real_logic.aeron.Publication;
27-
import uk.co.real_logic.aeron.logbuffer.FragmentHandler;
28-
import uk.co.real_logic.aeron.logbuffer.Header;
29-
import uk.co.real_logic.agrona.BitUtil;
30-
import uk.co.real_logic.agrona.DirectBuffer;
31-
import uk.co.real_logic.agrona.concurrent.ManyToManyConcurrentArrayQueue;
32-
import uk.co.real_logic.agrona.concurrent.UnsafeBuffer;
26+
import io.aeron.Publication;
27+
import io.aeron.logbuffer.FragmentHandler;
28+
import io.aeron.logbuffer.Header;
29+
import org.agrona.BitUtil;
30+
import org.agrona.DirectBuffer;
31+
import org.agrona.concurrent.ManyToManyConcurrentArrayQueue;
32+
import org.agrona.concurrent.UnsafeBuffer;
3333

3434
import java.net.InetSocketAddress;
3535
import java.net.SocketAddress;

reactivesocket-aeron/src/main/java/io/reactivesocket/aeron/client/AeronReactiveSocketFactory.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,32 @@
11
package io.reactivesocket.aeron.client;
22

33
import io.reactivesocket.ConnectionSetupPayload;
4+
import io.reactivesocket.DefaultReactiveSocket;
45
import io.reactivesocket.ReactiveSocket;
56
import io.reactivesocket.ReactiveSocketFactory;
7+
import io.reactivesocket.ReactiveSocketSocketAddressFactory;
68
import io.reactivesocket.rx.Completable;
9+
import org.agrona.LangUtil;
710
import org.reactivestreams.Publisher;
811
import org.reactivestreams.Subscriber;
912
import org.reactivestreams.Subscription;
1013
import org.slf4j.Logger;
1114
import org.slf4j.LoggerFactory;
1215
import rx.Observable;
1316
import rx.RxReactiveStreams;
14-
import uk.co.real_logic.agrona.LangUtil;
1517

16-
import java.net.*;
18+
import java.net.Inet4Address;
19+
import java.net.InetAddress;
20+
import java.net.InetSocketAddress;
21+
import java.net.NetworkInterface;
22+
import java.net.SocketAddress;
1723
import java.util.Enumeration;
18-
import java.util.concurrent.TimeUnit;
1924
import java.util.function.Consumer;
2025

2126
/**
2227
* An implementation of {@link ReactiveSocketFactory} that creates Aeron ReactiveSockets.
2328
*/
24-
public class AeronReactiveSocketFactory implements ReactiveSocketFactory {
29+
public class AeronReactiveSocketFactory implements ReactiveSocketSocketAddressFactory<ReactiveSocket> {
2530
private static final Logger logger = LoggerFactory.getLogger(AeronReactiveSocketFactory.class);
2631

2732
private final ConnectionSetupPayload connectionSetupPayload;
@@ -46,7 +51,7 @@ public AeronReactiveSocketFactory(String host, int port, ConnectionSetupPayload
4651
}
4752

4853
@Override
49-
public Publisher<ReactiveSocket> call(SocketAddress address, long timeout, TimeUnit timeUnit) {
54+
public Publisher<ReactiveSocket> call(SocketAddress address) {
5055
Publisher<AeronClientDuplexConnection> connection
5156
= AeronClientDuplexConnectionFactory.getInstance().createAeronClientDuplexConnection(address);
5257

@@ -59,7 +64,7 @@ public void onSubscribe(Subscription s) {
5964

6065
@Override
6166
public void onNext(AeronClientDuplexConnection connection) {
62-
ReactiveSocket reactiveSocket = ReactiveSocket.fromClientConnection(connection, connectionSetupPayload, errorStream);
67+
ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, connectionSetupPayload, errorStream);
6368
reactiveSocket.start(new Completable() {
6469
@Override
6570
public void success() {
@@ -85,7 +90,7 @@ public void onComplete() {
8590
})
8691
);
8792

88-
return RxReactiveStreams.toPublisher(result.timeout(timeout, timeUnit));
93+
return RxReactiveStreams.toPublisher(result);
8994
}
9095

9196
private static InetAddress getIPv4InetAddress() {

reactivesocket-aeron/src/main/java/io/reactivesocket/aeron/client/ClientAeronManager.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818
import io.reactivesocket.aeron.internal.Loggable;
1919
import rx.Scheduler;
2020
import rx.schedulers.Schedulers;
21-
import uk.co.real_logic.aeron.Aeron;
22-
import uk.co.real_logic.aeron.FragmentAssembler;
23-
import uk.co.real_logic.aeron.Image;
24-
import uk.co.real_logic.aeron.Subscription;
25-
import uk.co.real_logic.aeron.logbuffer.FragmentHandler;
21+
import io.aeron.Aeron;
22+
import io.aeron.FragmentAssembler;
23+
import io.aeron.Image;
24+
import io.aeron.Subscription;
25+
import io.aeron.logbuffer.FragmentHandler;
2626

2727
import java.util.concurrent.CopyOnWriteArrayList;
2828
import java.util.concurrent.TimeUnit;
@@ -47,8 +47,8 @@ private ClientAeronManager() {
4747

4848
final Aeron.Context ctx = new Aeron.Context();
4949
ctx.errorHandler(t -> error("an exception occurred", t));
50-
ctx.availableImageHandler((Image image, Subscription subscription, long joiningPosition, String sourceIdentity) ->
51-
debug("New image available with session id => {} and sourceIdentity => {} and subscription => {}", image.sessionId(), sourceIdentity, subscription.toString())
50+
ctx.availableImageHandler((Image image) ->
51+
debug("New image available with session id => {} and sourceIdentity => {} and subscription => {}", image.sessionId(), image.sourceIdentity(), image.subscription().toString())
5252
);
5353

5454
aeron = Aeron.connect(ctx);
@@ -117,7 +117,7 @@ void poll() {
117117
*/
118118

119119
/**
120-
* Creates a logic group of {@link uk.co.real_logic.aeron.Subscription}s to a particular channel.
120+
* Creates a logic group of {@link io.aeron.Subscription}s to a particular channel.
121121
*/
122122
public static class SubscriptionGroup {
123123

reactivesocket-aeron/src/main/java/io/reactivesocket/aeron/client/FrameHolder.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,14 @@
1515
*/
1616
package io.reactivesocket.aeron.client;
1717

18+
import io.aeron.Publication;
1819
import io.reactivesocket.Frame;
1920
import org.HdrHistogram.Recorder;
21+
import org.agrona.concurrent.OneToOneConcurrentArrayQueue;
2022
import org.reactivestreams.Subscription;
21-
import uk.co.real_logic.aeron.Publication;
22-
import uk.co.real_logic.agrona.concurrent.OneToOneConcurrentArrayQueue;
23-
2423
/**
2524
* Holds a frame and the publication that it's supposed to be sent on.
26-
* Pools instances on an {@link uk.co.real_logic.agrona.concurrent.OneToOneConcurrentArrayQueue}
25+
* Pools instances on an {@link OneToOneConcurrentArrayQueue}
2726
*/
2827
public class FrameHolder {
2928
private static final ThreadLocal<OneToOneConcurrentArrayQueue<FrameHolder>> FRAME_HOLDER_QUEUE

reactivesocket-aeron/src/main/java/io/reactivesocket/aeron/client/PollingAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
*/
1616
package io.reactivesocket.aeron.client;
1717

18+
import io.aeron.Subscription;
1819
import io.reactivesocket.aeron.internal.Loggable;
1920
import rx.functions.Action0;
20-
import uk.co.real_logic.aeron.Subscription;
2121

2222
import java.util.List;
2323

reactivesocket-aeron/src/main/java/io/reactivesocket/aeron/internal/AeronUtil.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
*/
1616
package io.reactivesocket.aeron.internal;
1717

18-
import uk.co.real_logic.aeron.Publication;
19-
import uk.co.real_logic.aeron.logbuffer.BufferClaim;
20-
import uk.co.real_logic.agrona.MutableDirectBuffer;
21-
import uk.co.real_logic.agrona.concurrent.OneToOneConcurrentArrayQueue;
22-
import uk.co.real_logic.agrona.concurrent.UnsafeBuffer;
18+
import io.aeron.Publication;
19+
import io.aeron.logbuffer.BufferClaim;
20+
import org.agrona.MutableDirectBuffer;
21+
import org.agrona.concurrent.OneToOneConcurrentArrayQueue;
22+
import org.agrona.concurrent.UnsafeBuffer;
2323

2424
import java.util.concurrent.TimeUnit;
2525

@@ -41,7 +41,7 @@ public class AeronUtil implements Loggable {
4141
* This method of sending data does need to know how long the message is.
4242
*
4343
* @param publication publication to send the message on
44-
* @param fillBuffer closure passed in to fill a {@link uk.co.real_logic.agrona.MutableDirectBuffer}
44+
* @param fillBuffer closure passed in to fill a {@link MutableDirectBuffer}
4545
* that is send over Aeron
4646
*/
4747
public static void offer(Publication publication, BufferFiller fillBuffer, int length, int timeout, TimeUnit timeUnit) {
@@ -76,7 +76,7 @@ public static void offer(Publication publication, BufferFiller fillBuffer, int l
7676
* In order to use this method of sending data you need to know the length of data.
7777
*
7878
* @param publication publication to send the message on
79-
* @param fillBuffer closure passed in to fill a {@link uk.co.real_logic.agrona.MutableDirectBuffer}
79+
* @param fillBuffer closure passed in to fill a {@link MutableDirectBuffer}
8080
* that is send over Aeron
8181
* @param length the length of data
8282
*/
@@ -114,7 +114,7 @@ public static void tryClaim(Publication publication, BufferFiller fillBuffer, in
114114
* size it will use offer instead.
115115
*
116116
* @param publication publication to send the message on
117-
* @param fillBuffer closure passed in to fill a {@link uk.co.real_logic.agrona.MutableDirectBuffer}
117+
* @param fillBuffer closure passed in to fill a {@link MutableDirectBuffer}
118118
* that is send over Aeron
119119
* @param length the length of data
120120
*/

reactivesocket-aeron/src/main/java/io/reactivesocket/aeron/internal/Constants.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
*/
1616
package io.reactivesocket.aeron.internal;
1717

18-
import uk.co.real_logic.agrona.concurrent.BackoffIdleStrategy;
19-
import uk.co.real_logic.agrona.concurrent.IdleStrategy;
20-
import uk.co.real_logic.agrona.concurrent.NoOpIdleStrategy;
21-
import uk.co.real_logic.agrona.concurrent.SleepingIdleStrategy;
18+
19+
import org.agrona.concurrent.BackoffIdleStrategy;
20+
import org.agrona.concurrent.IdleStrategy;
21+
import org.agrona.concurrent.NoOpIdleStrategy;
22+
import org.agrona.concurrent.SleepingIdleStrategy;
2223

2324
import java.util.concurrent.TimeUnit;
2425

reactivesocket-aeron/src/main/java/io/reactivesocket/aeron/server/AeronServerDuplexConnection.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,20 @@
1515
*/
1616
package io.reactivesocket.aeron.server;
1717

18+
import io.aeron.Publication;
1819
import io.reactivesocket.DuplexConnection;
1920
import io.reactivesocket.Frame;
20-
import io.reactivesocket.aeron.internal.*;
21+
import io.reactivesocket.aeron.internal.AeronUtil;
22+
import io.reactivesocket.aeron.internal.Constants;
23+
import io.reactivesocket.aeron.internal.Loggable;
24+
import io.reactivesocket.aeron.internal.MessageType;
25+
import io.reactivesocket.aeron.internal.NotConnectedException;
2126
import io.reactivesocket.rx.Completable;
2227
import io.reactivesocket.rx.Disposable;
2328
import io.reactivesocket.rx.Observable;
2429
import io.reactivesocket.rx.Observer;
30+
import org.agrona.BitUtil;
2531
import org.reactivestreams.Publisher;
26-
import uk.co.real_logic.aeron.Publication;
27-
import uk.co.real_logic.agrona.BitUtil;
2832

2933
import java.util.List;
3034
import java.util.concurrent.CopyOnWriteArrayList;

0 commit comments

Comments
 (0)