@@ -412,20 +412,19 @@ protected void hookOnNext(Payload payload) {
412
412
413
413
@ Override
414
414
protected void hookOnError (Throwable throwable ) {
415
- handleError (streamId , throwable );
415
+ if (sendingSubscriptions .remove (streamId , this )) {
416
+ handleError (streamId , throwable );
417
+ }
416
418
}
417
419
418
420
@ Override
419
421
protected void hookOnComplete () {
420
422
if (isEmpty ) {
421
- sendProcessor .onNext (PayloadFrameCodec .encodeComplete (allocator , streamId ));
423
+ if (sendingSubscriptions .remove (streamId , this )) {
424
+ sendProcessor .onNext (PayloadFrameCodec .encodeComplete (allocator , streamId ));
425
+ }
422
426
}
423
427
}
424
-
425
- @ Override
426
- protected void hookFinally (SignalType type ) {
427
- sendingSubscriptions .remove (streamId , this );
428
- }
429
428
};
430
429
431
430
sendingSubscriptions .put (streamId , subscriber );
@@ -491,36 +490,17 @@ protected void hookOnNext(Payload payload) {
491
490
492
491
@ Override
493
492
protected void hookOnComplete () {
494
- sendProcessor .onNext (PayloadFrameCodec .encodeComplete (allocator , streamId ));
493
+ if (sendingSubscriptions .remove (streamId , this )) {
494
+ sendProcessor .onNext (PayloadFrameCodec .encodeComplete (allocator , streamId ));
495
+ }
495
496
}
496
497
497
498
@ Override
498
499
protected void hookOnError (Throwable throwable ) {
499
- handleError (streamId , throwable );
500
- }
501
-
502
- @ Override
503
- protected void hookOnCancel () {
504
- // specifically for requestChannel case so when requester sends Cancel frame so the
505
- // whole chain MUST be terminated
506
- // Note: CancelFrame is redundant from the responder side due to spec
507
- // (https://github.com/rsocket/rsocket/blob/master/Protocol.md#request-channel)
508
- // Upon receiving a CANCEL, the stream is terminated on the Responder.
509
- // Upon sending a CANCEL, the stream is terminated on the Requester.
510
- if (requestChannel != null ) {
511
- channelProcessors .remove (streamId , requestChannel );
512
- try {
513
- requestChannel .dispose ();
514
- } catch (Exception e ) {
515
- // might be thrown back if stream is cancelled
516
- }
500
+ if (sendingSubscriptions .remove (streamId , this )) {
501
+ handleError (streamId , throwable );
517
502
}
518
503
}
519
-
520
- @ Override
521
- protected void hookFinally (SignalType type ) {
522
- sendingSubscriptions .remove (streamId );
523
- }
524
504
};
525
505
526
506
sendingSubscriptions .put (streamId , subscriber );
@@ -588,7 +568,15 @@ protected void hookOnError(Throwable throwable) {}
588
568
589
569
private void handleCancelFrame (int streamId ) {
590
570
Subscription subscription = sendingSubscriptions .remove (streamId );
591
- channelProcessors .remove (streamId );
571
+ Processor <Payload , Payload > processor = channelProcessors .remove (streamId );
572
+
573
+ if (processor != null ) {
574
+ try {
575
+ processor .onError (new CancellationException ("Disposed" ));
576
+ } catch (Exception e ) {
577
+ // ignore
578
+ }
579
+ }
592
580
593
581
if (subscription != null ) {
594
582
subscription .cancel ();
0 commit comments