50
50
import io .rsocket .keepalive .KeepAliveSupport ;
51
51
import io .rsocket .lease .RequesterLeaseHandler ;
52
52
import java .nio .channels .ClosedChannelException ;
53
- import java .util .concurrent .CancellationException ;
54
53
import java .util .concurrent .atomic .AtomicBoolean ;
55
54
import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
56
55
import java .util .function .Consumer ;
64
63
import reactor .core .publisher .BaseSubscriber ;
65
64
import reactor .core .publisher .Flux ;
66
65
import reactor .core .publisher .Mono ;
66
+ import reactor .core .publisher .MonoProcessor ;
67
67
import reactor .core .publisher .Operators ;
68
68
import reactor .core .publisher .SignalType ;
69
69
import reactor .core .publisher .UnicastProcessor ;
@@ -109,6 +109,7 @@ class RSocketRequester implements RSocket {
109
109
private final RequesterLeaseHandler leaseHandler ;
110
110
private final ByteBufAllocator allocator ;
111
111
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor ;
112
+ private final MonoProcessor <Void > onClose ;
112
113
private final Scheduler serialScheduler ;
113
114
114
115
RSocketRequester (
@@ -129,6 +130,7 @@ class RSocketRequester implements RSocket {
129
130
this .leaseHandler = leaseHandler ;
130
131
this .senders = new SynchronizedIntObjectHashMap <>();
131
132
this .receivers = new SynchronizedIntObjectHashMap <>();
133
+ this .onClose = MonoProcessor .create ();
132
134
this .serialScheduler = serialScheduler ;
133
135
134
136
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
@@ -182,17 +184,17 @@ public double availability() {
182
184
183
185
@ Override
184
186
public void dispose () {
185
- tryTerminate (() -> new CancellationException ( "Disposed" ) );
187
+ tryShutdown ( );
186
188
}
187
189
188
190
@ Override
189
191
public boolean isDisposed () {
190
- return connection .isDisposed ();
192
+ return onClose .isDisposed ();
191
193
}
192
194
193
195
@ Override
194
196
public Mono <Void > onClose () {
195
- return connection . onClose () ;
197
+ return onClose ;
196
198
}
197
199
198
200
private Mono <Void > handleFireAndForget (Payload payload ) {
@@ -759,6 +761,11 @@ private void terminate(Throwable e) {
759
761
senders .clear ();
760
762
receivers .clear ();
761
763
sendProcessor .dispose ();
764
+ if (e == CLOSED_CHANNEL_EXCEPTION ) {
765
+ onClose .onComplete ();
766
+ } else {
767
+ onClose .onError (e );
768
+ }
762
769
}
763
770
764
771
private void handleSendProcessorError (Throwable t ) {
0 commit comments