40
40
import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
41
41
import java .util .function .Consumer ;
42
42
import java .util .function .LongConsumer ;
43
+ import java .util .function .Supplier ;
43
44
import javax .annotation .Nonnull ;
44
45
import javax .annotation .Nullable ;
45
46
import org .reactivestreams .Processor ;
@@ -56,6 +57,11 @@ class RSocketRequester implements RSocket {
56
57
private static final AtomicReferenceFieldUpdater <RSocketRequester , Throwable > TERMINATION_ERROR =
57
58
AtomicReferenceFieldUpdater .newUpdater (
58
59
RSocketRequester .class , Throwable .class , "terminationError" );
60
+ private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException ();
61
+
62
+ static {
63
+ CLOSED_CHANNEL_EXCEPTION .setStackTrace (new StackTraceElement [0 ]);
64
+ }
59
65
60
66
private final DuplexConnection connection ;
61
67
private final PayloadDecoder payloadDecoder ;
@@ -91,69 +97,25 @@ class RSocketRequester implements RSocket {
91
97
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
92
98
this .sendProcessor = new UnboundedProcessor <>();
93
99
94
- connection .onClose ().doFinally (signalType -> terminate ()).subscribe (null , errorConsumer );
95
100
connection
96
- .send (sendProcessor )
97
- .doFinally (this ::handleSendProcessorCancel )
98
- .subscribe (null , this ::handleSendProcessorError );
101
+ .onClose ()
102
+ .doFinally (signalType -> tryTerminateOnConnectionClose ())
103
+ .subscribe (null , errorConsumer );
104
+ connection .send (sendProcessor ).subscribe (null , this ::handleSendProcessorError );
99
105
100
106
connection .receive ().subscribe (this ::handleIncomingFrames , errorConsumer );
101
107
102
108
if (keepAliveTickPeriod != 0 && keepAliveHandler != null ) {
103
109
KeepAliveSupport keepAliveSupport =
104
110
new ClientKeepAliveSupport (allocator , keepAliveTickPeriod , keepAliveAckTimeout );
105
111
this .keepAliveFramesAcceptor =
106
- keepAliveHandler .start (keepAliveSupport , sendProcessor ::onNext , this ::terminate );
112
+ keepAliveHandler .start (
113
+ keepAliveSupport , sendProcessor ::onNext , this ::tryTerminateOnKeepAlive );
107
114
} else {
108
115
keepAliveFramesAcceptor = null ;
109
116
}
110
117
}
111
118
112
- private void terminate (KeepAlive keepAlive ) {
113
- String message =
114
- String .format ("No keep-alive acks for %d ms" , keepAlive .getTimeout ().toMillis ());
115
- ConnectionErrorException err = new ConnectionErrorException (message );
116
- setTerminationError (err );
117
- errorConsumer .accept (err );
118
- connection .dispose ();
119
- }
120
-
121
- private void handleSendProcessorError (Throwable t ) {
122
- Throwable terminationError = this .terminationError ;
123
- Throwable err = terminationError != null ? terminationError : t ;
124
- receivers
125
- .values ()
126
- .forEach (
127
- subscriber -> {
128
- try {
129
- subscriber .onError (err );
130
- } catch (Throwable e ) {
131
- errorConsumer .accept (e );
132
- }
133
- });
134
-
135
- senders .values ().forEach (RateLimitableRequestPublisher ::cancel );
136
- }
137
-
138
- private void handleSendProcessorCancel (SignalType t ) {
139
- if (SignalType .ON_ERROR == t ) {
140
- return ;
141
- }
142
-
143
- receivers
144
- .values ()
145
- .forEach (
146
- subscriber -> {
147
- try {
148
- subscriber .onError (new Throwable ("closed connection" ));
149
- } catch (Throwable e ) {
150
- errorConsumer .accept (e );
151
- }
152
- });
153
-
154
- senders .values ().forEach (RateLimitableRequestPublisher ::cancel );
155
- }
156
-
157
119
@ Override
158
120
public Mono <Void > fireAndForget (Payload payload ) {
159
121
return handleFireAndForget (payload );
@@ -263,8 +225,7 @@ public void acceptOnce(@Nonnull Subscription subscription) {
263
225
if (s == SignalType .CANCEL ) {
264
226
sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
265
227
}
266
-
267
- receivers .remove (streamId );
228
+ removeStreamReceiver (streamId );
268
229
});
269
230
}
270
231
@@ -318,7 +279,7 @@ public void accept(long n) {
318
279
sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
319
280
}
320
281
})
321
- .doFinally (s -> receivers . remove (streamId ));
282
+ .doFinally (s -> removeStreamReceiver (streamId ));
322
283
}
323
284
324
285
private Flux <Payload > handleChannel (Flux <Payload > request ) {
@@ -419,14 +380,7 @@ protected void hookOnError(Throwable t) {
419
380
sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
420
381
}
421
382
})
422
- .doFinally (
423
- s -> {
424
- receivers .remove (streamId );
425
- RateLimitableRequestPublisher sender = senders .remove (streamId );
426
- if (sender != null ) {
427
- sender .cancel ();
428
- }
429
- });
383
+ .doFinally (s -> removeStreamReceiverAndSender (streamId ));
430
384
}
431
385
432
386
private Mono <Void > handleMetadataPush (Payload payload ) {
@@ -472,40 +426,6 @@ private boolean contains(int streamId) {
472
426
return receivers .containsKey (streamId );
473
427
}
474
428
475
- private void terminate () {
476
- setTerminationError (new ClosedChannelException ());
477
- leaseHandler .dispose ();
478
- try {
479
- receivers .values ().forEach (this ::cleanUpSubscriber );
480
- senders .values ().forEach (this ::cleanUpLimitableRequestPublisher );
481
- } finally {
482
- senders .clear ();
483
- receivers .clear ();
484
- sendProcessor .dispose ();
485
- }
486
- }
487
-
488
- private void setTerminationError (Throwable error ) {
489
- TERMINATION_ERROR .compareAndSet (this , null , error );
490
- }
491
-
492
- private synchronized void cleanUpLimitableRequestPublisher (
493
- RateLimitableRequestPublisher <?> limitableRequestPublisher ) {
494
- try {
495
- limitableRequestPublisher .cancel ();
496
- } catch (Throwable t ) {
497
- errorConsumer .accept (t );
498
- }
499
- }
500
-
501
- private synchronized void cleanUpSubscriber (Processor subscriber ) {
502
- try {
503
- subscriber .onError (terminationError );
504
- } catch (Throwable t ) {
505
- errorConsumer .accept (t );
506
- }
507
- }
508
-
509
429
private void handleIncomingFrames (ByteBuf frame ) {
510
430
try {
511
431
int streamId = FrameHeaderFlyweight .streamId (frame );
@@ -525,10 +445,7 @@ private void handleIncomingFrames(ByteBuf frame) {
525
445
private void handleStreamZero (FrameType type , ByteBuf frame ) {
526
446
switch (type ) {
527
447
case ERROR :
528
- RuntimeException error = Exceptions .from (frame );
529
- setTerminationError (error );
530
- errorConsumer .accept (error );
531
- connection .dispose ();
448
+ tryTerminateOnZeroError (frame );
532
449
break ;
533
450
case LEASE :
534
451
leaseHandler .receive (frame );
@@ -614,4 +531,86 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, ByteBu
614
531
// receiving a frame after a given stream has been cancelled/completed,
615
532
// so ignore (cancellation is async so there is a race condition)
616
533
}
534
+
535
+ private void tryTerminateOnKeepAlive (KeepAlive keepAlive ) {
536
+ tryTerminate (
537
+ () ->
538
+ new ConnectionErrorException (
539
+ String .format ("No keep-alive acks for %d ms" , keepAlive .getTimeout ().toMillis ())));
540
+ }
541
+
542
+ private void tryTerminateOnConnectionClose () {
543
+ tryTerminate (() -> CLOSED_CHANNEL_EXCEPTION );
544
+ }
545
+
546
+ private void tryTerminateOnZeroError (ByteBuf errorFrame ) {
547
+ tryTerminate (() -> Exceptions .from (errorFrame ));
548
+ }
549
+
550
+ private void tryTerminate (Supplier <Exception > errorSupplier ) {
551
+ if (terminationError == null ) {
552
+ Exception e = errorSupplier .get ();
553
+ if (TERMINATION_ERROR .compareAndSet (this , null , e )) {
554
+ terminate (e );
555
+ }
556
+ }
557
+ }
558
+
559
+ private void terminate (Exception e ) {
560
+ connection .dispose ();
561
+ leaseHandler .dispose ();
562
+
563
+ synchronized (receivers ) {
564
+ receivers
565
+ .values ()
566
+ .forEach (
567
+ receiver -> {
568
+ try {
569
+ receiver .onError (e );
570
+ } catch (Throwable t ) {
571
+ errorConsumer .accept (t );
572
+ }
573
+ });
574
+ }
575
+ synchronized (senders ) {
576
+ senders
577
+ .values ()
578
+ .forEach (
579
+ sender -> {
580
+ try {
581
+ sender .cancel ();
582
+ } catch (Throwable t ) {
583
+ errorConsumer .accept (t );
584
+ }
585
+ });
586
+ }
587
+ senders .clear ();
588
+ receivers .clear ();
589
+ sendProcessor .dispose ();
590
+ errorConsumer .accept (e );
591
+ }
592
+
593
+ private void removeStreamReceiver (int streamId ) {
594
+ /*on termination receivers are explicitly cleared to avoid removing from map while iterating over one
595
+ of its views*/
596
+ if (terminationError == null ) {
597
+ receivers .remove (streamId );
598
+ }
599
+ }
600
+
601
+ private void removeStreamReceiverAndSender (int streamId ) {
602
+ /*on termination senders & receivers are explicitly cleared to avoid removing from map while iterating over one
603
+ of its views*/
604
+ if (terminationError == null ) {
605
+ receivers .remove (streamId );
606
+ RateLimitableRequestPublisher <?> sender = senders .remove (streamId );
607
+ if (sender != null ) {
608
+ sender .cancel ();
609
+ }
610
+ }
611
+ }
612
+
613
+ private void handleSendProcessorError (Throwable t ) {
614
+ connection .dispose ();
615
+ }
617
616
}
0 commit comments