Skip to content

fixes error handling to match specification #727

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rsocket-core/src/main/java/io/rsocket/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
} else {
switch (type) {
case ERROR:
receiver.onError(Exceptions.from(frame));
receiver.onError(Exceptions.from(streamId, frame));
receivers.remove(streamId);
break;
case NEXT_COMPLETE:
Expand Down Expand Up @@ -544,7 +544,7 @@ private void tryTerminateOnConnectionClose() {
}

private void tryTerminateOnZeroError(ByteBuf errorFrame) {
tryTerminate(() -> Exceptions.from(errorFrame));
tryTerminate(() -> Exceptions.from(0, errorFrame));
}

private void tryTerminate(Supplier<Exception> errorSupplier) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.rsocket.exceptions;

import io.rsocket.frame.ErrorType;

public class CustomRSocketException extends RSocketException {
private static final long serialVersionUID = 7873267740343446585L;

private final int errorCode;

/**
* Constructs a new exception with the specified message.
*
* @param errorCode customizable error code. Should be in range [0x00000301-0xFFFFFFFE]
* @param message the message
* @throws NullPointerException if {@code message} is {@code null}
* @throws IllegalArgumentException if {@code errorCode} is out of allowed range
*/
public CustomRSocketException(int errorCode, String message) {
super(message);
if (errorCode > ErrorType.MAX_USER_ALLOWED_ERROR_CODE
&& errorCode < ErrorType.MIN_USER_ALLOWED_ERROR_CODE) {
Comment on lines +20 to +21
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks legit, sorry.

    if (errorCode > -2
        && errorCode < 769) {

throw new IllegalArgumentException(
"Allowed errorCode value should be in range [0x00000301-0xFFFFFFFE]");
}
this.errorCode = errorCode;
}

/**
* Constructs a new exception with the specified message and cause.
*
* @param errorCode customizable error code. Should be in range [0x00000301-0xFFFFFFFE]
* @param message the message
* @param cause the cause of this exception
* @throws NullPointerException if {@code message} or {@code cause} is {@code null}
* @throws IllegalArgumentException if {@code errorCode} is out of allowed range
*/
public CustomRSocketException(int errorCode, String message, Throwable cause) {
super(message, cause);
if (errorCode > ErrorType.MAX_USER_ALLOWED_ERROR_CODE
&& errorCode < ErrorType.MIN_USER_ALLOWED_ERROR_CODE) {
throw new IllegalArgumentException(
"Allowed errorCode value should be in range [0x00000301-0xFFFFFFFE]");
}
this.errorCode = errorCode;
}

@Override
public int errorCode() {
return errorCode;
}
}
64 changes: 39 additions & 25 deletions rsocket-core/src/main/java/io/rsocket/exceptions/Exceptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,36 +34,50 @@ private Exceptions() {}
* @return a {@link RSocketException} that matches the error code in the Frame
* @throws NullPointerException if {@code frame} is {@code null}
*/
public static RuntimeException from(ByteBuf frame) {
public static RuntimeException from(int streamId, ByteBuf frame) {
Objects.requireNonNull(frame, "frame must not be null");

int errorCode = ErrorFrameFlyweight.errorCode(frame);
String message = ErrorFrameFlyweight.dataUtf8(frame);

switch (errorCode) {
case APPLICATION_ERROR:
return new ApplicationErrorException(message);
case CANCELED:
return new CanceledException(message);
case CONNECTION_CLOSE:
return new ConnectionCloseException(message);
case CONNECTION_ERROR:
return new ConnectionErrorException(message);
case INVALID:
return new InvalidException(message);
case INVALID_SETUP:
return new InvalidSetupException(message);
case REJECTED:
return new RejectedException(message);
case REJECTED_RESUME:
return new RejectedResumeException(message);
case REJECTED_SETUP:
return new RejectedSetupException(message);
case UNSUPPORTED_SETUP:
return new UnsupportedSetupException(message);
default:
return new IllegalArgumentException(
String.format("Invalid Error frame: %d '%s'", errorCode, message));
if (streamId == 0) {
switch (errorCode) {
case INVALID_SETUP:
return new InvalidSetupException(message);
case UNSUPPORTED_SETUP:
return new UnsupportedSetupException(message);
case REJECTED_SETUP:
return new RejectedSetupException(message);
case REJECTED_RESUME:
return new RejectedResumeException(message);
case CONNECTION_ERROR:
return new ConnectionErrorException(message);
case CONNECTION_CLOSE:
return new ConnectionCloseException(message);
default:
return new IllegalArgumentException(
String.format("Invalid Error frame in Stream ID 0: 0x%08X '%s'", errorCode, message));
}
} else {
switch (errorCode) {
case APPLICATION_ERROR:
return new ApplicationErrorException(message);
case REJECTED:
return new RejectedException(message);
case CANCELED:
return new CanceledException(message);
case INVALID:
return new InvalidException(message);
default:
if (errorCode >= MIN_USER_ALLOWED_ERROR_CODE
|| errorCode <= MAX_USER_ALLOWED_ERROR_CODE) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On re-reading on a computer (not Github iOS tesatflight app :), this seems good.

          if (errorCode >= 769
              || errorCode <= -2) {

return new CustomRSocketException(errorCode, message);
}
return new IllegalArgumentException(
String.format(
"Invalid Error frame in Stream ID %d: 0x%08X '%s'",
streamId, errorCode, message));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,21 @@

public class ErrorFrameFlyweight {

// defined error codes
// defined zero stream id error codes
public static final int INVALID_SETUP = 0x00000001;
public static final int UNSUPPORTED_SETUP = 0x00000002;
public static final int REJECTED_SETUP = 0x00000003;
public static final int REJECTED_RESUME = 0x00000004;
public static final int CONNECTION_ERROR = 0x00000101;
public static final int CONNECTION_CLOSE = 0x00000102;
// defined non-zero stream id error codes
public static final int APPLICATION_ERROR = 0x00000201;
public static final int REJECTED = 0x00000202;
public static final int CANCELED = 0x00000203;
public static final int INVALID = 0x00000204;
// defined user-allowed error codes range
public static final int MIN_USER_ALLOWED_ERROR_CODE = 0x00000301;
public static final int MAX_USER_ALLOWED_ERROR_CODE = 0xFFFFFFFE;

public static ByteBuf encode(
ByteBufAllocator allocator, int streamId, Throwable t, ByteBuf data) {
Expand Down
11 changes: 10 additions & 1 deletion rsocket-core/src/main/java/io/rsocket/frame/ErrorType.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public final class ErrorType {
/**
* Application layer logic generating a Reactive Streams onError event. Stream ID MUST be &gt; 0.
*/
public static final int APPLICATION_ERROR = 0x00000201;;
public static final int APPLICATION_ERROR = 0x00000201;

/**
* The Responder canceled the request but may have started processing it (similar to REJECTED but
Expand Down Expand Up @@ -70,5 +70,14 @@ public final class ErrorType {
*/
public static final int UNSUPPORTED_SETUP = 0x00000002;

/** Minimum allowed user defined error code value */
public static final int MIN_USER_ALLOWED_ERROR_CODE = 0x00000301;

/**
* Maximum allowed user defined error code value. Note, the value is above signed integer maximum,
* so it will be negative after overflow.
*/
public static final int MAX_USER_ALLOWED_ERROR_CODE = 0xFFFFFFFE;

private ErrorType() {}
}
3 changes: 2 additions & 1 deletion rsocket-core/src/test/java/io/rsocket/RSocketLeaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ public void serverRSocketFactoryRejectsUnsupportedLease() {
Assertions.assertThat(sent).hasSize(1);
ByteBuf error = sent.iterator().next();
Assertions.assertThat(FrameHeaderFlyweight.frameType(error)).isEqualTo(ERROR);
Assertions.assertThat(Exceptions.from(error).getMessage()).isEqualTo("lease is not supported");
Assertions.assertThat(Exceptions.from(0, error).getMessage())
.isEqualTo("lease is not supported");
}

@Test
Expand Down
30 changes: 30 additions & 0 deletions rsocket-core/src/test/java/io/rsocket/RSocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.exceptions.ApplicationErrorException;
import io.rsocket.exceptions.CustomRSocketException;
import io.rsocket.lease.RequesterLeaseHandler;
import io.rsocket.lease.ResponderLeaseHandler;
import io.rsocket.test.util.LocalDuplexConnection;
Expand All @@ -38,6 +39,7 @@
import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.mockito.ArgumentCaptor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.DirectProcessor;
Expand Down Expand Up @@ -86,6 +88,34 @@ public Mono<Payload> requestResponse(Payload payload) {
rule.assertServerError("java.lang.NullPointerException: Deliberate exception.");
}

@Test(timeout = 2000)
public void testHandlerEmitsCustomError() {
rule.setRequestAcceptor(
new AbstractRSocket() {
@Override
public Mono<Payload> requestResponse(Payload payload) {
return Mono.error(
new CustomRSocketException(0x00000501, "Deliberate Custom exception."));
}
});
Subscriber<Payload> subscriber = TestSubscriber.create();
rule.crs.requestResponse(EmptyPayload.INSTANCE).subscribe(subscriber);
ArgumentCaptor<CustomRSocketException> customRSocketExceptionArgumentCaptor =
ArgumentCaptor.forClass(CustomRSocketException.class);
verify(subscriber).onError(customRSocketExceptionArgumentCaptor.capture());

Assert.assertEquals(
"Deliberate Custom exception.",
customRSocketExceptionArgumentCaptor.getValue().getMessage());
Assert.assertEquals(0x00000501, customRSocketExceptionArgumentCaptor.getValue().errorCode());

// Client sees error through normal API
rule.assertNoClientErrors();

rule.assertServerError(
"io.rsocket.exceptions.CustomRSocketException: Deliberate Custom exception.");
}

@Test(timeout = 2000)
public void testStream() throws Exception {
Flux<Payload> responses = rule.crs.requestStream(DefaultPayload.create("Payload In"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void responderRejectSetup() {

ByteBuf sentFrame = transport.awaitSent();
assertThat(FrameHeaderFlyweight.frameType(sentFrame)).isEqualTo(FrameType.ERROR);
RuntimeException error = Exceptions.from(sentFrame);
RuntimeException error = Exceptions.from(0, sentFrame);
assertThat(errorMsg).isEqualTo(error.getMessage());
assertThat(error).isInstanceOf(RejectedSetupException.class);
RSocket acceptorSender = acceptor.senderRSocket().block();
Expand Down
Loading