Skip to content

Commit 9181ebd

Browse files
authored
provides error handling in case Bubbling / NIC exceptions (#863)
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 1a4cf5f commit 9181ebd

File tree

4 files changed

+132
-51
lines changed

4 files changed

+132
-51
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -691,7 +691,22 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
691691
handleMissingResponseProcessor(streamId, type, frame);
692692
return;
693693
}
694-
receiver.onError(Exceptions.from(streamId, frame));
694+
695+
// FIXME: when https://github.com/reactor/reactor-core/issues/2176 is resolved
696+
// This is workaround to handle specific Reactor related case when
697+
// onError call may not return normally
698+
try {
699+
receiver.onError(Exceptions.from(streamId, frame));
700+
} catch (RuntimeException e) {
701+
if (reactor.core.Exceptions.isBubbling(e)
702+
|| reactor.core.Exceptions.isErrorCallbackNotImplemented(e)) {
703+
if (LOGGER.isDebugEnabled()) {
704+
Throwable unwrapped = reactor.core.Exceptions.unwrap(e);
705+
LOGGER.debug("Unhandled dropped exception", unwrapped);
706+
}
707+
}
708+
}
709+
695710
receivers.remove(streamId);
696711
break;
697712
case CANCEL:

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 37 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,20 @@ private void handleFrame(ByteBuf frame) {
338338
case ERROR:
339339
receiver = channelProcessors.get(streamId);
340340
if (receiver != null) {
341-
receiver.onError(io.rsocket.exceptions.Exceptions.from(streamId, frame));
341+
// FIXME: when https://github.com/reactor/reactor-core/issues/2176 is resolved
342+
// This is workaround to handle specific Reactor related case when
343+
// onError call may not return normally
344+
try {
345+
receiver.onError(io.rsocket.exceptions.Exceptions.from(streamId, frame));
346+
} catch (RuntimeException e) {
347+
if (reactor.core.Exceptions.isBubbling(e)
348+
|| reactor.core.Exceptions.isErrorCallbackNotImplemented(e)) {
349+
if (LOGGER.isDebugEnabled()) {
350+
Throwable unwrapped = reactor.core.Exceptions.unwrap(e);
351+
LOGGER.debug("Unhandled dropped exception", unwrapped);
352+
}
353+
}
354+
}
342355
}
343356
break;
344357
case NEXT_COMPLETE:
@@ -448,43 +461,38 @@ protected void hookOnNext(Payload payload) {
448461
try {
449462
if (!PayloadValidationUtils.isValid(mtu, payload)) {
450463
payload.release();
451-
// specifically for requestChannel case so when Payload is invalid we will not be
452-
// sending CancelFrame and ErrorFrame
453-
// Note: CancelFrame is redundant and due to spec
454-
// (https://github.com/rsocket/rsocket/blob/master/Protocol.md#request-channel)
455-
// Upon receiving an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream
456-
// is
457-
// terminated on both Requester and Responder.
458-
// Upon sending an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream is
459-
// terminated on both the Requester and Responder.
460-
if (requestChannel != null) {
461-
channelProcessors.remove(streamId, requestChannel);
462-
}
463-
cancel();
464464
final IllegalArgumentException t =
465465
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
466-
handleError(streamId, t);
466+
467+
cancelStream(t);
467468
return;
468469
}
469470

470471
ByteBuf byteBuf =
471472
PayloadFrameCodec.encodeNextReleasingPayload(allocator, streamId, payload);
472473
sendProcessor.onNext(byteBuf);
473474
} catch (Throwable e) {
474-
// specifically for requestChannel case so when Payload is invalid we will not be
475-
// sending CancelFrame and ErrorFrame
476-
// Note: CancelFrame is redundant and due to spec
477-
// (https://github.com/rsocket/rsocket/blob/master/Protocol.md#request-channel)
478-
// Upon receiving an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream is
479-
// terminated on both Requester and Responder.
480-
// Upon sending an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream is
481-
// terminated on both the Requester and Responder.
482-
if (requestChannel != null) {
483-
channelProcessors.remove(streamId, requestChannel);
484-
}
485-
cancel();
486-
handleError(streamId, e);
475+
cancelStream(e);
476+
}
477+
}
478+
479+
private void cancelStream(Throwable t) {
480+
// Cancel the output stream and send an ERROR frame but do not dispose the
481+
// requestChannel (i.e. close the connection) since the spec allows to leave
482+
// the channel in half-closed state.
483+
// specifically for requestChannel case so when Payload is invalid we will not be
484+
// sending CancelFrame and ErrorFrame
485+
// Note: CancelFrame is redundant and due to spec
486+
// (https://github.com/rsocket/rsocket/blob/master/Protocol.md#request-channel)
487+
// Upon receiving an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream
488+
// is terminated on both Requester and Responder.
489+
// Upon sending an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream is
490+
// terminated on both the Requester and Responder.
491+
if (requestChannel != null) {
492+
channelProcessors.remove(streamId, requestChannel);
487493
}
494+
cancel();
495+
handleError(streamId, t);
488496
}
489497

490498
@Override
@@ -502,8 +510,7 @@ protected void hookOnError(Throwable throwable) {
502510
// Note: CancelFrame is redundant and due to spec
503511
// (https://github.com/rsocket/rsocket/blob/master/Protocol.md#request-channel)
504512
// Upon receiving an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream
505-
// is
506-
// terminated on both Requester and Responder.
513+
// is terminated on both Requester and Responder.
507514
// Upon sending an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream is
508515
// terminated on both the Requester and Responder.
509516
if (requestChannel != null && !requestChannel.isDisposed()) {

rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1081,6 +1081,28 @@ public void shouldTerminateAllStreamsIfThereRacingBetweenDisposeAndRequests(
10811081
}
10821082
}
10831083

1084+
@Test
1085+
// see https://github.com/rsocket/rsocket-java/issues/858
1086+
public void testWorkaround858() {
1087+
ByteBuf buffer = rule.alloc().buffer();
1088+
buffer.writeCharSequence("test", CharsetUtil.UTF_8);
1089+
1090+
rule.socket.requestResponse(ByteBufPayload.create(buffer)).subscribe();
1091+
1092+
rule.connection.addToReceivedBuffer(
1093+
ErrorFrameCodec.encode(rule.alloc(), 1, new RuntimeException("test")));
1094+
1095+
Assertions.assertThat(rule.connection.getSent())
1096+
.hasSize(1)
1097+
.first()
1098+
.matches(bb -> FrameHeaderCodec.frameType(bb) == REQUEST_RESPONSE)
1099+
.matches(ByteBuf::release);
1100+
1101+
Assertions.assertThat(rule.socket.isDisposed()).isFalse();
1102+
1103+
rule.assertHasNoLeaks();
1104+
}
1105+
10841106
public static class ClientSocketRule extends AbstractSocketRule<RSocketRequester> {
10851107
@Override
10861108
protected RSocketRequester newRSocket() {

rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java

Lines changed: 57 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -429,23 +429,23 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
429429

430430
ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, 1, Integer.MAX_VALUE);
431431

432-
ByteBuf m1 = allocator.buffer();
433-
m1.writeCharSequence("m1", CharsetUtil.UTF_8);
434-
ByteBuf d1 = allocator.buffer();
435-
d1.writeCharSequence("d1", CharsetUtil.UTF_8);
436-
Payload np1 = ByteBufPayload.create(d1, m1);
437-
438-
ByteBuf m2 = allocator.buffer();
439-
m2.writeCharSequence("m2", CharsetUtil.UTF_8);
440-
ByteBuf d2 = allocator.buffer();
441-
d2.writeCharSequence("d2", CharsetUtil.UTF_8);
442-
Payload np2 = ByteBufPayload.create(d2, m2);
443-
444-
ByteBuf m3 = allocator.buffer();
445-
m3.writeCharSequence("m3", CharsetUtil.UTF_8);
446-
ByteBuf d3 = allocator.buffer();
447-
d3.writeCharSequence("d3", CharsetUtil.UTF_8);
448-
Payload np3 = ByteBufPayload.create(d3, m3);
432+
ByteBuf m1 = allocator.buffer();
433+
m1.writeCharSequence("m1", CharsetUtil.UTF_8);
434+
ByteBuf d1 = allocator.buffer();
435+
d1.writeCharSequence("d1", CharsetUtil.UTF_8);
436+
Payload np1 = ByteBufPayload.create(d1, m1);
437+
438+
ByteBuf m2 = allocator.buffer();
439+
m2.writeCharSequence("m2", CharsetUtil.UTF_8);
440+
ByteBuf d2 = allocator.buffer();
441+
d2.writeCharSequence("d2", CharsetUtil.UTF_8);
442+
Payload np2 = ByteBufPayload.create(d2, m2);
443+
444+
ByteBuf m3 = allocator.buffer();
445+
m3.writeCharSequence("m3", CharsetUtil.UTF_8);
446+
ByteBuf d3 = allocator.buffer();
447+
d3.writeCharSequence("d3", CharsetUtil.UTF_8);
448+
Payload np3 = ByteBufPayload.create(d3, m3);
449449

450450
FluxSink<Payload> sink = sinks[0];
451451
RaceTestUtils.race(
@@ -464,9 +464,10 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
464464

465465
Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release);
466466

467-
assertSubscriber.assertTerminated()
468-
.assertError(CancellationException.class)
469-
.assertErrorMessage("Disposed");
467+
assertSubscriber
468+
.assertTerminated()
469+
.assertError(CancellationException.class)
470+
.assertErrorMessage("Disposed");
470471
Assertions.assertThat(assertSubscriber.values()).allMatch(ReferenceCounted::release);
471472
rule.assertHasNoLeaks();
472473
}
@@ -772,6 +773,42 @@ private static Stream<FrameType> refCntCases() {
772773
return Stream.of(REQUEST_RESPONSE, REQUEST_STREAM, REQUEST_CHANNEL);
773774
}
774775

776+
@Test
777+
// see https://github.com/rsocket/rsocket-java/issues/858
778+
public void testWorkaround858() {
779+
ByteBuf buffer = rule.alloc().buffer();
780+
buffer.writeCharSequence("test", CharsetUtil.UTF_8);
781+
782+
TestPublisher<Payload> testPublisher = TestPublisher.create();
783+
784+
rule.setAcceptingSocket(
785+
new RSocket() {
786+
@Override
787+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
788+
Flux.from(payloads).doOnNext(ReferenceCounted::release).subscribe();
789+
790+
return testPublisher.flux();
791+
}
792+
});
793+
794+
rule.connection.addToReceivedBuffer(
795+
RequestChannelFrameCodec.encodeReleasingPayload(
796+
rule.alloc(), 1, false, 1, ByteBufPayload.create(buffer)));
797+
rule.connection.addToReceivedBuffer(
798+
ErrorFrameCodec.encode(rule.alloc(), 1, new RuntimeException("test")));
799+
800+
Assertions.assertThat(rule.connection.getSent())
801+
.hasSize(1)
802+
.first()
803+
.matches(bb -> FrameHeaderCodec.frameType(bb) == REQUEST_N)
804+
.matches(ReferenceCounted::release);
805+
806+
Assertions.assertThat(rule.socket.isDisposed()).isFalse();
807+
testPublisher.assertWasCancelled();
808+
809+
rule.assertHasNoLeaks();
810+
}
811+
775812
public static class ServerSocketRule extends AbstractSocketRule<RSocketResponder> {
776813

777814
private RSocket acceptingSocket;

0 commit comments

Comments
 (0)