Skip to content

improves DuplexConnection api and reworks Resumability #923

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 9 additions & 21 deletions rsocket-core/src/main/java/io/rsocket/DuplexConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,28 @@
import io.netty.buffer.ByteBufAllocator;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/** Represents a connection with input/output that the protocol uses. */
public interface DuplexConnection extends Availability, Closeable {

/**
* Sends the source of Frames on this connection and returns the {@code Publisher} representing
* the result of this send.
* Delivers the given frame to the underlying transport connection. This method is non-blocking
* and can be safely executed from multiple threads. This method does not provide any flow-control
* mechanism.
*
* <p><strong>Flow control</strong>
*
* <p>The passed {@code Publisher} must
*
* @param frames Stream of {@code Frame}s to send on the connection.
* @return {@code Publisher} that completes when all the frames are written on the connection
* successfully and errors when it fails.
* @throws NullPointerException if {@code frames} is {@code null}
* @param streamId to which the given frame relates
* @param frame with the encoded content
*/
Mono<Void> send(Publisher<ByteBuf> frames);
void sendFrame(int streamId, ByteBuf frame);

/**
* Sends a single {@code Frame} on this connection and returns the {@code Publisher} representing
* the result of this send.
* Send an error frame and after it is successfully sent, close the connection.
*
* @param frame {@code Frame} to send.
* @return {@code Publisher} that completes when the frame is written on the connection
* successfully and errors when it fails.
* @param errorException to encode in the error frame
*/
default Mono<Void> sendOne(ByteBuf frame) {
return send(Mono.just(frame));
}
void sendErrorAndClose(RSocketErrorException errorException);

/**
* Returns a stream of all {@code Frame}s received on this connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocketErrorException;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameUtil;
import io.rsocket.plugins.DuplexConnectionInterceptor.Type;
import io.rsocket.plugins.InitializingInterceptorRegistry;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -50,67 +47,40 @@
*/
class ClientServerInputMultiplexer implements CoreSubscriber<ByteBuf>, Closeable {

private static final Logger LOGGER = LoggerFactory.getLogger("io.rsocket.FrameLogger");
private static final InitializingInterceptorRegistry emptyInterceptorRegistry =
new InitializingInterceptorRegistry();

private final InternalDuplexConnection setupReceiver;
private final InternalDuplexConnection serverReceiver;
private final InternalDuplexConnection clientReceiver;
private final DuplexConnection setupConnection;
private final DuplexConnection serverConnection;
private final DuplexConnection clientConnection;
private final DuplexConnection source;
private final boolean isClient;

private Subscription s;
private boolean setupReceived;

private Throwable t;

private volatile int state;
private static final AtomicIntegerFieldUpdater<ClientServerInputMultiplexer> STATE =
AtomicIntegerFieldUpdater.newUpdater(ClientServerInputMultiplexer.class, "state");

public ClientServerInputMultiplexer(DuplexConnection source) {
this(source, emptyInterceptorRegistry, false);
}

public ClientServerInputMultiplexer(
DuplexConnection source, InitializingInterceptorRegistry registry, boolean isClient) {
this.source = source;
this.isClient = isClient;
source = registry.initConnection(Type.SOURCE, source);

if (!isClient) {
setupReceiver = new InternalDuplexConnection(this, source);
setupConnection = registry.initConnection(Type.SETUP, setupReceiver);
} else {
setupReceiver = null;
setupConnection = null;
}
serverReceiver = new InternalDuplexConnection(this, source);
clientReceiver = new InternalDuplexConnection(this, source);
serverConnection = registry.initConnection(Type.SERVER, serverReceiver);
clientConnection = registry.initConnection(Type.CLIENT, clientReceiver);
this.serverReceiver = new InternalDuplexConnection(this, source);
this.clientReceiver = new InternalDuplexConnection(this, source);
this.serverConnection = registry.initConnection(Type.SERVER, serverReceiver);
this.clientConnection = registry.initConnection(Type.CLIENT, clientReceiver);
}

public DuplexConnection asClientServerConnection() {
return source;
}

public DuplexConnection asServerConnection() {
DuplexConnection asServerConnection() {
return serverConnection;
}

public DuplexConnection asClientConnection() {
DuplexConnection asClientConnection() {
return clientConnection;
}

public DuplexConnection asSetupConnection() {
return setupConnection;
}

@Override
public void dispose() {
source.dispose();
Expand All @@ -130,12 +100,7 @@ public Mono<Void> onClose() {
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;
if (isClient) {
s.request(Long.MAX_VALUE);
} else {
// request first SetupFrame
s.request(1);
}
s.request(Long.MAX_VALUE);
}
}

Expand All @@ -145,12 +110,6 @@ public void onNext(ByteBuf frame) {
final Type type;
if (streamId == 0) {
switch (FrameHeaderCodec.frameType(frame)) {
case SETUP:
case RESUME:
case RESUME_OK:
type = Type.SETUP;
setupReceived = true;
break;
case LEASE:
case KEEPALIVE:
case ERROR:
Expand All @@ -164,19 +123,8 @@ public void onNext(ByteBuf frame) {
} else {
type = Type.CLIENT;
}
if (!isClient && type != Type.SETUP && !setupReceived) {
final IllegalStateException error =
new IllegalStateException("SETUP or LEASE frame must be received before any others.");
this.s.cancel();
onError(error);
}

switch (type) {
case SETUP:
final InternalDuplexConnection setupReceiver = this.setupReceiver;
setupReceiver.onNext(frame);
setupReceiver.onComplete();
break;
case CLIENT:
clientReceiver.onNext(frame);
break;
Expand All @@ -193,16 +141,6 @@ public void onComplete() {
return;
}

if (!isClient) {
if (!setupReceived) {
setupReceiver.onComplete();
}

if (previousState == 1) {
return;
}
}

if (clientReceiver.isSubscribed()) {
clientReceiver.onComplete();
}
Expand All @@ -220,16 +158,6 @@ public void onError(Throwable t) {
return;
}

if (!isClient) {
if (!setupReceived) {
setupReceiver.onError(t);
}

if (previousState == 1) {
return;
}
}

if (clientReceiver.isSubscribed()) {
clientReceiver.onError(t);
}
Expand All @@ -244,17 +172,8 @@ boolean notifyRequested() {
return false;
}

if (isClient) {
if (currentState == 2) {
source.receive().subscribe(this);
}
} else {
if (currentState == 1) {
source.receive().subscribe(this);
} else if (currentState == 3) {
// means setup was consumed and we got request from client and server multiplexers
s.request(Long.MAX_VALUE);
}
if (currentState == 2) {
source.receive().subscribe(this);
}

return true;
Expand All @@ -280,7 +199,6 @@ private static class InternalDuplexConnection extends Flux<ByteBuf>
implements Subscription, DuplexConnection {
private final ClientServerInputMultiplexer clientServerInputMultiplexer;
private final DuplexConnection source;
private final boolean debugEnabled;

private volatile int state;
static final AtomicIntegerFieldUpdater<InternalDuplexConnection> STATE =
Expand All @@ -292,7 +210,6 @@ public InternalDuplexConnection(
ClientServerInputMultiplexer clientServerInputMultiplexer, DuplexConnection source) {
this.clientServerInputMultiplexer = clientServerInputMultiplexer;
this.source = source;
this.debugEnabled = LOGGER.isDebugEnabled();
}

@Override
Expand Down Expand Up @@ -340,32 +257,18 @@ void onError(Throwable t) {
}

@Override
public Mono<Void> send(Publisher<ByteBuf> frame) {
if (debugEnabled) {
return Flux.from(frame)
.doOnNext(f -> LOGGER.debug("sending -> " + FrameUtil.toString(f)))
.as(source::send);
}

return source.send(frame);
public void sendFrame(int streamId, ByteBuf frame) {
source.sendFrame(streamId, frame);
}

@Override
public Mono<Void> sendOne(ByteBuf frame) {
if (debugEnabled) {
LOGGER.debug("sending -> " + FrameUtil.toString(frame));
}

return source.sendOne(frame);
public void sendErrorAndClose(RSocketErrorException e) {
source.sendErrorAndClose(e);
}

@Override
public Flux<ByteBuf> receive() {
if (debugEnabled) {
return this.doOnNext(frame -> LOGGER.debug("receiving -> " + FrameUtil.toString(frame)));
} else {
return this;
}
return this;
}

@Override
Expand Down
32 changes: 32 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/core/ClientSetup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.rsocket.core;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.rsocket.DuplexConnection;
import java.nio.channels.ClosedChannelException;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

abstract class ClientSetup {
abstract Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection);
}

class DefaultClientSetup extends ClientSetup {

@Override
Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection) {
return Mono.create(
sink -> sink.onRequest(__ -> sink.success(Tuples.of(Unpooled.EMPTY_BUFFER, connection))));
}
}

class ResumableClientSetup extends ClientSetup {

@Override
Mono<Tuple2<ByteBuf, DuplexConnection>> init(DuplexConnection connection) {
return Mono.<Tuple2<ByteBuf, DuplexConnection>>create(
sink -> sink.onRequest(__ -> new SetupHandlingDuplexConnection(connection, sink)))
.or(connection.onClose().then(Mono.error(ClosedChannelException::new)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
import static io.rsocket.core.SendUtils.sendReleasingPayload;
import static io.rsocket.core.StateUtils.*;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.IllegalReferenceCountException;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.frame.FrameType;
import io.rsocket.internal.UnboundedProcessor;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Subscription;
Expand All @@ -50,15 +49,15 @@ final class FireAndForgetRequesterMono extends Mono<Void> implements Subscriptio
final int mtu;
final int maxFrameLength;
final RequesterResponderSupport requesterResponderSupport;
final UnboundedProcessor<ByteBuf> sendProcessor;
final DuplexConnection connection;

FireAndForgetRequesterMono(Payload payload, RequesterResponderSupport requesterResponderSupport) {
this.allocator = requesterResponderSupport.getAllocator();
this.payload = payload;
this.mtu = requesterResponderSupport.getMtu();
this.maxFrameLength = requesterResponderSupport.getMaxFrameLength();
this.requesterResponderSupport = requesterResponderSupport;
this.sendProcessor = requesterResponderSupport.getSendProcessor();
this.connection = requesterResponderSupport.getDuplexConnection();
}

@Override
Expand Down Expand Up @@ -106,7 +105,7 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
}

sendReleasingPayload(
streamId, FrameType.REQUEST_FNF, mtu, p, this.sendProcessor, this.allocator, true);
streamId, FrameType.REQUEST_FNF, mtu, p, this.connection, this.allocator, true);
} catch (Throwable e) {
lazyTerminate(STATE, this);
actual.onError(e);
Expand Down Expand Up @@ -169,7 +168,7 @@ public Void block() {
FrameType.REQUEST_FNF,
this.mtu,
this.payload,
this.sendProcessor,
this.connection,
this.allocator,
true);
} catch (Throwable e) {
Expand Down
Loading