Skip to content

Commit f90fbaa

Browse files
Merge pull request #50 from ReactiveSocket/bidirectional
Bi-Directional Requester/Responder
2 parents 5b15625 + 9439ca2 commit f90fbaa

File tree

9 files changed

+510
-311
lines changed

9 files changed

+510
-311
lines changed

src/main/java/io/reactivesocket/ReactiveSocket.java

Lines changed: 228 additions & 37 deletions
Large diffs are not rendered by default.
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package io.reactivesocket.internal;
2+
3+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
4+
5+
import io.reactivesocket.observable.Disposable;
6+
7+
public final class BooleanDisposable implements Disposable {
8+
volatile Runnable run;
9+
10+
static final AtomicReferenceFieldUpdater<BooleanDisposable, Runnable> RUN =
11+
AtomicReferenceFieldUpdater.newUpdater(BooleanDisposable.class, Runnable.class, "run");
12+
13+
static final Runnable DISPOSED = () -> { };
14+
15+
public BooleanDisposable() {
16+
this(() -> { });
17+
}
18+
19+
public BooleanDisposable(Runnable run) {
20+
RUN.lazySet(this, run);
21+
}
22+
23+
@Override
24+
public void dispose() {
25+
Runnable r = run;
26+
if (r != DISPOSED) {
27+
r = RUN.getAndSet(this, DISPOSED);
28+
if (r != DISPOSED) {
29+
r.run();
30+
}
31+
}
32+
}
33+
34+
public boolean isDisposed() {
35+
return run == DISPOSED;
36+
}
37+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package io.reactivesocket.internal;
2+
3+
import java.util.HashSet;
4+
import java.util.Set;
5+
6+
import io.reactivesocket.Completable;
7+
8+
/**
9+
* A Completable container that can hold onto multiple other Completables.
10+
*/
11+
public final class CompositeCompletable implements Completable {
12+
13+
// protected by synchronized
14+
private boolean completed = false;
15+
private Throwable error = null;
16+
final Set<Completable> resources = new HashSet<>();
17+
18+
public CompositeCompletable() {
19+
20+
}
21+
22+
public void add(Completable d) {
23+
boolean terminal = false;
24+
synchronized (this) {
25+
if (error != null || completed) {
26+
terminal = true;
27+
} else {
28+
resources.add(d);
29+
}
30+
}
31+
if (terminal) {
32+
if (error != null) {
33+
d.error(error);
34+
} else {
35+
d.success();
36+
}
37+
}
38+
}
39+
40+
public void remove(Completable d) {
41+
synchronized (this) {
42+
resources.remove(d);
43+
}
44+
}
45+
46+
public void clear() {
47+
synchronized (this) {
48+
resources.clear();
49+
}
50+
}
51+
52+
@Override
53+
public void success() {
54+
Completable[] cs = null;
55+
synchronized (this) {
56+
if (error == null) {
57+
completed = true;
58+
cs = resources.toArray(new Completable[] {});
59+
resources.clear();
60+
}
61+
}
62+
if (cs != null) {
63+
for (Completable c : cs) {
64+
c.success();
65+
}
66+
}
67+
}
68+
69+
@Override
70+
public void error(Throwable e) {
71+
Completable[] cs = null;
72+
synchronized (this) {
73+
if (error == null && !completed) {
74+
error = e;
75+
cs = resources.toArray(new Completable[] {});
76+
resources.clear();
77+
}
78+
}
79+
if (cs != null) {
80+
for (Completable c : cs) {
81+
c.error(e);
82+
}
83+
}
84+
}
85+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package io.reactivesocket.internal;
2+
3+
import java.util.HashSet;
4+
import java.util.Set;
5+
6+
import io.reactivesocket.Completable;
7+
import io.reactivesocket.observable.Disposable;
8+
9+
/**
10+
* A Disposable container that can hold onto multiple other Disposables.
11+
*/
12+
public final class CompositeDisposable implements Disposable {
13+
14+
// protected by synchronized
15+
private boolean disposed = false;
16+
final Set<Disposable> resources = new HashSet<>();
17+
18+
public CompositeDisposable() {
19+
20+
}
21+
22+
public void add(Disposable d) {
23+
boolean isDisposed = false;
24+
synchronized (this) {
25+
if (disposed) {
26+
isDisposed = true;
27+
} else {
28+
resources.add(d);
29+
}
30+
}
31+
if (isDisposed) {
32+
d.dispose();
33+
}
34+
}
35+
36+
public void remove(Completable d) {
37+
synchronized (this) {
38+
resources.remove(d);
39+
}
40+
}
41+
42+
public void clear() {
43+
synchronized (this) {
44+
resources.clear();
45+
}
46+
}
47+
48+
@Override
49+
public void dispose() {
50+
Disposable[] cs = null;
51+
synchronized (this) {
52+
disposed = true;
53+
cs = resources.toArray(new Disposable[] {});
54+
resources.clear();
55+
}
56+
for (Disposable d : cs) {
57+
d.dispose();
58+
}
59+
}
60+
61+
}

src/main/java/io/reactivesocket/internal/Requester.java

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -753,38 +753,42 @@ private void start(Completable onComplete) {
753753
connection.getInput().subscribe(new Observer<Frame>() {
754754
public void onSubscribe(Disposable d) {
755755
if (connectionSubscription.compareAndSet(null, d)) {
756-
// now that we are connected, send SETUP frame (asynchronously, other messages can continue being written after this)
757-
connection.addOutput(PublisherUtils.just(Frame.Setup.from(setupPayload.getFlags(), KEEPALIVE_INTERVAL_MS, 0, setupPayload.metadataMimeType(), setupPayload.dataMimeType(), setupPayload)),
758-
new Completable() {
759-
760-
@Override
761-
public void success() {
762-
onComplete.success();
763-
requesterStarted = true;
764-
}
765-
766-
@Override
767-
public void error(Throwable e) {
768-
onComplete.error(e);
769-
tearDown(e);
770-
}
771-
772-
});
773-
774-
connection.addOutput(PublisherUtils.keepaliveTicker(KEEPALIVE_INTERVAL_MS, TimeUnit.MILLISECONDS),
775-
new Completable()
776-
{
777-
public void success()
778-
{
779-
}
780-
781-
public void error(Throwable e)
756+
if(isServer) {
757+
requesterStarted = true;
758+
onComplete.success();
759+
} else {
760+
// now that we are connected, send SETUP frame (asynchronously, other messages can continue being written after this)
761+
connection.addOutput(PublisherUtils.just(Frame.Setup.from(setupPayload.getFlags(), KEEPALIVE_INTERVAL_MS, 0, setupPayload.metadataMimeType(), setupPayload.dataMimeType(), setupPayload)),
762+
new Completable() {
763+
764+
@Override
765+
public void success() {
766+
requesterStarted = true;
767+
onComplete.success();
768+
}
769+
770+
@Override
771+
public void error(Throwable e) {
772+
onComplete.error(e);
773+
tearDown(e);
774+
}
775+
776+
});
777+
778+
connection.addOutput(PublisherUtils.keepaliveTicker(KEEPALIVE_INTERVAL_MS, TimeUnit.MILLISECONDS),
779+
new Completable()
782780
{
783-
onComplete.error(e);
784-
tearDown(e);
785-
}
786-
});
787-
781+
public void success()
782+
{
783+
}
784+
785+
public void error(Throwable e)
786+
{
787+
onComplete.error(e);
788+
tearDown(e);
789+
}
790+
});
791+
}
788792
} else {
789793
// means we already were cancelled
790794
d.dispose();

src/main/java/io/reactivesocket/internal/Responder.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,23 @@
4040
*/
4141
public class Responder {
4242
private final DuplexConnection connection;
43-
private final ConnectionSetupHandler connectionHandler;
43+
private final ConnectionSetupHandler connectionHandler; // for server
44+
private final RequestHandler clientRequestHandler; // for client
4445
private final Consumer<Throwable> errorStream;
4546
private volatile LeaseGovernor leaseGovernor;
4647
private long timeOfLastKeepalive;
48+
private final Consumer<ConnectionSetupPayload> setupCallback;
49+
private final boolean isServer;
4750

48-
private Responder(DuplexConnection connection, ConnectionSetupHandler connectionHandler, LeaseGovernor leaseGovernor, Consumer<Throwable> errorStream) {
51+
private Responder(boolean isServer, DuplexConnection connection, ConnectionSetupHandler connectionHandler, RequestHandler requestHandler, LeaseGovernor leaseGovernor, Consumer<Throwable> errorStream, Consumer<ConnectionSetupPayload> setupCallback) {
52+
this.isServer = isServer;
4953
this.connection = connection;
5054
this.connectionHandler = connectionHandler;
55+
this.clientRequestHandler = requestHandler;
5156
this.leaseGovernor = leaseGovernor;
5257
this.errorStream = errorStream;
5358
this.timeOfLastKeepalive = System.nanoTime();
59+
this.setupCallback = setupCallback;
5460
}
5561

5662
/**
@@ -62,8 +68,18 @@ private Responder(DuplexConnection connection, ConnectionSetupHandler connection
6268
* This include fireAndForget which ONLY emit errors server-side via this mechanism.
6369
* @return responder instance
6470
*/
65-
public static <T> Responder create(DuplexConnection connection, ConnectionSetupHandler connectionHandler, LeaseGovernor leaseGovernor, Consumer<Throwable> errorStream, Completable responderCompletable) {
66-
Responder responder = new Responder(connection, connectionHandler, leaseGovernor, errorStream);
71+
public static <T> Responder createServerResponder(DuplexConnection connection, ConnectionSetupHandler connectionHandler, LeaseGovernor leaseGovernor, Consumer<Throwable> errorStream, Completable responderCompletable, Consumer<ConnectionSetupPayload> setupCallback) {
72+
Responder responder = new Responder(true, connection, connectionHandler, null, leaseGovernor, errorStream, setupCallback);
73+
responder.start(responderCompletable);
74+
return responder;
75+
}
76+
77+
public static <T> Responder createServerResponder(DuplexConnection connection, ConnectionSetupHandler connectionHandler, LeaseGovernor leaseGovernor, Consumer<Throwable> errorStream, Completable responderCompletable) {
78+
return createServerResponder(connection, connectionHandler, leaseGovernor, errorStream, responderCompletable, s -> {});
79+
}
80+
81+
public static <T> Responder createClientResponder(DuplexConnection connection, RequestHandler requestHandler, LeaseGovernor leaseGovernor, Consumer<Throwable> errorStream, Completable responderCompletable) {
82+
Responder responder = new Responder(false, connection, null, requestHandler, leaseGovernor, errorStream, s -> {});
6783
responder.start(responderCompletable);
6884
return responder;
6985
}
@@ -123,12 +139,12 @@ public void onSubscribe(Disposable d) {
123139
}
124140
}
125141

126-
volatile RequestHandler requestHandler = null; // null until after first Setup frame
127-
142+
volatile RequestHandler requestHandler = !isServer ? clientRequestHandler : null; // null until after first Setup frame
143+
128144
@Override
129145
public void onNext(Frame requestFrame) {
130146
final int streamId = requestFrame.getStreamId();
131-
if (requestHandler == null) {
147+
if (requestHandler == null) { // this will only happen when isServer==true
132148
if (childTerminated.get()) {
133149
// already terminated, but still receiving latent messages ... ignore them while shutdown occurs
134150
return;
@@ -141,6 +157,9 @@ public void onNext(Frame requestFrame) {
141157
throw new SetupException("unsupported protocol version: " + Frame.Setup.version(requestFrame));
142158
}
143159

160+
// accept setup for ReactiveSocket/Requester usage
161+
setupCallback.accept(connectionSetupPayload);
162+
// handle setup
144163
requestHandler = connectionHandler.apply(connectionSetupPayload);
145164
} catch (SetupException setupException) {
146165
setupErrorAndTearDown(connection, setupException);

0 commit comments

Comments
 (0)