21
21
import io .rsocket .framing .FrameType ;
22
22
import io .rsocket .internal .LimitableRequestPublisher ;
23
23
import io .rsocket .internal .UnboundedProcessor ;
24
+
25
+ import java .nio .channels .ClosedChannelException ;
24
26
import java .time .Duration ;
25
27
import java .util .concurrent .atomic .AtomicBoolean ;
28
+ import java .util .concurrent .atomic .AtomicReference ;
26
29
import java .util .function .Consumer ;
27
30
import java .util .function .Function ;
28
31
import org .jctools .maps .NonBlockingHashMapLong ;
@@ -72,7 +75,7 @@ class RSocketClient implements RSocket {
72
75
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
73
76
this .sendProcessor = new UnboundedProcessor <>();
74
77
75
- connection .onClose ().doFinally (signalType -> cleanup ()).subscribe (null , errorConsumer );
78
+ connection .onClose ().doFinally (signalType -> terminate ()).subscribe (null , errorConsumer );
76
79
77
80
connection
78
81
.send (sendProcessor )
@@ -92,7 +95,9 @@ class RSocketClient implements RSocket {
92
95
keepAlive -> {
93
96
String message =
94
97
String .format ("No keep-alive acks for %d ms" , keepAlive .getTimeoutMillis ());
95
- errorConsumer .accept (new ConnectionErrorException (message ));
98
+ ConnectionErrorException err = new ConnectionErrorException (message );
99
+ lifecycle .terminate (err );
100
+ errorConsumer .accept (err );
96
101
connection .dispose ();
97
102
});
98
103
keepAliveHandler .send ().subscribe (sendProcessor ::onNext );
@@ -157,12 +162,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
157
162
158
163
@ Override
159
164
public Mono <Void > metadataPush (Payload payload ) {
160
- return Mono .fromRunnable (
161
- () -> {
162
- final Frame requestFrame = Frame .Request .from (0 , FrameType .METADATA_PUSH , payload , 1 );
163
- payload .release ();
164
- sendProcessor .onNext (requestFrame );
165
- });
165
+ return handleMetadataPush (payload );
166
166
}
167
167
168
168
@ Override
@@ -187,7 +187,7 @@ public Mono<Void> onClose() {
187
187
188
188
private Mono <Void > handleFireAndForget (Payload payload ) {
189
189
return lifecycle
190
- .started ()
190
+ .active ()
191
191
.then (
192
192
Mono .fromRunnable (
193
193
() -> {
@@ -201,7 +201,7 @@ private Mono<Void> handleFireAndForget(Payload payload) {
201
201
202
202
private Flux <Payload > handleRequestStream (final Payload payload ) {
203
203
return lifecycle
204
- .started ()
204
+ .active ()
205
205
.thenMany (
206
206
Flux .defer (
207
207
() -> {
@@ -247,7 +247,7 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
247
247
248
248
private Mono <Payload > handleRequestResponse (final Payload payload ) {
249
249
return lifecycle
250
- .started ()
250
+ .active ()
251
251
.then (
252
252
Mono .defer (
253
253
() -> {
@@ -274,7 +274,7 @@ private Mono<Payload> handleRequestResponse(final Payload payload) {
274
274
275
275
private Flux <Payload > handleChannel (Flux <Payload > request ) {
276
276
return lifecycle
277
- .started ()
277
+ .active ()
278
278
.thenMany (
279
279
Flux .defer (
280
280
() -> {
@@ -365,11 +365,25 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
365
365
}));
366
366
}
367
367
368
+ private Mono <Void > handleMetadataPush (Payload payload ) {
369
+ return lifecycle
370
+ .active ()
371
+ .then (Mono .fromRunnable (
372
+ () -> {
373
+ final Frame requestFrame = Frame .Request .from (0 , FrameType .METADATA_PUSH , payload , 1 );
374
+ payload .release ();
375
+ sendProcessor .onNext (requestFrame );
376
+ }));
377
+ }
378
+
368
379
private boolean contains (int streamId ) {
369
380
return receivers .containsKey (streamId );
370
381
}
371
382
372
- protected void cleanup () {
383
+ protected void terminate () {
384
+
385
+ lifecycle .terminate (new ClosedChannelException ());
386
+
373
387
if (keepAliveHandler != null ) {
374
388
keepAliveHandler .dispose ();
375
389
}
@@ -397,13 +411,8 @@ private synchronized void cleanUpLimitableRequestPublisher(
397
411
}
398
412
399
413
private synchronized void cleanUpSubscriber (UnicastProcessor <?> subscriber ) {
400
- Throwable err = lifecycle .terminationError ();
401
414
try {
402
- if (err != null ) {
403
- subscriber .onError (err );
404
- } else {
405
- subscriber .cancel ();
406
- }
415
+ subscriber .onError (lifecycle .terminationError ());
407
416
} catch (Throwable t ) {
408
417
errorConsumer .accept (t );
409
418
}
@@ -519,12 +528,12 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, Frame
519
528
520
529
private static class Lifecycle {
521
530
522
- private volatile Throwable terminationError ;
531
+ private final AtomicReference < Throwable > terminationError = new AtomicReference <>() ;
523
532
524
- public Mono <Void > started () {
533
+ public Mono <Void > active () {
525
534
return Mono .create (
526
535
sink -> {
527
- Throwable err = terminationError ;
536
+ Throwable err = terminationError () ;
528
537
if (err == null ) {
529
538
sink .success ();
530
539
} else {
@@ -534,11 +543,11 @@ public Mono<Void> started() {
534
543
}
535
544
536
545
public void terminate (Throwable err ) {
537
- this .terminationError = err ;
546
+ this .terminationError . compareAndSet ( null , err ) ;
538
547
}
539
548
540
549
public Throwable terminationError () {
541
- return terminationError ;
550
+ return terminationError . get () ;
542
551
}
543
552
}
544
553
}
0 commit comments