40
40
import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
41
41
import java .util .function .Consumer ;
42
42
import java .util .function .LongConsumer ;
43
+ import java .util .function .Supplier ;
43
44
import javax .annotation .Nonnull ;
44
45
import javax .annotation .Nullable ;
45
46
import org .reactivestreams .Processor ;
@@ -56,6 +57,7 @@ class RSocketRequester implements RSocket {
56
57
private static final AtomicReferenceFieldUpdater <RSocketRequester , Throwable > TERMINATION_ERROR =
57
58
AtomicReferenceFieldUpdater .newUpdater (
58
59
RSocketRequester .class , Throwable .class , "terminationError" );
60
+ private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException ();
59
61
60
62
private final DuplexConnection connection ;
61
63
private final PayloadDecoder payloadDecoder ;
@@ -91,69 +93,25 @@ class RSocketRequester implements RSocket {
91
93
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
92
94
this .sendProcessor = new UnboundedProcessor <>();
93
95
94
- connection .onClose ().doFinally (signalType -> terminate ()).subscribe (null , errorConsumer );
95
96
connection
96
- .send (sendProcessor )
97
- .doFinally (this ::handleSendProcessorCancel )
98
- .subscribe (null , this ::handleSendProcessorError );
97
+ .onClose ()
98
+ .doFinally (signalType -> tryTerminateOnConnectionClose ())
99
+ .subscribe (null , errorConsumer );
100
+ connection .send (sendProcessor ).subscribe (null , this ::handleSendProcessorError );
99
101
100
102
connection .receive ().subscribe (this ::handleIncomingFrames , errorConsumer );
101
103
102
104
if (keepAliveTickPeriod != 0 && keepAliveHandler != null ) {
103
105
KeepAliveSupport keepAliveSupport =
104
106
new ClientKeepAliveSupport (allocator , keepAliveTickPeriod , keepAliveAckTimeout );
105
107
this .keepAliveFramesAcceptor =
106
- keepAliveHandler .start (keepAliveSupport , sendProcessor ::onNext , this ::terminate );
108
+ keepAliveHandler .start (
109
+ keepAliveSupport , sendProcessor ::onNext , this ::tryTerminateOnKeepAlive );
107
110
} else {
108
111
keepAliveFramesAcceptor = null ;
109
112
}
110
113
}
111
114
112
- private void terminate (KeepAlive keepAlive ) {
113
- String message =
114
- String .format ("No keep-alive acks for %d ms" , keepAlive .getTimeout ().toMillis ());
115
- ConnectionErrorException err = new ConnectionErrorException (message );
116
- setTerminationError (err );
117
- errorConsumer .accept (err );
118
- connection .dispose ();
119
- }
120
-
121
- private void handleSendProcessorError (Throwable t ) {
122
- Throwable terminationError = this .terminationError ;
123
- Throwable err = terminationError != null ? terminationError : t ;
124
- receivers
125
- .values ()
126
- .forEach (
127
- subscriber -> {
128
- try {
129
- subscriber .onError (err );
130
- } catch (Throwable e ) {
131
- errorConsumer .accept (e );
132
- }
133
- });
134
-
135
- senders .values ().forEach (RateLimitableRequestPublisher ::cancel );
136
- }
137
-
138
- private void handleSendProcessorCancel (SignalType t ) {
139
- if (SignalType .ON_ERROR == t ) {
140
- return ;
141
- }
142
-
143
- receivers
144
- .values ()
145
- .forEach (
146
- subscriber -> {
147
- try {
148
- subscriber .onError (new Throwable ("closed connection" ));
149
- } catch (Throwable e ) {
150
- errorConsumer .accept (e );
151
- }
152
- });
153
-
154
- senders .values ().forEach (RateLimitableRequestPublisher ::cancel );
155
- }
156
-
157
115
@ Override
158
116
public Mono <Void > fireAndForget (Payload payload ) {
159
117
return handleFireAndForget (payload );
@@ -263,8 +221,7 @@ public void acceptOnce(@Nonnull Subscription subscription) {
263
221
if (s == SignalType .CANCEL ) {
264
222
sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
265
223
}
266
-
267
- receivers .remove (streamId );
224
+ removeStreamReceiver (streamId );
268
225
});
269
226
}
270
227
@@ -318,7 +275,7 @@ public void accept(long n) {
318
275
sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
319
276
}
320
277
})
321
- .doFinally (s -> receivers . remove (streamId ));
278
+ .doFinally (s -> removeStreamReceiver (streamId ));
322
279
}
323
280
324
281
private Flux <Payload > handleChannel (Flux <Payload > request ) {
@@ -419,14 +376,7 @@ protected void hookOnError(Throwable t) {
419
376
sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
420
377
}
421
378
})
422
- .doFinally (
423
- s -> {
424
- receivers .remove (streamId );
425
- RateLimitableRequestPublisher sender = senders .remove (streamId );
426
- if (sender != null ) {
427
- sender .cancel ();
428
- }
429
- });
379
+ .doFinally (s -> removeStreamReceiverAndSender (streamId ));
430
380
}
431
381
432
382
private Mono <Void > handleMetadataPush (Payload payload ) {
@@ -472,40 +422,6 @@ private boolean contains(int streamId) {
472
422
return receivers .containsKey (streamId );
473
423
}
474
424
475
- private void terminate () {
476
- setTerminationError (new ClosedChannelException ());
477
- leaseHandler .dispose ();
478
- try {
479
- receivers .values ().forEach (this ::cleanUpSubscriber );
480
- senders .values ().forEach (this ::cleanUpLimitableRequestPublisher );
481
- } finally {
482
- senders .clear ();
483
- receivers .clear ();
484
- sendProcessor .dispose ();
485
- }
486
- }
487
-
488
- private void setTerminationError (Throwable error ) {
489
- TERMINATION_ERROR .compareAndSet (this , null , error );
490
- }
491
-
492
- private synchronized void cleanUpLimitableRequestPublisher (
493
- RateLimitableRequestPublisher <?> limitableRequestPublisher ) {
494
- try {
495
- limitableRequestPublisher .cancel ();
496
- } catch (Throwable t ) {
497
- errorConsumer .accept (t );
498
- }
499
- }
500
-
501
- private synchronized void cleanUpSubscriber (Processor subscriber ) {
502
- try {
503
- subscriber .onError (terminationError );
504
- } catch (Throwable t ) {
505
- errorConsumer .accept (t );
506
- }
507
- }
508
-
509
425
private void handleIncomingFrames (ByteBuf frame ) {
510
426
try {
511
427
int streamId = FrameHeaderFlyweight .streamId (frame );
@@ -525,10 +441,7 @@ private void handleIncomingFrames(ByteBuf frame) {
525
441
private void handleStreamZero (FrameType type , ByteBuf frame ) {
526
442
switch (type ) {
527
443
case ERROR :
528
- RuntimeException error = Exceptions .from (frame );
529
- setTerminationError (error );
530
- errorConsumer .accept (error );
531
- connection .dispose ();
444
+ tryTerminateOnZeroError (frame );
532
445
break ;
533
446
case LEASE :
534
447
leaseHandler .receive (frame );
@@ -614,4 +527,86 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, ByteBu
614
527
// receiving a frame after a given stream has been cancelled/completed,
615
528
// so ignore (cancellation is async so there is a race condition)
616
529
}
530
+
531
+ private void tryTerminateOnKeepAlive (KeepAlive keepAlive ) {
532
+ tryTerminate (
533
+ () ->
534
+ new ConnectionErrorException (
535
+ String .format ("No keep-alive acks for %d ms" , keepAlive .getTimeout ().toMillis ())));
536
+ }
537
+
538
+ private void tryTerminateOnConnectionClose () {
539
+ tryTerminate (() -> CLOSED_CHANNEL_EXCEPTION );
540
+ }
541
+
542
+ private void tryTerminateOnZeroError (ByteBuf errorFrame ) {
543
+ tryTerminate (() -> Exceptions .from (errorFrame ));
544
+ }
545
+
546
+ private void tryTerminate (Supplier <Exception > errorSupplier ) {
547
+ if (terminationError == null ) {
548
+ Exception e = errorSupplier .get ();
549
+ if (TERMINATION_ERROR .compareAndSet (this , null , e )) {
550
+ terminate (e );
551
+ }
552
+ }
553
+ }
554
+
555
+ private void terminate (Exception e ) {
556
+ connection .dispose ();
557
+ leaseHandler .dispose ();
558
+
559
+ synchronized (receivers ) {
560
+ receivers
561
+ .values ()
562
+ .forEach (
563
+ receiver -> {
564
+ try {
565
+ receiver .onError (e );
566
+ } catch (Throwable t ) {
567
+ errorConsumer .accept (t );
568
+ }
569
+ });
570
+ }
571
+ synchronized (senders ) {
572
+ senders
573
+ .values ()
574
+ .forEach (
575
+ sender -> {
576
+ try {
577
+ sender .cancel ();
578
+ } catch (Throwable t ) {
579
+ errorConsumer .accept (t );
580
+ }
581
+ });
582
+ }
583
+ senders .clear ();
584
+ receivers .clear ();
585
+ sendProcessor .dispose ();
586
+ errorConsumer .accept (e );
587
+ }
588
+
589
+ private void removeStreamReceiver (int streamId ) {
590
+ /*on termination senders & receivers are explicitly cleared to avoid removing from map while iterating over one
591
+ of its views*/
592
+ if (terminationError == null ) {
593
+ receivers .remove (streamId );
594
+ }
595
+ }
596
+
597
+ private void removeStreamReceiverAndSender (int streamId ) {
598
+ /*on termination senders & receivers are explicitly cleared to avoid removing from map while iterating over one
599
+ of its views*/
600
+ if (terminationError == null ) {
601
+ receivers .remove (streamId );
602
+ RateLimitableRequestPublisher <?> sender = senders .remove (streamId );
603
+ if (sender != null ) {
604
+ sender .cancel ();
605
+ }
606
+ }
607
+ }
608
+
609
+ private void handleSendProcessorError (Throwable t ) {
610
+ connection .dispose ();
611
+ }
617
612
}
0 commit comments