64
64
import reactor .core .publisher .BaseSubscriber ;
65
65
import reactor .core .publisher .Flux ;
66
66
import reactor .core .publisher .Mono ;
67
+ import reactor .core .publisher .MonoProcessor ;
67
68
import reactor .core .publisher .Operators ;
68
69
import reactor .core .publisher .SignalType ;
69
70
import reactor .core .publisher .UnicastProcessor ;
@@ -109,6 +110,7 @@ class RSocketRequester implements RSocket {
109
110
private final RequesterLeaseHandler leaseHandler ;
110
111
private final ByteBufAllocator allocator ;
111
112
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor ;
113
+ private final MonoProcessor <Void > onClose ;
112
114
private final Scheduler serialScheduler ;
113
115
114
116
RSocketRequester (
@@ -129,6 +131,7 @@ class RSocketRequester implements RSocket {
129
131
this .leaseHandler = leaseHandler ;
130
132
this .senders = new SynchronizedIntObjectHashMap <>();
131
133
this .receivers = new SynchronizedIntObjectHashMap <>();
134
+ this .onClose = MonoProcessor .create ();
132
135
this .serialScheduler = serialScheduler ;
133
136
134
137
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
@@ -182,17 +185,17 @@ public double availability() {
182
185
183
186
@ Override
184
187
public void dispose () {
185
- tryTerminate (() -> new CancellationException ( "Disposed" ) );
188
+ tryShutdown ( );
186
189
}
187
190
188
191
@ Override
189
192
public boolean isDisposed () {
190
- return connection .isDisposed ();
193
+ return onClose .isDisposed ();
191
194
}
192
195
193
196
@ Override
194
197
public Mono <Void > onClose () {
195
- return connection . onClose () ;
198
+ return onClose ;
196
199
}
197
200
198
201
private Mono <Void > handleFireAndForget (Payload payload ) {
@@ -759,6 +762,11 @@ private void terminate(Throwable e) {
759
762
senders .clear ();
760
763
receivers .clear ();
761
764
sendProcessor .dispose ();
765
+ if (e == CLOSED_CHANNEL_EXCEPTION ) {
766
+ onClose .onComplete ();
767
+ } else {
768
+ onClose .onError (e );
769
+ }
762
770
}
763
771
764
772
private void handleSendProcessorError (Throwable t ) {
0 commit comments