Skip to content

Commit 26abe4b

Browse files
Oleh DokukaOlegDokuka
Oleh Dokuka
authored andcommitted
introduces onClose listener for RSocketClient
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent ef826de commit 26abe4b

File tree

9 files changed

+199
-11
lines changed

9 files changed

+199
-11
lines changed

rsocket-core/src/jcstress/java/io/rsocket/core/ReconnectMonoStressTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ public void arbiter(IIIIII_Result r) {
517517
id = {"1, 0, 1, 0, 1, 2"},
518518
expect = ACCEPTABLE)
519519
@State
520-
public static class SubscribeBlockRace extends BaseStressTest {
520+
public static class SubscribeBlockConnectRace extends BaseStressTest {
521521

522522
String receivedValue;
523523

rsocket-core/src/main/java/io/rsocket/core/DefaultRSocketClient.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import reactor.core.publisher.Mono;
3636
import reactor.core.publisher.MonoOperator;
3737
import reactor.core.publisher.Operators;
38+
import reactor.core.publisher.Sinks;
3839
import reactor.util.annotation.Nullable;
3940
import reactor.util.context.Context;
4041

@@ -65,19 +66,27 @@ class DefaultRSocketClient extends ResolvingOperator<RSocket>
6566

6667
final Mono<RSocket> source;
6768

69+
final Sinks.Empty<Void> onDisposeSink;
70+
6871
volatile Subscription s;
6972

7073
static final AtomicReferenceFieldUpdater<DefaultRSocketClient, Subscription> S =
7174
AtomicReferenceFieldUpdater.newUpdater(DefaultRSocketClient.class, Subscription.class, "s");
7275

7376
DefaultRSocketClient(Mono<RSocket> source) {
7477
this.source = unwrapReconnectMono(source);
78+
this.onDisposeSink = Sinks.empty();
7579
}
7680

7781
private Mono<RSocket> unwrapReconnectMono(Mono<RSocket> source) {
7882
return source instanceof ReconnectMono ? ((ReconnectMono<RSocket>) source).getSource() : source;
7983
}
8084

85+
@Override
86+
public Mono<Void> onClose() {
87+
return this.onDisposeSink.asMono();
88+
}
89+
8190
@Override
8291
public Mono<RSocket> source() {
8392
return Mono.fromDirect(this);
@@ -194,6 +203,12 @@ protected void doOnValueExpired(RSocket value) {
194203
@Override
195204
protected void doOnDispose() {
196205
Operators.terminate(S, this);
206+
final RSocket value = this.value;
207+
if (value != null) {
208+
value.onClose().subscribe(null, onDisposeSink::tryEmitError, onDisposeSink::tryEmitEmpty);
209+
} else {
210+
onDisposeSink.tryEmitEmpty();
211+
}
197212
}
198213

199214
static final class FlatMapMain<R> implements CoreSubscriber<Payload>, Context, Scannable {

rsocket-core/src/main/java/io/rsocket/core/RSocketClient.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
*/
1616
package io.rsocket.core;
1717

18+
import io.rsocket.Closeable;
1819
import io.rsocket.Payload;
1920
import io.rsocket.RSocket;
2021
import org.reactivestreams.Publisher;
21-
import reactor.core.Disposable;
2222
import reactor.core.publisher.Flux;
2323
import reactor.core.publisher.Mono;
24+
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
2425

2526
/**
2627
* Contract for performing RSocket requests.
@@ -74,7 +75,11 @@
7475
* @since 1.1
7576
* @see io.rsocket.loadbalance.LoadbalanceRSocketClient
7677
*/
77-
public interface RSocketClient extends Disposable {
78+
public interface RSocketClient extends Closeable {
79+
80+
default Mono<Void> onClose() {
81+
return Mono.error(new NotImplementedException());
82+
}
7883

7984
/** Return the underlying source used to obtain a shared {@link RSocket} connection. */
8085
Mono<RSocket> source();

rsocket-core/src/main/java/io/rsocket/core/RSocketClientAdapter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ public Mono<RSocket> source() {
4646
return Mono.just(rsocket);
4747
}
4848

49+
@Override
50+
public Mono<Void> onClose() {
51+
return rsocket.onClose();
52+
}
53+
4954
@Override
5055
public Mono<Void> fireAndForget(Mono<Payload> payloadMono) {
5156
return payloadMono.flatMap(rsocket::fireAndForget);

rsocket-core/src/main/java/io/rsocket/loadbalance/LoadbalanceRSocketClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ private LoadbalanceRSocketClient(RSocketPool rSocketPool) {
4040
this.rSocketPool = rSocketPool;
4141
}
4242

43+
@Override
44+
public Mono<Void> onClose() {
45+
return rSocketPool.onClose();
46+
}
47+
4348
/** Return {@code Mono} that selects an RSocket from the underlying pool. */
4449
@Override
4550
public Mono<RSocket> source() {

rsocket-core/src/main/java/io/rsocket/loadbalance/PooledRSocket.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import reactor.core.publisher.Flux;
2727
import reactor.core.publisher.Mono;
2828
import reactor.core.publisher.Operators;
29+
import reactor.core.publisher.Sinks;
2930
import reactor.util.context.Context;
3031

3132
/** Default implementation of {@link RSocket} stored in {@link RSocketPool} */
@@ -35,6 +36,7 @@ final class PooledRSocket extends ResolvingOperator<RSocket>
3536
final RSocketPool parent;
3637
final Mono<RSocket> rSocketSource;
3738
final LoadbalanceTarget loadbalanceTarget;
39+
final Sinks.Empty<Void> onCloseSink;
3840

3941
volatile Subscription s;
4042

@@ -46,6 +48,7 @@ final class PooledRSocket extends ResolvingOperator<RSocket>
4648
this.parent = parent;
4749
this.rSocketSource = rSocketSource;
4850
this.loadbalanceTarget = loadbalanceTarget;
51+
this.onCloseSink = Sinks.unsafe().empty();
4952
}
5053

5154
@Override
@@ -155,6 +158,12 @@ void doCleanup(Throwable t) {
155158
break;
156159
}
157160
}
161+
162+
if (t == ON_DISPOSE) {
163+
this.onCloseSink.tryEmitEmpty();
164+
} else {
165+
this.onCloseSink.tryEmitError(t);
166+
}
158167
}
159168

160169
@Override
@@ -165,6 +174,13 @@ protected void doOnValueExpired(RSocket value) {
165174
@Override
166175
protected void doOnDispose() {
167176
Operators.terminate(S, this);
177+
178+
final RSocket value = this.value;
179+
if (value != null) {
180+
value.onClose().subscribe(null, onCloseSink::tryEmitError, onCloseSink::tryEmitEmpty);
181+
} else {
182+
onCloseSink.tryEmitEmpty();
183+
}
168184
}
169185

170186
@Override
@@ -193,7 +209,12 @@ public Mono<Void> metadataPush(Payload payload) {
193209
}
194210

195211
LoadbalanceTarget target() {
196-
return loadbalanceTarget;
212+
return this.loadbalanceTarget;
213+
}
214+
215+
@Override
216+
public Mono<Void> onClose() {
217+
return this.onCloseSink.asMono();
197218
}
198219

199220
@Override

rsocket-core/src/main/java/io/rsocket/loadbalance/RSocketPool.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package io.rsocket.loadbalance;
1717

1818
import io.netty.util.ReferenceCountUtil;
19+
import io.rsocket.Closeable;
1920
import io.rsocket.Payload;
2021
import io.rsocket.RSocket;
2122
import io.rsocket.core.RSocketConnector;
@@ -28,16 +29,18 @@
2829
import java.util.ListIterator;
2930
import java.util.concurrent.CancellationException;
3031
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
32+
import java.util.stream.Collectors;
3133
import org.reactivestreams.Publisher;
3234
import org.reactivestreams.Subscription;
3335
import reactor.core.CoreSubscriber;
3436
import reactor.core.publisher.Flux;
3537
import reactor.core.publisher.Mono;
3638
import reactor.core.publisher.Operators;
39+
import reactor.core.publisher.Sinks;
3740
import reactor.util.annotation.Nullable;
3841

3942
class RSocketPool extends ResolvingOperator<Object>
40-
implements CoreSubscriber<List<LoadbalanceTarget>> {
43+
implements CoreSubscriber<List<LoadbalanceTarget>>, Closeable {
4144

4245
static final AtomicReferenceFieldUpdater<RSocketPool, PooledRSocket[]> ACTIVE_SOCKETS =
4346
AtomicReferenceFieldUpdater.newUpdater(
@@ -49,6 +52,7 @@ class RSocketPool extends ResolvingOperator<Object>
4952
final DeferredResolutionRSocket deferredResolutionRSocket = new DeferredResolutionRSocket(this);
5053
final RSocketConnector connector;
5154
final LoadbalanceStrategy loadbalanceStrategy;
55+
final Sinks.Empty<Void> onAllClosedSink = Sinks.unsafe().empty();
5256
volatile PooledRSocket[] activeSockets;
5357
volatile Subscription s;
5458

@@ -64,6 +68,11 @@ public RSocketPool(
6468
targetPublisher.subscribe(this);
6569
}
6670

71+
@Override
72+
public Mono<Void> onClose() {
73+
return onAllClosedSink.asMono();
74+
}
75+
6776
@Override
6877
protected void doOnDispose() {
6978
Operators.terminate(S, this);
@@ -72,6 +81,14 @@ protected void doOnDispose() {
7281
for (RSocket rSocket : activeSockets) {
7382
rSocket.dispose();
7483
}
84+
85+
if (activeSockets.length > 0) {
86+
Mono.whenDelayError(
87+
Arrays.stream(activeSockets).map(RSocket::onClose).collect(Collectors.toList()))
88+
.subscribe(null, onAllClosedSink::tryEmitError, onAllClosedSink::tryEmitEmpty);
89+
} else {
90+
onAllClosedSink.tryEmitEmpty();
91+
}
7592
}
7693

7794
@Override

rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.rsocket.frame.decoder.PayloadDecoder;
3030
import io.rsocket.internal.subscriber.AssertSubscriber;
3131
import io.rsocket.util.ByteBufPayload;
32+
import io.rsocket.util.RSocketProxy;
3233
import java.time.Duration;
3334
import java.util.ArrayList;
3435
import java.util.Collection;
@@ -457,6 +458,43 @@ public void shouldDisposeOriginalSource() {
457458
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
458459
}
459460

461+
@Test
462+
public void shouldReceiveOnCloseNotificationOnDisposeOriginalSource() {
463+
Sinks.Empty<Void> onCloseDelayer = Sinks.empty();
464+
ClientSocketRule rule =
465+
new ClientSocketRule() {
466+
@Override
467+
protected RSocket newRSocket() {
468+
return new RSocketProxy(super.newRSocket()) {
469+
@Override
470+
public Mono<Void> onClose() {
471+
return super.onClose().and(onCloseDelayer.asMono());
472+
}
473+
};
474+
}
475+
};
476+
rule.init();
477+
AssertSubscriber<RSocket> assertSubscriber = AssertSubscriber.create();
478+
rule.client.source().subscribe(assertSubscriber);
479+
rule.delayer.run();
480+
assertSubscriber.assertTerminated().assertValueCount(1);
481+
482+
rule.client.dispose();
483+
484+
Assertions.assertThat(rule.client.isDisposed()).isTrue();
485+
486+
AssertSubscriber<Void> onCloseSubscriber = AssertSubscriber.create();
487+
488+
rule.client.onClose().subscribe(onCloseSubscriber);
489+
onCloseSubscriber.assertNotTerminated();
490+
491+
onCloseDelayer.tryEmitEmpty();
492+
493+
onCloseSubscriber.assertTerminated().assertComplete();
494+
495+
Assertions.assertThat(rule.socket.isDisposed()).isTrue();
496+
}
497+
460498
@Test
461499
public void shouldDisposeOriginalSourceIfRacing() {
462500
for (int i = 0; i < RaceTestConstants.REPEATS; i++) {
@@ -485,7 +523,7 @@ public void shouldDisposeOriginalSourceIfRacing() {
485523
}
486524
}
487525

488-
public static class ClientSocketRule extends AbstractSocketRule<RSocketRequester> {
526+
public static class ClientSocketRule extends AbstractSocketRule<RSocket> {
489527

490528
protected RSocketClient client;
491529
protected Runnable delayer;
@@ -498,14 +536,16 @@ protected void doInit() {
498536
producer = Sinks.one();
499537
client =
500538
new DefaultRSocketClient(
501-
producer
502-
.asMono()
503-
.doOnCancel(() -> socket.dispose())
504-
.doOnDiscard(Disposable.class, Disposable::dispose));
539+
Mono.defer(
540+
() ->
541+
producer
542+
.asMono()
543+
.doOnCancel(() -> socket.dispose())
544+
.doOnDiscard(Disposable.class, Disposable::dispose)));
505545
}
506546

507547
@Override
508-
protected RSocketRequester newRSocket() {
548+
protected RSocket newRSocket() {
509549
return new RSocketRequester(
510550
connection,
511551
PayloadDecoder.ZERO_COPY,

0 commit comments

Comments
 (0)