Skip to content

Commit 480b501

Browse files
authored
fixes requestChannel and ensure support of half-closed state (#794)
The following is the cases which MUST be supported OUTER PUBLISHER COMPLETED - INNER PUBLISHER CAN SEND OUTER PUBLISHER CANCELLED - INNER PUBLISHER CAN SEND INNER PUBLISHER COMPLETED - OUTER PUBLISHER CAN SEND INNER PUBLISHER CANCELLED - the WHOLE CHAIN IS TERMINATED
1 parent 5ee6c38 commit 480b501

File tree

3 files changed

+302
-38
lines changed

3 files changed

+302
-38
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 48 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -560,46 +560,58 @@ private void handleStreamZero(FrameType type, ByteBuf frame) {
560560

561561
private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
562562
Subscriber<Payload> receiver = receivers.get(streamId);
563-
if (receiver == null) {
564-
handleMissingResponseProcessor(streamId, type, frame);
565-
} else {
566-
switch (type) {
567-
case ERROR:
568-
receiver.onError(Exceptions.from(streamId, frame));
569-
receivers.remove(streamId);
570-
break;
571-
case NEXT_COMPLETE:
572-
receiver.onNext(payloadDecoder.apply(frame));
573-
receiver.onComplete();
574-
break;
575-
case CANCEL:
576-
{
577-
Subscription sender = senders.remove(streamId);
578-
if (sender != null) {
579-
sender.cancel();
580-
}
581-
break;
563+
switch (type) {
564+
case NEXT:
565+
if (receiver == null) {
566+
handleMissingResponseProcessor(streamId, type, frame);
567+
return;
568+
}
569+
receiver.onNext(payloadDecoder.apply(frame));
570+
break;
571+
case NEXT_COMPLETE:
572+
if (receiver == null) {
573+
handleMissingResponseProcessor(streamId, type, frame);
574+
return;
575+
}
576+
receiver.onNext(payloadDecoder.apply(frame));
577+
receiver.onComplete();
578+
break;
579+
case COMPLETE:
580+
if (receiver == null) {
581+
handleMissingResponseProcessor(streamId, type, frame);
582+
return;
583+
}
584+
receiver.onComplete();
585+
receivers.remove(streamId);
586+
break;
587+
case ERROR:
588+
if (receiver == null) {
589+
handleMissingResponseProcessor(streamId, type, frame);
590+
return;
591+
}
592+
receiver.onError(Exceptions.from(streamId, frame));
593+
receivers.remove(streamId);
594+
break;
595+
case CANCEL:
596+
{
597+
Subscription sender = senders.remove(streamId);
598+
if (sender != null) {
599+
sender.cancel();
582600
}
583-
case NEXT:
584-
receiver.onNext(payloadDecoder.apply(frame));
585601
break;
586-
case REQUEST_N:
587-
{
588-
Subscription sender = senders.get(streamId);
589-
if (sender != null) {
590-
int n = RequestNFrameFlyweight.requestN(frame);
591-
sender.request(n >= Integer.MAX_VALUE ? Long.MAX_VALUE : n);
592-
}
593-
break;
602+
}
603+
case REQUEST_N:
604+
{
605+
Subscription sender = senders.get(streamId);
606+
if (sender != null) {
607+
int n = RequestNFrameFlyweight.requestN(frame);
608+
sender.request(n >= Integer.MAX_VALUE ? Long.MAX_VALUE : n);
594609
}
595-
case COMPLETE:
596-
receiver.onComplete();
597-
receivers.remove(streamId);
598610
break;
599-
default:
600-
throw new IllegalStateException(
601-
"Client received supported frame on stream " + streamId + ": " + frame.toString());
602-
}
611+
}
612+
default:
613+
throw new IllegalStateException(
614+
"Client received supported frame on stream " + streamId + ": " + frame.toString());
603615
}
604616
}
605617

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,24 @@ protected void hookOnError(Throwable throwable) {
492492
handleError(streamId, throwable);
493493
}
494494

495+
@Override
496+
protected void hookOnCancel() {
497+
// specifically for requestChannel case so when requester sends Cancel frame so the
498+
// whole chain MUST be terminated
499+
// Note: CancelFrame is redundant from the responder side due to spec
500+
// (https://github.com/rsocket/rsocket/blob/master/Protocol.md#request-channel)
501+
// Upon receiving a CANCEL, the stream is terminated on the Responder.
502+
// Upon sending a CANCEL, the stream is terminated on the Requester.
503+
if (requestChannel != null) {
504+
channelProcessors.remove(streamId, requestChannel);
505+
try {
506+
requestChannel.dispose();
507+
} catch (Exception e) {
508+
// might be thrown back if stream is cancelled
509+
}
510+
}
511+
}
512+
495513
@Override
496514
protected void hookFinally(SignalType type) {
497515
sendingSubscriptions.remove(streamId);
@@ -568,6 +586,7 @@ protected void hookOnError(Throwable throwable) {
568586

569587
private void handleCancelFrame(int streamId) {
570588
Subscription subscription = sendingSubscriptions.remove(streamId);
589+
channelProcessors.remove(streamId);
571590

572591
if (subscription != null) {
573592
subscription.cancel();

0 commit comments

Comments
 (0)