36
36
import io .rsocket .lease .ResponderLeaseHandler ;
37
37
import java .util .function .Consumer ;
38
38
import java .util .function .LongConsumer ;
39
- import javax .annotation .Nullable ;
40
39
import org .reactivestreams .Processor ;
41
40
import org .reactivestreams .Publisher ;
42
41
import org .reactivestreams .Subscriber ;
@@ -303,7 +302,7 @@ private void handleFrame(ByteBuf frame) {
303
302
case REQUEST_STREAM :
304
303
int streamInitialRequestN = RequestStreamFrameFlyweight .initialRequestN (frame );
305
304
Payload streamPayload = payloadDecoder .apply (frame );
306
- handleStream (streamId , requestStream (streamPayload ), streamInitialRequestN , null );
305
+ handleStream (streamId , requestStream (streamPayload ), streamInitialRequestN );
307
306
break ;
308
307
case REQUEST_CHANNEL :
309
308
int channelInitialRequestN = RequestChannelFrameFlyweight .initialRequestN (frame );
@@ -434,11 +433,7 @@ protected void hookFinally(SignalType type) {
434
433
response .doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER ).subscribe (subscriber );
435
434
}
436
435
437
- private void handleStream (
438
- int streamId ,
439
- Flux <Payload > response ,
440
- int initialRequestN ,
441
- @ Nullable UnicastProcessor <Payload > requestChannel ) {
436
+ private void handleStream (int streamId , Flux <Payload > response , int initialRequestN ) {
442
437
final BaseSubscriber <Payload > subscriber =
443
438
new BaseSubscriber <Payload >() {
444
439
@@ -451,9 +446,6 @@ protected void hookOnSubscribe(Subscription s) {
451
446
protected void hookOnNext (Payload payload ) {
452
447
if (!PayloadValidationUtils .isValid (mtu , payload )) {
453
448
payload .release ();
454
- if (requestChannel != null ) {
455
- channelProcessors .remove (streamId , requestChannel );
456
- }
457
449
cancel ();
458
450
final IllegalArgumentException t =
459
451
new IllegalArgumentException (INVALID_PAYLOAD_ERROR_MESSAGE );
@@ -503,6 +495,9 @@ private void handleChannel(int streamId, Payload payload, int initialRequestN) {
503
495
504
496
Flux <Payload > payloads =
505
497
frames
498
+ .doOnCancel (
499
+ () -> sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId )))
500
+ .doOnError (t -> handleError (streamId , t ))
506
501
.doOnRequest (
507
502
new LongConsumer () {
508
503
boolean first = true ;
@@ -516,17 +511,7 @@ public void accept(long l) {
516
511
} else {
517
512
n = l ;
518
513
}
519
- if (n > 0 ) {
520
- sendProcessor .onNext (RequestNFrameFlyweight .encode (allocator , streamId , n ));
521
- }
522
- }
523
- })
524
- .doFinally (
525
- signalType -> {
526
- if (channelProcessors .remove (streamId , frames )) {
527
- if (signalType == SignalType .CANCEL ) {
528
- sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
529
- }
514
+ sendProcessor .onNext (RequestNFrameFlyweight .encode (allocator , streamId , n ));
530
515
}
531
516
})
532
517
.doOnDiscard (ReferenceCounted .class , DROPPED_ELEMENTS_CONSUMER );
@@ -537,9 +522,9 @@ public void accept(long l) {
537
522
frames .onNext (payload );
538
523
539
524
if (responderRSocket != null ) {
540
- handleStream (streamId , requestChannel (payload , payloads ), initialRequestN , frames );
525
+ handleStream (streamId , requestChannel (payload , payloads ), initialRequestN );
541
526
} else {
542
- handleStream (streamId , requestChannel (payloads ), initialRequestN , frames );
527
+ handleStream (streamId , requestChannel (payloads ), initialRequestN );
543
528
}
544
529
}
545
530
0 commit comments