Skip to content

Commit e837f2f

Browse files
committed
fixes onClose behaviour to not error on shutdown
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 9521a9d commit e837f2f

File tree

3 files changed

+46
-19
lines changed

3 files changed

+46
-19
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
import reactor.core.publisher.BaseSubscriber;
6565
import reactor.core.publisher.Flux;
6666
import reactor.core.publisher.Mono;
67-
import reactor.core.publisher.MonoProcessor;
6867
import reactor.core.publisher.Operators;
6968
import reactor.core.publisher.SignalType;
7069
import reactor.core.publisher.UnicastProcessor;
@@ -110,7 +109,6 @@ class RSocketRequester implements RSocket {
110109
private final RequesterLeaseHandler leaseHandler;
111110
private final ByteBufAllocator allocator;
112111
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
113-
private final MonoProcessor<Void> onClose;
114112
private final Scheduler serialScheduler;
115113

116114
RSocketRequester(
@@ -131,16 +129,12 @@ class RSocketRequester implements RSocket {
131129
this.leaseHandler = leaseHandler;
132130
this.senders = new SynchronizedIntObjectHashMap<>();
133131
this.receivers = new SynchronizedIntObjectHashMap<>();
134-
this.onClose = MonoProcessor.create();
135132
this.serialScheduler = serialScheduler;
136133

137134
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
138135
this.sendProcessor = new UnboundedProcessor<>();
139136

140-
connection
141-
.onClose()
142-
.or(onClose)
143-
.subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose);
137+
connection.onClose().subscribe(null, this::tryTerminateOnConnectionError, this::tryShutdown);
144138
connection.send(sendProcessor).subscribe(null, this::handleSendProcessorError);
145139

146140
connection.receive().subscribe(this::handleIncomingFrames, e -> {});
@@ -193,12 +187,12 @@ public void dispose() {
193187

194188
@Override
195189
public boolean isDisposed() {
196-
return onClose.isDisposed();
190+
return connection.isDisposed();
197191
}
198192

199193
@Override
200194
public Mono<Void> onClose() {
201-
return onClose;
195+
return connection.onClose();
202196
}
203197

204198
private Mono<Void> handleFireAndForget(Payload payload) {
@@ -709,10 +703,6 @@ private void tryTerminateOnConnectionError(Throwable e) {
709703
tryTerminate(() -> e);
710704
}
711705

712-
private void tryTerminateOnConnectionClose() {
713-
tryTerminate(() -> CLOSED_CHANNEL_EXCEPTION);
714-
}
715-
716706
private void tryTerminateOnZeroError(ByteBuf errorFrame) {
717707
tryTerminate(() -> Exceptions.from(0, errorFrame));
718708
}
@@ -726,6 +716,14 @@ private void tryTerminate(Supplier<Throwable> errorSupplier) {
726716
}
727717
}
728718

719+
private void tryShutdown() {
720+
if (terminationError == null) {
721+
if (TERMINATION_ERROR.compareAndSet(this, null, CLOSED_CHANNEL_EXCEPTION)) {
722+
terminate(CLOSED_CHANNEL_EXCEPTION);
723+
}
724+
}
725+
}
726+
729727
private void terminate(Throwable e) {
730728
connection.dispose();
731729
leaseHandler.dispose();
@@ -761,7 +759,6 @@ private void terminate(Throwable e) {
761759
senders.clear();
762760
receivers.clear();
763761
sendProcessor.dispose();
764-
onClose.onError(e);
765762
}
766763

767764
private void handleSendProcessorError(Throwable t) {

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ class RSocketResponder implements RSocket {
7676
private final PayloadDecoder payloadDecoder;
7777
private final ResponderLeaseHandler leaseHandler;
7878
private final Disposable leaseHandlerDisposable;
79-
private final MonoProcessor<Void> onClose;
8079

8180
private volatile Throwable terminationError;
8281
private static final AtomicReferenceFieldUpdater<RSocketResponder, Throwable> TERMINATION_ERROR =
@@ -111,7 +110,6 @@ class RSocketResponder implements RSocket {
111110
this.leaseHandler = leaseHandler;
112111
this.sendingSubscriptions = new SynchronizedIntObjectHashMap<>();
113112
this.channelProcessors = new SynchronizedIntObjectHashMap<>();
114-
this.onClose = MonoProcessor.create();
115113

116114
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
117115
// connections
@@ -124,7 +122,6 @@ class RSocketResponder implements RSocket {
124122

125123
this.connection
126124
.onClose()
127-
.or(onClose)
128125
.subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose);
129126
}
130127

@@ -257,12 +254,12 @@ public void dispose() {
257254

258255
@Override
259256
public boolean isDisposed() {
260-
return onClose.isDisposed();
257+
return connection.isDisposed();
261258
}
262259

263260
@Override
264261
public Mono<Void> onClose() {
265-
return onClose;
262+
return connection.onClose();
266263
}
267264

268265
private void cleanup(Throwable e) {

rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.junit.runner.Description;
4242
import org.junit.runners.model.Statement;
4343
import org.reactivestreams.Publisher;
44+
import reactor.core.Disposable;
45+
import reactor.core.Disposables;
4446
import reactor.core.publisher.DirectProcessor;
4547
import reactor.core.publisher.Flux;
4648
import reactor.core.publisher.Mono;
@@ -51,6 +53,34 @@ public class RSocketTest {
5153

5254
@Rule public final SocketRule rule = new SocketRule();
5355

56+
@Test
57+
public void rsocketDisposalShouldEndupWithNoErrorsOnClose() {
58+
RSocket requestHandlingRSocket =
59+
new RSocket() {
60+
final Disposable disposable = Disposables.single();
61+
62+
@Override
63+
public void dispose() {
64+
disposable.dispose();
65+
}
66+
67+
@Override
68+
public boolean isDisposed() {
69+
return disposable.isDisposed();
70+
}
71+
};
72+
rule.setRequestAcceptor(requestHandlingRSocket);
73+
rule.crs
74+
.onClose()
75+
.as(StepVerifier::create)
76+
.expectSubscription()
77+
.then(rule.crs::dispose)
78+
.expectComplete()
79+
.verify(Duration.ofMillis(100));
80+
81+
Assertions.assertThat(requestHandlingRSocket.isDisposed()).isTrue();
82+
}
83+
5484
@Test(timeout = 2_000)
5585
public void testRequestReplyNoError() {
5686
StepVerifier.create(rule.crs.requestResponse(DefaultPayload.create("hello")))
@@ -413,6 +443,9 @@ protected void init() {
413443
LocalDuplexConnection clientConnection =
414444
new LocalDuplexConnection("client", allocator, serverProcessor, clientProcessor);
415445

446+
clientConnection.onClose().doFinally(__ -> serverConnection.dispose()).subscribe();
447+
serverConnection.onClose().doFinally(__ -> clientConnection.dispose()).subscribe();
448+
416449
requestAcceptor =
417450
null != requestAcceptor
418451
? requestAcceptor

0 commit comments

Comments
 (0)