Skip to content

introduces onClose listener for RSocketClient and connect method #1063

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ public void arbiter(IIIIII_Result r) {
id = {"1, 0, 1, 0, 1, 2"},
expect = ACCEPTABLE)
@State
public static class SubscribeBlockRace extends BaseStressTest {
public static class SubscribeBlockConnectRace extends BaseStressTest {

String receivedValue;

Expand All @@ -543,6 +543,11 @@ void subscribe() {
reconnectMono.subscribe(stressSubscriber);
}

@Actor
void connect() {
reconnectMono.resolvingInner.connect();
}

@Arbiter
public void arbiter(IIIIII_Result r) {
r.r1 = stressSubscription.subscribes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoOperator;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

Expand Down Expand Up @@ -65,19 +66,27 @@ class DefaultRSocketClient extends ResolvingOperator<RSocket>

final Mono<RSocket> source;

final Sinks.Empty<Void> onDisposeSink;

volatile Subscription s;

static final AtomicReferenceFieldUpdater<DefaultRSocketClient, Subscription> S =
AtomicReferenceFieldUpdater.newUpdater(DefaultRSocketClient.class, Subscription.class, "s");

DefaultRSocketClient(Mono<RSocket> source) {
this.source = unwrapReconnectMono(source);
this.onDisposeSink = Sinks.empty();
}

private Mono<RSocket> unwrapReconnectMono(Mono<RSocket> source) {
return source instanceof ReconnectMono ? ((ReconnectMono<RSocket>) source).getSource() : source;
}

@Override
public Mono<Void> onClose() {
return this.onDisposeSink.asMono();
}

@Override
public Mono<RSocket> source() {
return Mono.fromDirect(this);
Expand Down Expand Up @@ -194,6 +203,12 @@ protected void doOnValueExpired(RSocket value) {
@Override
protected void doOnDispose() {
Operators.terminate(S, this);
final RSocket value = this.value;
if (value != null) {
value.onClose().subscribe(null, onDisposeSink::tryEmitError, onDisposeSink::tryEmitEmpty);
} else {
onDisposeSink.tryEmitEmpty();
}
}

static final class FlatMapMain<R> implements CoreSubscriber<Payload>, Context, Scannable {
Expand Down
20 changes: 18 additions & 2 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
*/
package io.rsocket.core;

import io.rsocket.Closeable;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

/**
* Contract for performing RSocket requests.
Expand Down Expand Up @@ -74,7 +75,22 @@
* @since 1.1
* @see io.rsocket.loadbalance.LoadbalanceRSocketClient
*/
public interface RSocketClient extends Disposable {
public interface RSocketClient extends Closeable {

/**
* Connect to the remote rsocket endpoint, if not yet connected. This method is a shortcut for
* {@code RSocketClient#source().subscribe()}.
*
* @return {@code true} if an attempt to connect was triggered or if already connected, or {@code
* false} if the client is terminated.
*/
default boolean connect() {
throw new NotImplementedException();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should raise NotImplementedException or not have a default implementation at all.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also needs Javadoc to state what it does, when this is useful vs just making requests, when it returns true vs false, and what happens for any scenarios of interest (e.g. if already connected, if already terminated, etc).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


default Mono<Void> onClose() {
return Mono.error(new NotImplementedException());
}

/** Return the underlying source used to obtain a shared {@link RSocket} connection. */
Mono<RSocket> source();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,21 @@ public RSocket rsocket() {
return rsocket;
}

@Override
public boolean connect() {
throw new UnsupportedOperationException("Connect does not apply to a server side RSocket");
}

@Override
public Mono<RSocket> source() {
return Mono.just(rsocket);
}

@Override
public Mono<Void> onClose() {
return rsocket.onClose();
}

@Override
public Mono<Void> fireAndForget(Mono<Payload> payloadMono) {
return payloadMono.flatMap(rsocket::fireAndForget);
Expand Down
24 changes: 24 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,30 @@ protected void doOnDispose() {
// no ops
}

public final boolean connect() {
for (; ; ) {
final BiConsumer<T, Throwable>[] a = this.subscribers;

if (a == TERMINATED) {
return false;
}

if (a == READY) {
return true;
}

if (a != EMPTY_UNSUBSCRIBED) {
// do nothing if already started
return true;
}

if (SUBSCRIBERS.compareAndSet(this, a, EMPTY_SUBSCRIBED)) {
this.doSubscribe();
return true;
}
}
}

final int add(BiConsumer<T, Throwable> ps) {
for (; ; ) {
BiConsumer<T, Throwable>[] a = this.subscribers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ private LoadbalanceRSocketClient(RSocketPool rSocketPool) {
this.rSocketPool = rSocketPool;
}

@Override
public Mono<Void> onClose() {
return rSocketPool.onClose();
}

@Override
public boolean connect() {
return rSocketPool.connect();
}

/** Return {@code Mono} that selects an RSocket from the underlying pool. */
@Override
public Mono<RSocket> source() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.util.context.Context;

/** Default implementation of {@link RSocket} stored in {@link RSocketPool} */
Expand All @@ -35,6 +36,7 @@ final class PooledRSocket extends ResolvingOperator<RSocket>
final RSocketPool parent;
final Mono<RSocket> rSocketSource;
final LoadbalanceTarget loadbalanceTarget;
final Sinks.Empty<Void> onCloseSink;

volatile Subscription s;

Expand All @@ -46,6 +48,7 @@ final class PooledRSocket extends ResolvingOperator<RSocket>
this.parent = parent;
this.rSocketSource = rSocketSource;
this.loadbalanceTarget = loadbalanceTarget;
this.onCloseSink = Sinks.unsafe().empty();
}

@Override
Expand Down Expand Up @@ -155,6 +158,12 @@ void doCleanup(Throwable t) {
break;
}
}

if (t == ON_DISPOSE) {
this.onCloseSink.tryEmitEmpty();
} else {
this.onCloseSink.tryEmitError(t);
}
}

@Override
Expand All @@ -165,6 +174,13 @@ protected void doOnValueExpired(RSocket value) {
@Override
protected void doOnDispose() {
Operators.terminate(S, this);

final RSocket value = this.value;
if (value != null) {
value.onClose().subscribe(null, onCloseSink::tryEmitError, onCloseSink::tryEmitEmpty);
} else {
onCloseSink.tryEmitEmpty();
}
}

@Override
Expand Down Expand Up @@ -193,7 +209,12 @@ public Mono<Void> metadataPush(Payload payload) {
}

LoadbalanceTarget target() {
return loadbalanceTarget;
return this.loadbalanceTarget;
}

@Override
public Mono<Void> onClose() {
return this.onCloseSink.asMono();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.rsocket.loadbalance;

import io.netty.util.ReferenceCountUtil;
import io.rsocket.Closeable;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
Expand All @@ -28,16 +29,18 @@
import java.util.ListIterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

class RSocketPool extends ResolvingOperator<Object>
implements CoreSubscriber<List<LoadbalanceTarget>> {
implements CoreSubscriber<List<LoadbalanceTarget>>, Closeable {

static final AtomicReferenceFieldUpdater<RSocketPool, PooledRSocket[]> ACTIVE_SOCKETS =
AtomicReferenceFieldUpdater.newUpdater(
Expand All @@ -49,6 +52,7 @@ class RSocketPool extends ResolvingOperator<Object>
final DeferredResolutionRSocket deferredResolutionRSocket = new DeferredResolutionRSocket(this);
final RSocketConnector connector;
final LoadbalanceStrategy loadbalanceStrategy;
final Sinks.Empty<Void> onAllClosedSink = Sinks.unsafe().empty();
volatile PooledRSocket[] activeSockets;
volatile Subscription s;

Expand All @@ -64,6 +68,11 @@ public RSocketPool(
targetPublisher.subscribe(this);
}

@Override
public Mono<Void> onClose() {
return onAllClosedSink.asMono();
}

@Override
protected void doOnDispose() {
Operators.terminate(S, this);
Expand All @@ -72,6 +81,14 @@ protected void doOnDispose() {
for (RSocket rSocket : activeSockets) {
rSocket.dispose();
}

if (activeSockets.length > 0) {
Mono.whenDelayError(
Arrays.stream(activeSockets).map(RSocket::onClose).collect(Collectors.toList()))
.subscribe(null, onAllClosedSink::tryEmitError, onAllClosedSink::tryEmitEmpty);
} else {
onAllClosedSink.tryEmitEmpty();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,30 @@ protected void doOnDispose() {
// no ops
}

public final boolean connect() {
for (; ; ) {
final BiConsumer<T, Throwable>[] a = this.subscribers;

if (a == TERMINATED) {
return false;
}

if (a == READY) {
return true;
}

if (a != EMPTY_UNSUBSCRIBED) {
// do nothing if already started
return true;
}

if (SUBSCRIBERS.compareAndSet(this, a, EMPTY_SUBSCRIBED)) {
this.doSubscribe();
return true;
}
}
}

final int add(BiConsumer<T, Throwable> ps) {
for (; ; ) {
BiConsumer<T, Throwable>[] a = this.subscribers;
Expand Down
Loading