Skip to content

Send root-cause errors from connection to onClose of RSocket #797

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 26, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions rsocket-core/src/main/java/io/rsocket/Closeable.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@

package io.rsocket;

import org.reactivestreams.Subscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

/** */
/** An interface which allows listening to when a specific instance of this interface is closed */
public interface Closeable extends Disposable {
/**
* Returns a {@code Publisher} that completes when this {@code RSocket} is closed. A {@code
* RSocket} can be closed by explicitly calling {@link RSocket#dispose()} or when the underlying
* transport connection is closed.
* Returns a {@link Mono} that terminates when the instance is terminated by any reason. Note, in
* case of error termination, the cause of error will be propagated as an error signal through
* {@link org.reactivestreams.Subscriber#onError(Throwable)}. Otherwise, {@link
* Subscriber#onComplete()} will be called.
*
* @return A {@code Publisher} that completes when this {@code RSocket} close is complete.
* @return a closable instance of {@link Mono}.
*/
Mono<Void> onClose();
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class RSocketConnector {
private static final int MIN_MTU_SIZE = 64;

private static final BiConsumer<RSocket, Invalidatable> INVALIDATE_FUNCTION =
(r, i) -> r.onClose().subscribe(null, null, i::invalidate);
(r, i) -> r.onClose().subscribe(null, __ -> i.invalidate(), i::invalidate);

private Payload setupPayload = EmptyPayload.INSTANCE;
private String metadataMimeType = "application/binary";
Expand Down
25 changes: 17 additions & 8 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.rsocket.lease.RequesterLeaseHandler;
import io.rsocket.util.MonoLifecycleHandler;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
Expand All @@ -67,6 +68,7 @@
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.concurrent.Queues;
Expand Down Expand Up @@ -106,6 +108,7 @@ class RSocketRequester implements RSocket {
private final ByteBufAllocator allocator;
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
private volatile Throwable terminationError;
private final MonoProcessor<Void> onClose;

RSocketRequester(
DuplexConnection connection,
Expand All @@ -126,14 +129,15 @@ class RSocketRequester implements RSocket {
this.leaseHandler = leaseHandler;
this.senders = new SynchronizedIntObjectHashMap<>();
this.receivers = new SynchronizedIntObjectHashMap<>();
this.onClose = MonoProcessor.create();

// DO NOT Change the order here. The Send processor must be subscribed to before receiving
this.sendProcessor = new UnboundedProcessor<>();

connection
.onClose()
.doFinally(signalType -> tryTerminateOnConnectionClose())
.subscribe(null, errorConsumer);
.or(onClose)
.subscribe(null, this::tryTerminateOnConnectionClose, this::tryTerminateOnConnectionClose);
connection.send(sendProcessor).subscribe(null, this::handleSendProcessorError);

connection.receive().subscribe(this::handleIncomingFrames, errorConsumer);
Expand Down Expand Up @@ -181,17 +185,17 @@ public double availability() {

@Override
public void dispose() {
connection.dispose();
tryTerminate(() -> new CancellationException("Disposed"));
}

@Override
public boolean isDisposed() {
return connection.isDisposed();
return onClose.isDisposed();
}

@Override
public Mono<Void> onClose() {
return connection.onClose();
return onClose;
}

private Mono<Void> handleFireAndForget(Payload payload) {
Expand Down Expand Up @@ -619,6 +623,10 @@ private void tryTerminateOnKeepAlive(KeepAlive keepAlive) {
String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis())));
}

private void tryTerminateOnConnectionClose(Throwable e) {
tryTerminate(() -> e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tryTerminateOnConnectionError perhaps?


private void tryTerminateOnConnectionClose() {
tryTerminate(() -> CLOSED_CHANNEL_EXCEPTION);
}
Expand All @@ -627,16 +635,16 @@ private void tryTerminateOnZeroError(ByteBuf errorFrame) {
tryTerminate(() -> Exceptions.from(0, errorFrame));
}

private void tryTerminate(Supplier<Exception> errorSupplier) {
private void tryTerminate(Supplier<Throwable> errorSupplier) {
if (terminationError == null) {
Exception e = errorSupplier.get();
Throwable e = errorSupplier.get();
if (TERMINATION_ERROR.compareAndSet(this, null, e)) {
terminate(e);
}
}
}

private void terminate(Exception e) {
private void terminate(Throwable e) {
connection.dispose();
leaseHandler.dispose();

Expand Down Expand Up @@ -668,6 +676,7 @@ private void terminate(Exception e) {
receivers.clear();
sendProcessor.dispose();
errorConsumer.accept(e);
onClose.onError(e);
}

private void removeStreamReceiver(int streamId) {
Expand Down
92 changes: 48 additions & 44 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@
import io.rsocket.internal.SynchronizedIntObjectHashMap;
import io.rsocket.internal.UnboundedProcessor;
import io.rsocket.lease.ResponderLeaseHandler;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
Expand All @@ -58,13 +62,21 @@ class RSocketResponder implements ResponderRSocket {
}
}
};
private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();

private final DuplexConnection connection;
private final RSocket requestHandler;
private final ResponderRSocket responderRSocket;
private final PayloadDecoder payloadDecoder;
private final Consumer<Throwable> errorConsumer;
private final ResponderLeaseHandler leaseHandler;
private final Disposable leaseHandlerDisposable;
private final MonoProcessor<Void> onClose;

private volatile Throwable terminationError;
private static final AtomicReferenceFieldUpdater<RSocketResponder, Throwable> TERMINATION_ERROR =
AtomicReferenceFieldUpdater.newUpdater(
RSocketResponder.class, Throwable.class, "terminationError");

private final int mtu;

Expand Down Expand Up @@ -94,28 +106,21 @@ class RSocketResponder implements ResponderRSocket {
this.leaseHandler = leaseHandler;
this.sendingSubscriptions = new SynchronizedIntObjectHashMap<>();
this.channelProcessors = new SynchronizedIntObjectHashMap<>();
this.onClose = MonoProcessor.create();

// DO NOT Change the order here. The Send processor must be subscribed to before receiving
// connections
this.sendProcessor = new UnboundedProcessor<>();

connection
.send(sendProcessor)
.doFinally(this::handleSendProcessorCancel)
.subscribe(null, this::handleSendProcessorError);
connection.send(sendProcessor).subscribe(null, this::handleSendProcessorError);

Disposable receiveDisposable = connection.receive().subscribe(this::handleFrame, errorConsumer);
Disposable sendLeaseDisposable = leaseHandler.send(sendProcessor::onNextPrioritized);
connection.receive().subscribe(this::handleFrame, errorConsumer);
leaseHandlerDisposable = leaseHandler.send(sendProcessor::onNextPrioritized);

this.connection
.onClose()
.doFinally(
s -> {
cleanup();
receiveDisposable.dispose();
sendLeaseDisposable.dispose();
})
.subscribe(null, errorConsumer);
.or(onClose)
.subscribe(null, this::tryTerminateOnConnectionClose, this::tryTerminateOnConnectionClose);
}

private void handleSendProcessorError(Throwable t) {
Expand All @@ -142,32 +147,21 @@ private void handleSendProcessorError(Throwable t) {
});
}

private void handleSendProcessorCancel(SignalType t) {
if (SignalType.ON_ERROR == t) {
return;
}
private void tryTerminateOnConnectionClose(Throwable e) {
tryTerminate(() -> e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tryTerminateOnConnectionError?


sendingSubscriptions
.values()
.forEach(
subscription -> {
try {
subscription.cancel();
} catch (Throwable e) {
errorConsumer.accept(e);
}
});
private void tryTerminateOnConnectionClose() {
tryTerminate(() -> CLOSED_CHANNEL_EXCEPTION);
}

channelProcessors
.values()
.forEach(
subscription -> {
try {
subscription.onComplete();
} catch (Throwable e) {
errorConsumer.accept(e);
}
});
private void tryTerminate(Supplier<Throwable> errorSupplier) {
if (terminationError == null) {
Throwable e = errorSupplier.get();
if (TERMINATION_ERROR.compareAndSet(this, null, e)) {
cleanup(e);
}
}
}

@Override
Expand Down Expand Up @@ -250,23 +244,25 @@ public Mono<Void> metadataPush(Payload payload) {

@Override
public void dispose() {
connection.dispose();
tryTerminate(() -> new CancellationException("Disposed"));
}

@Override
public boolean isDisposed() {
return connection.isDisposed();
return onClose.isDisposed();
}

@Override
public Mono<Void> onClose() {
return connection.onClose();
return onClose;
}

private void cleanup() {
private void cleanup(Throwable e) {
cleanUpSendingSubscriptions();
cleanUpChannelProcessors();
cleanUpChannelProcessors(e);

connection.dispose();
leaseHandlerDisposable.dispose();
requestHandler.dispose();
sendProcessor.dispose();
}
Expand All @@ -276,8 +272,16 @@ private synchronized void cleanUpSendingSubscriptions() {
sendingSubscriptions.clear();
}

private synchronized void cleanUpChannelProcessors() {
channelProcessors.values().forEach(Processor::onComplete);
private synchronized void cleanUpChannelProcessors(Throwable e) {
channelProcessors
.values()
.forEach(payloadPayloadProcessor -> {
try {
payloadPayloadProcessor.onError(e);
} catch (Throwable t) {
// noops
}
});
channelProcessors.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

Expand Down Expand Up @@ -105,7 +104,6 @@ public void reconnectOnMissingSession() {

DisconnectableClientTransport clientTransport =
new DisconnectableClientTransport(clientTransport(closeable.address()));
ErrorConsumer errorConsumer = new ErrorConsumer();
int clientSessionDurationSeconds = 10;

RSocket rSocket = newClientRSocket(clientTransport, clientSessionDurationSeconds).block();
Expand All @@ -118,12 +116,11 @@ public void reconnectOnMissingSession() {
.expectError()
.verify(Duration.ofSeconds(5));

StepVerifier.create(errorConsumer.errors().next())
.expectNextMatches(
StepVerifier.create(rSocket.onClose())
.expectErrorMatches(
err ->
err instanceof RejectedResumeException
&& "unknown resume token".equals(err.getMessage()))
.expectComplete()
.verify(Duration.ofSeconds(5));
}

Expand All @@ -134,23 +131,19 @@ void serverMissingResume() {
.bind(serverTransport(SERVER_HOST, SERVER_PORT))
.block();

ErrorConsumer errorConsumer = new ErrorConsumer();

RSocket rSocket =
RSocketConnector.create()
.resume(new Resume())
.connect(clientTransport(closeableChannel.address()))
.block();

StepVerifier.create(errorConsumer.errors().next().doFinally(s -> closeableChannel.dispose()))
.expectNextMatches(
StepVerifier.create(rSocket.onClose().doFinally(s -> closeableChannel.dispose()))
.expectErrorMatches(
err ->
err instanceof UnsupportedSetupException
&& "resume not supported".equals(err.getMessage()))
.expectComplete()
.verify(Duration.ofSeconds(5));

StepVerifier.create(rSocket.onClose()).expectComplete().verify(Duration.ofSeconds(5));
Assertions.assertThat(rSocket.isDisposed()).isTrue();
}

Expand All @@ -162,19 +155,6 @@ static ServerTransport<CloseableChannel> serverTransport(String host, int port)
return TcpServerTransport.create(host, port);
}

private static class ErrorConsumer implements Consumer<Throwable> {
private final ReplayProcessor<Throwable> errors = ReplayProcessor.create();

public Flux<Throwable> errors() {
return errors;
}

@Override
public void accept(Throwable throwable) {
errors.onNext(throwable);
}
}

private static Flux<Payload> testRequest() {
return Flux.interval(Duration.ofMillis(50))
.map(v -> DefaultPayload.create("client_request"))
Expand Down