Skip to content

Commit f419554

Browse files
Merge pull request #216 from NiteshKant/0.5.x-duplex
Duplex interactions
2 parents 272401b + 261ce41 commit f419554

File tree

5 files changed

+135
-15
lines changed

5 files changed

+135
-15
lines changed

reactivesocket-core/src/main/java/io/reactivesocket/ClientReactiveSocket.java

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.reactivesocket.lease.LeaseImpl;
2727
import io.reactivesocket.reactivestreams.extensions.DefaultSubscriber;
2828
import io.reactivesocket.reactivestreams.extensions.Px;
29+
import io.reactivesocket.reactivestreams.extensions.internal.FlowControlHelper;
2930
import io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription;
3031
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.CancellableSubscriber;
3132
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
@@ -54,7 +55,7 @@ public class ClientReactiveSocket implements ReactiveSocket {
5455
private final Int2ObjectHashMap<Subscription> senders;
5556
private final Int2ObjectHashMap<Subscriber<Frame>> receivers;
5657

57-
private volatile Subscription transportReceiveSubscription;
58+
private final BufferingSubscription transportReceiveSubscription = new BufferingSubscription();
5859
private CancellableSubscriber<Void> keepAliveSendSub;
5960
private volatile Consumer<Lease> leaseConsumer; // Provided on start()
6061

@@ -190,7 +191,7 @@ private void startKeepAlive() {
190191
private void startReceivingRequests() {
191192
Px
192193
.from(connection.receive())
193-
.doOnSubscribe(subscription -> transportReceiveSubscription = subscription)
194+
.doOnSubscribe(subscription -> transportReceiveSubscription.switchTo(subscription))
194195
.doOnNext(this::handleIncomingFrames)
195196
.subscribe();
196197
}
@@ -200,9 +201,7 @@ protected void cleanup() {
200201
if (null != keepAliveSendSub) {
201202
keepAliveSendSub.cancel();
202203
}
203-
if (null != transportReceiveSubscription) {
204-
transportReceiveSubscription.cancel();
205-
}
204+
transportReceiveSubscription.cancel();
206205
}
207206

208207
private void handleIncomingFrames(Frame frame) {
@@ -352,4 +351,45 @@ private synchronized void registerSenderReceiver(int streamId, Subscription send
352351
senders.put(streamId, sender);
353352
receivers.put(streamId, receiver);
354353
}
354+
355+
private static class BufferingSubscription implements Subscription {
356+
357+
private int requested;
358+
private boolean cancelled;
359+
private Subscription delegate;
360+
361+
@Override
362+
public void request(long n) {
363+
if (relay()) {
364+
delegate.request(n);
365+
} else {
366+
requested = FlowControlHelper.incrementRequestN(requested, n);
367+
}
368+
}
369+
370+
@Override
371+
public void cancel() {
372+
if (relay()) {
373+
delegate.cancel();
374+
} else {
375+
cancelled = true;
376+
}
377+
}
378+
379+
private void switchTo(Subscription subscription) {
380+
synchronized (this) {
381+
delegate = subscription;
382+
}
383+
if (requested > 0) {
384+
subscription.request(requested);
385+
}
386+
if (cancelled) {
387+
subscription.cancel();
388+
}
389+
}
390+
391+
private synchronized boolean relay() {
392+
return delegate != null;
393+
}
394+
}
355395
}

reactivesocket-core/src/main/java/io/reactivesocket/ServerReactiveSocket.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@
2020
import io.reactivesocket.Frame.Request;
2121
import io.reactivesocket.Frame.Response;
2222
import io.reactivesocket.exceptions.ApplicationException;
23-
import io.reactivesocket.exceptions.RejectedSetupException;
2423
import io.reactivesocket.exceptions.SetupException;
2524
import io.reactivesocket.frame.FrameHeaderFlyweight;
26-
import io.reactivesocket.frame.SetupFrameFlyweight;
2725
import io.reactivesocket.internal.KnownErrorFilter;
2826
import io.reactivesocket.internal.RemoteReceiver;
2927
import io.reactivesocket.internal.RemoteSender;

reactivesocket-core/src/main/java/io/reactivesocket/server/ReactiveSocketServer.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@
2424
import io.reactivesocket.StreamIdSupplier;
2525
import io.reactivesocket.client.KeepAliveProvider;
2626
import io.reactivesocket.exceptions.SetupException;
27+
import io.reactivesocket.internal.ClientServerInputMultiplexer;
28+
import io.reactivesocket.lease.DefaultLeaseHonoringSocket;
2729
import io.reactivesocket.lease.LeaseEnforcingSocket;
30+
import io.reactivesocket.lease.LeaseHonoringSocket;
2831
import io.reactivesocket.reactivestreams.extensions.Px;
2932
import io.reactivesocket.transport.TransportServer;
3033
import io.reactivesocket.transport.TransportServer.StartedServer;
@@ -42,24 +45,28 @@ public interface ReactiveSocketServer {
4245

4346
static ReactiveSocketServer create(TransportServer transportServer) {
4447
return acceptor -> {
45-
return transportServer.start(duplexConnection -> {
46-
return Px.from(duplexConnection.receive())
48+
return transportServer.start(connection -> {
49+
return Px.from(connection.receive())
4750
.switchTo(setupFrame -> {
4851
if (setupFrame.getType() == FrameType.SETUP) {
52+
ClientServerInputMultiplexer multiplexer = new ClientServerInputMultiplexer(connection);
4953
ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create(setupFrame);
50-
ClientReactiveSocket sender = new ClientReactiveSocket(duplexConnection,
54+
ClientReactiveSocket sender = new ClientReactiveSocket(multiplexer.asServerConnection(),
5155
Throwable::printStackTrace,
5256
StreamIdSupplier.serverSupplier(),
5357
KeepAliveProvider.never());
58+
LeaseHonoringSocket lhs = new DefaultLeaseHonoringSocket(sender);
59+
sender.start(lhs);
5460
LeaseEnforcingSocket handler = acceptor.accept(setupPayload, sender);
55-
ServerReactiveSocket receiver = new ServerReactiveSocket(duplexConnection, handler,
61+
ServerReactiveSocket receiver = new ServerReactiveSocket(multiplexer.asClientConnection(),
62+
handler,
5663
setupPayload.willClientHonorLease(),
5764
Throwable::printStackTrace);
5865
receiver.start();
59-
return duplexConnection.onClose();
66+
return connection.onClose();
6067
} else {
6168
return Px.<Void>error(new IllegalStateException("Invalid first frame on the connection: "
62-
+ duplexConnection + ", frame type received: "
69+
+ connection + ", frame type received: "
6370
+ setupFrame.getType()));
6471
}
6572
});

reactivesocket-core/src/test/java/io/reactivesocket/test/util/LocalDuplexConnection.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,22 @@
1919
import io.reactivesocket.DuplexConnection;
2020
import io.reactivesocket.Frame;
2121
import io.reactivesocket.reactivestreams.extensions.Px;
22+
import io.reactivesocket.reactivestreams.extensions.internal.EmptySubject;
2223
import io.reactivex.Flowable;
2324
import io.reactivex.processors.PublishProcessor;
2425
import org.reactivestreams.Publisher;
2526

2627
public class LocalDuplexConnection implements DuplexConnection {
2728
private final PublishProcessor<Frame> send;
2829
private final PublishProcessor<Frame> receive;
30+
private final EmptySubject closeNotifier;
2931
private final String name;
3032

3133
public LocalDuplexConnection(String name, PublishProcessor<Frame> send, PublishProcessor<Frame> receive) {
3234
this.name = name;
3335
this.send = send;
3436
this.receive = receive;
37+
closeNotifier = new EmptySubject();
3538
}
3639

3740
@Override
@@ -56,11 +59,14 @@ public double availability() {
5659

5760
@Override
5861
public Publisher<Void> close() {
59-
return Px.empty();
62+
return Px.defer(() -> {
63+
closeNotifier.onComplete();
64+
return Px.empty();
65+
});
6066
}
6167

6268
@Override
6369
public Publisher<Void> onClose() {
64-
return Px.empty();
70+
return closeNotifier;
6571
}
6672
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2017 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.examples.transport.tcp.duplex;
15+
16+
import io.reactivesocket.AbstractReactiveSocket;
17+
import io.reactivesocket.Payload;
18+
import io.reactivesocket.ReactiveSocket;
19+
import io.reactivesocket.client.ReactiveSocketClient;
20+
import io.reactivesocket.client.ReactiveSocketClient.SocketAcceptor;
21+
import io.reactivesocket.frame.ByteBufferUtil;
22+
import io.reactivesocket.lease.DisabledLeaseAcceptingSocket;
23+
import io.reactivesocket.lease.LeaseEnforcingSocket;
24+
import io.reactivesocket.server.ReactiveSocketServer;
25+
import io.reactivesocket.transport.TransportServer.StartedServer;
26+
import io.reactivesocket.transport.tcp.client.TcpTransportClient;
27+
import io.reactivesocket.transport.tcp.server.TcpTransportServer;
28+
import io.reactivesocket.util.PayloadImpl;
29+
import io.reactivex.Flowable;
30+
import org.reactivestreams.Publisher;
31+
32+
import java.net.SocketAddress;
33+
import java.util.concurrent.TimeUnit;
34+
35+
import static io.reactivesocket.client.KeepAliveProvider.*;
36+
import static io.reactivesocket.client.SetupProvider.*;
37+
38+
public final class DuplexClient {
39+
40+
public static void main(String[] args) {
41+
StartedServer server = ReactiveSocketServer.create(TcpTransportServer.create())
42+
.start((setupPayload, reactiveSocket) -> {
43+
Flowable.fromPublisher(reactiveSocket.requestStream(new PayloadImpl("Hello-Bidi")))
44+
.map(Payload::getData)
45+
.map(ByteBufferUtil::toUtf8String)
46+
.forEach(System.out::println);
47+
return new DisabledLeaseAcceptingSocket(new AbstractReactiveSocket() { });
48+
});
49+
50+
SocketAddress address = server.getServerAddress();
51+
52+
ReactiveSocketClient rsclient = ReactiveSocketClient.createDuplex(TcpTransportClient.create(address),
53+
new SocketAcceptor() {
54+
@Override
55+
public LeaseEnforcingSocket accept(ReactiveSocket reactiveSocket) {
56+
return new DisabledLeaseAcceptingSocket(new AbstractReactiveSocket() {
57+
@Override
58+
public Publisher<Payload> requestStream(Payload payload) {
59+
return Flowable.interval(0, 1, TimeUnit.SECONDS).map(aLong -> new PayloadImpl("Bi-di Response => " + aLong));
60+
}
61+
});
62+
}
63+
}, keepAlive(never()).disableLease());
64+
65+
ReactiveSocket socket = Flowable.fromPublisher(rsclient.connect()).blockingFirst();
66+
67+
Flowable.fromPublisher(socket.onClose()).ignoreElements().blockingAwait();
68+
}
69+
}

0 commit comments

Comments
 (0)