Skip to content

Duplex interactions #216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.reactivesocket.lease.LeaseImpl;
import io.reactivesocket.reactivestreams.extensions.DefaultSubscriber;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.internal.FlowControlHelper;
import io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.CancellableSubscriber;
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
Expand Down Expand Up @@ -54,7 +55,7 @@ public class ClientReactiveSocket implements ReactiveSocket {
private final Int2ObjectHashMap<Subscription> senders;
private final Int2ObjectHashMap<Subscriber<Frame>> receivers;

private volatile Subscription transportReceiveSubscription;
private final BufferingSubscription transportReceiveSubscription = new BufferingSubscription();
private CancellableSubscriber<Void> keepAliveSendSub;
private volatile Consumer<Lease> leaseConsumer; // Provided on start()

Expand Down Expand Up @@ -190,7 +191,7 @@ private void startKeepAlive() {
private void startReceivingRequests() {
Px
.from(connection.receive())
.doOnSubscribe(subscription -> transportReceiveSubscription = subscription)
.doOnSubscribe(subscription -> transportReceiveSubscription.switchTo(subscription))
.doOnNext(this::handleIncomingFrames)
.subscribe();
}
Expand All @@ -200,9 +201,7 @@ protected void cleanup() {
if (null != keepAliveSendSub) {
keepAliveSendSub.cancel();
}
if (null != transportReceiveSubscription) {
transportReceiveSubscription.cancel();
}
transportReceiveSubscription.cancel();
}

private void handleIncomingFrames(Frame frame) {
Expand Down Expand Up @@ -352,4 +351,45 @@ private synchronized void registerSenderReceiver(int streamId, Subscription send
senders.put(streamId, sender);
receivers.put(streamId, receiver);
}

private static class BufferingSubscription implements Subscription {

private int requested;
private boolean cancelled;
private Subscription delegate;

@Override
public void request(long n) {
if (relay()) {
delegate.request(n);
} else {
requested = FlowControlHelper.incrementRequestN(requested, n);
}
}

@Override
public void cancel() {
if (relay()) {
delegate.cancel();
} else {
cancelled = true;
}
}

private void switchTo(Subscription subscription) {
synchronized (this) {
delegate = subscription;
}
if (requested > 0) {
subscription.request(requested);
}
if (cancelled) {
subscription.cancel();
}
}

private synchronized boolean relay() {
return delegate != null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import io.reactivesocket.Frame.Request;
import io.reactivesocket.Frame.Response;
import io.reactivesocket.exceptions.ApplicationException;
import io.reactivesocket.exceptions.RejectedSetupException;
import io.reactivesocket.exceptions.SetupException;
import io.reactivesocket.frame.FrameHeaderFlyweight;
import io.reactivesocket.frame.SetupFrameFlyweight;
import io.reactivesocket.internal.KnownErrorFilter;
import io.reactivesocket.internal.RemoteReceiver;
import io.reactivesocket.internal.RemoteSender;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import io.reactivesocket.StreamIdSupplier;
import io.reactivesocket.client.KeepAliveProvider;
import io.reactivesocket.exceptions.SetupException;
import io.reactivesocket.internal.ClientServerInputMultiplexer;
import io.reactivesocket.lease.DefaultLeaseHonoringSocket;
import io.reactivesocket.lease.LeaseEnforcingSocket;
import io.reactivesocket.lease.LeaseHonoringSocket;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.transport.TransportServer;
import io.reactivesocket.transport.TransportServer.StartedServer;
Expand All @@ -42,24 +45,28 @@ public interface ReactiveSocketServer {

static ReactiveSocketServer create(TransportServer transportServer) {
return acceptor -> {
return transportServer.start(duplexConnection -> {
return Px.from(duplexConnection.receive())
return transportServer.start(connection -> {
return Px.from(connection.receive())
.switchTo(setupFrame -> {
if (setupFrame.getType() == FrameType.SETUP) {
ClientServerInputMultiplexer multiplexer = new ClientServerInputMultiplexer(connection);
ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create(setupFrame);
ClientReactiveSocket sender = new ClientReactiveSocket(duplexConnection,
ClientReactiveSocket sender = new ClientReactiveSocket(multiplexer.asServerConnection(),
Throwable::printStackTrace,
StreamIdSupplier.serverSupplier(),
KeepAliveProvider.never());
LeaseHonoringSocket lhs = new DefaultLeaseHonoringSocket(sender);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The oddity here is that there is no way today in the protocol where a server can tell whether it will honor leases from the client. Probably this should piggy back on the client will honor lease in the setup saying if a client honors lease, server will too.
I am interested in hearing what others think about this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tmontgomery Your input would be useful on this I imagine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it even make sense for a client responder to use leases? Either it can take requests or it can't (reject/fail). For example, a server-mobile relationship doesn't make sense for leases, as once a mobile client is connected, it only has that one connection to send requests over. Leases really only make sense in server-server situations where a requesting server has multiple connections open and can send requests to different servers depending on their leases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's an oversight.
We could decide that the server will match the configuration of the client.

sender.start(lhs);
LeaseEnforcingSocket handler = acceptor.accept(setupPayload, sender);
ServerReactiveSocket receiver = new ServerReactiveSocket(duplexConnection, handler,
ServerReactiveSocket receiver = new ServerReactiveSocket(multiplexer.asClientConnection(),
handler,
setupPayload.willClientHonorLease(),
Throwable::printStackTrace);
receiver.start();
return duplexConnection.onClose();
return connection.onClose();
} else {
return Px.<Void>error(new IllegalStateException("Invalid first frame on the connection: "
+ duplexConnection + ", frame type received: "
+ connection + ", frame type received: "
+ setupFrame.getType()));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,22 @@
import io.reactivesocket.DuplexConnection;
import io.reactivesocket.Frame;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.reactivestreams.extensions.internal.EmptySubject;
import io.reactivex.Flowable;
import io.reactivex.processors.PublishProcessor;
import org.reactivestreams.Publisher;

public class LocalDuplexConnection implements DuplexConnection {
private final PublishProcessor<Frame> send;
private final PublishProcessor<Frame> receive;
private final EmptySubject closeNotifier;
private final String name;

public LocalDuplexConnection(String name, PublishProcessor<Frame> send, PublishProcessor<Frame> receive) {
this.name = name;
this.send = send;
this.receive = receive;
closeNotifier = new EmptySubject();
}

@Override
Expand All @@ -56,11 +59,14 @@ public double availability() {

@Override
public Publisher<Void> close() {
return Px.empty();
return Px.defer(() -> {
closeNotifier.onComplete();
return Px.empty();
});
}

@Override
public Publisher<Void> onClose() {
return Px.empty();
return closeNotifier;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2017 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.reactivesocket.examples.transport.tcp.duplex;

import io.reactivesocket.AbstractReactiveSocket;
import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.client.ReactiveSocketClient;
import io.reactivesocket.client.ReactiveSocketClient.SocketAcceptor;
import io.reactivesocket.frame.ByteBufferUtil;
import io.reactivesocket.lease.DisabledLeaseAcceptingSocket;
import io.reactivesocket.lease.LeaseEnforcingSocket;
import io.reactivesocket.server.ReactiveSocketServer;
import io.reactivesocket.transport.TransportServer.StartedServer;
import io.reactivesocket.transport.tcp.client.TcpTransportClient;
import io.reactivesocket.transport.tcp.server.TcpTransportServer;
import io.reactivesocket.util.PayloadImpl;
import io.reactivex.Flowable;
import org.reactivestreams.Publisher;

import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;

import static io.reactivesocket.client.KeepAliveProvider.*;
import static io.reactivesocket.client.SetupProvider.*;

public final class DuplexClient {

public static void main(String[] args) {
StartedServer server = ReactiveSocketServer.create(TcpTransportServer.create())
.start((setupPayload, reactiveSocket) -> {
Flowable.fromPublisher(reactiveSocket.requestStream(new PayloadImpl("Hello-Bidi")))
.map(Payload::getData)
.map(ByteBufferUtil::toUtf8String)
.forEach(System.out::println);
return new DisabledLeaseAcceptingSocket(new AbstractReactiveSocket() { });
});

SocketAddress address = server.getServerAddress();

ReactiveSocketClient rsclient = ReactiveSocketClient.createDuplex(TcpTransportClient.create(address),
new SocketAcceptor() {
@Override
public LeaseEnforcingSocket accept(ReactiveSocket reactiveSocket) {
return new DisabledLeaseAcceptingSocket(new AbstractReactiveSocket() {
@Override
public Publisher<Payload> requestStream(Payload payload) {
return Flowable.interval(0, 1, TimeUnit.SECONDS).map(aLong -> new PayloadImpl("Bi-di Response => " + aLong));
}
});
}
}, keepAlive(never()).disableLease());

ReactiveSocket socket = Flowable.fromPublisher(rsclient.connect()).blockingFirst();

Flowable.fromPublisher(socket.onClose()).ignoreElements().blockingAwait();
}
}