Skip to content

Commit 5212dec

Browse files
committed
fixes error handling to match specification (#727)
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 6a53f24 commit 5212dec

File tree

10 files changed

+309
-72
lines changed

10 files changed

+309
-72
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketRequester.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
475475
} else {
476476
switch (type) {
477477
case ERROR:
478-
receiver.onError(Exceptions.from(frame));
478+
receiver.onError(Exceptions.from(streamId, frame));
479479
receivers.remove(streamId);
480480
break;
481481
case NEXT_COMPLETE:
@@ -549,7 +549,7 @@ private void tryTerminateOnConnectionClose() {
549549
}
550550

551551
private void tryTerminateOnZeroError(ByteBuf errorFrame) {
552-
tryTerminate(() -> Exceptions.from(errorFrame));
552+
tryTerminate(() -> Exceptions.from(0, errorFrame));
553553
}
554554

555555
private void tryTerminate(Supplier<Exception> errorSupplier) {
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package io.rsocket.exceptions;
2+
3+
import io.rsocket.frame.ErrorType;
4+
5+
public class CustomRSocketException extends RSocketException {
6+
private static final long serialVersionUID = 7873267740343446585L;
7+
8+
private final int errorCode;
9+
10+
/**
11+
* Constructs a new exception with the specified message.
12+
*
13+
* @param errorCode customizable error code. Should be in range [0x00000301-0xFFFFFFFE]
14+
* @param message the message
15+
* @throws NullPointerException if {@code message} is {@code null}
16+
* @throws IllegalArgumentException if {@code errorCode} is out of allowed range
17+
*/
18+
public CustomRSocketException(int errorCode, String message) {
19+
super(message);
20+
if (errorCode > ErrorType.MAX_USER_ALLOWED_ERROR_CODE
21+
&& errorCode < ErrorType.MIN_USER_ALLOWED_ERROR_CODE) {
22+
throw new IllegalArgumentException(
23+
"Allowed errorCode value should be in range [0x00000301-0xFFFFFFFE]");
24+
}
25+
this.errorCode = errorCode;
26+
}
27+
28+
/**
29+
* Constructs a new exception with the specified message and cause.
30+
*
31+
* @param errorCode customizable error code. Should be in range [0x00000301-0xFFFFFFFE]
32+
* @param message the message
33+
* @param cause the cause of this exception
34+
* @throws NullPointerException if {@code message} or {@code cause} is {@code null}
35+
* @throws IllegalArgumentException if {@code errorCode} is out of allowed range
36+
*/
37+
public CustomRSocketException(int errorCode, String message, Throwable cause) {
38+
super(message, cause);
39+
if (errorCode > ErrorType.MAX_USER_ALLOWED_ERROR_CODE
40+
&& errorCode < ErrorType.MIN_USER_ALLOWED_ERROR_CODE) {
41+
throw new IllegalArgumentException(
42+
"Allowed errorCode value should be in range [0x00000301-0xFFFFFFFE]");
43+
}
44+
this.errorCode = errorCode;
45+
}
46+
47+
@Override
48+
public int errorCode() {
49+
return errorCode;
50+
}
51+
}

rsocket-core/src/main/java/io/rsocket/exceptions/Exceptions.java

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -34,36 +34,50 @@ private Exceptions() {}
3434
* @return a {@link RSocketException} that matches the error code in the Frame
3535
* @throws NullPointerException if {@code frame} is {@code null}
3636
*/
37-
public static RuntimeException from(ByteBuf frame) {
37+
public static RuntimeException from(int streamId, ByteBuf frame) {
3838
Objects.requireNonNull(frame, "frame must not be null");
3939

4040
int errorCode = ErrorFrameFlyweight.errorCode(frame);
4141
String message = ErrorFrameFlyweight.dataUtf8(frame);
4242

43-
switch (errorCode) {
44-
case APPLICATION_ERROR:
45-
return new ApplicationErrorException(message);
46-
case CANCELED:
47-
return new CanceledException(message);
48-
case CONNECTION_CLOSE:
49-
return new ConnectionCloseException(message);
50-
case CONNECTION_ERROR:
51-
return new ConnectionErrorException(message);
52-
case INVALID:
53-
return new InvalidException(message);
54-
case INVALID_SETUP:
55-
return new InvalidSetupException(message);
56-
case REJECTED:
57-
return new RejectedException(message);
58-
case REJECTED_RESUME:
59-
return new RejectedResumeException(message);
60-
case REJECTED_SETUP:
61-
return new RejectedSetupException(message);
62-
case UNSUPPORTED_SETUP:
63-
return new UnsupportedSetupException(message);
64-
default:
65-
return new IllegalArgumentException(
66-
String.format("Invalid Error frame: %d '%s'", errorCode, message));
43+
if (streamId == 0) {
44+
switch (errorCode) {
45+
case INVALID_SETUP:
46+
return new InvalidSetupException(message);
47+
case UNSUPPORTED_SETUP:
48+
return new UnsupportedSetupException(message);
49+
case REJECTED_SETUP:
50+
return new RejectedSetupException(message);
51+
case REJECTED_RESUME:
52+
return new RejectedResumeException(message);
53+
case CONNECTION_ERROR:
54+
return new ConnectionErrorException(message);
55+
case CONNECTION_CLOSE:
56+
return new ConnectionCloseException(message);
57+
default:
58+
return new IllegalArgumentException(
59+
String.format("Invalid Error frame in Stream ID 0: 0x%08X '%s'", errorCode, message));
60+
}
61+
} else {
62+
switch (errorCode) {
63+
case APPLICATION_ERROR:
64+
return new ApplicationErrorException(message);
65+
case REJECTED:
66+
return new RejectedException(message);
67+
case CANCELED:
68+
return new CanceledException(message);
69+
case INVALID:
70+
return new InvalidException(message);
71+
default:
72+
if (errorCode >= MIN_USER_ALLOWED_ERROR_CODE
73+
|| errorCode <= MAX_USER_ALLOWED_ERROR_CODE) {
74+
return new CustomRSocketException(errorCode, message);
75+
}
76+
return new IllegalArgumentException(
77+
String.format(
78+
"Invalid Error frame in Stream ID %d: 0x%08X '%s'",
79+
streamId, errorCode, message));
80+
}
6781
}
6882
}
6983
}

rsocket-core/src/main/java/io/rsocket/frame/ErrorFrameFlyweight.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,21 @@
88

99
public class ErrorFrameFlyweight {
1010

11-
// defined error codes
11+
// defined zero stream id error codes
1212
public static final int INVALID_SETUP = 0x00000001;
1313
public static final int UNSUPPORTED_SETUP = 0x00000002;
1414
public static final int REJECTED_SETUP = 0x00000003;
1515
public static final int REJECTED_RESUME = 0x00000004;
1616
public static final int CONNECTION_ERROR = 0x00000101;
1717
public static final int CONNECTION_CLOSE = 0x00000102;
18+
// defined non-zero stream id error codes
1819
public static final int APPLICATION_ERROR = 0x00000201;
1920
public static final int REJECTED = 0x00000202;
2021
public static final int CANCELED = 0x00000203;
2122
public static final int INVALID = 0x00000204;
23+
// defined user-allowed error codes range
24+
public static final int MIN_USER_ALLOWED_ERROR_CODE = 0x00000301;
25+
public static final int MAX_USER_ALLOWED_ERROR_CODE = 0xFFFFFFFE;
2226

2327
public static ByteBuf encode(
2428
ByteBufAllocator allocator, int streamId, Throwable t, ByteBuf data) {

rsocket-core/src/main/java/io/rsocket/frame/ErrorType.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public final class ErrorType {
1111
/**
1212
* Application layer logic generating a Reactive Streams onError event. Stream ID MUST be &gt; 0.
1313
*/
14-
public static final int APPLICATION_ERROR = 0x00000201;;
14+
public static final int APPLICATION_ERROR = 0x00000201;
1515

1616
/**
1717
* The Responder canceled the request but may have started processing it (similar to REJECTED but
@@ -70,5 +70,14 @@ public final class ErrorType {
7070
*/
7171
public static final int UNSUPPORTED_SETUP = 0x00000002;
7272

73+
/** Minimum allowed user defined error code value */
74+
public static final int MIN_USER_ALLOWED_ERROR_CODE = 0x00000301;
75+
76+
/**
77+
* Maximum allowed user defined error code value. Note, the value is above signed integer maximum,
78+
* so it will be negative after overflow.
79+
*/
80+
public static final int MAX_USER_ALLOWED_ERROR_CODE = 0xFFFFFFFE;
81+
7382
private ErrorType() {}
7483
}

rsocket-core/src/test/java/io/rsocket/RSocketLeaseTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ public void serverRSocketFactoryRejectsUnsupportedLease() {
141141
Assertions.assertThat(sent).hasSize(1);
142142
ByteBuf error = sent.iterator().next();
143143
Assertions.assertThat(FrameHeaderFlyweight.frameType(error)).isEqualTo(ERROR);
144-
Assertions.assertThat(Exceptions.from(error).getMessage()).isEqualTo("lease is not supported");
144+
Assertions.assertThat(Exceptions.from(0, error).getMessage())
145+
.isEqualTo("lease is not supported");
145146
}
146147

147148
@Test

rsocket-core/src/test/java/io/rsocket/RSocketTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.netty.buffer.ByteBuf;
2525
import io.netty.buffer.ByteBufAllocator;
2626
import io.rsocket.exceptions.ApplicationErrorException;
27+
import io.rsocket.exceptions.CustomRSocketException;
2728
import io.rsocket.lease.RequesterLeaseHandler;
2829
import io.rsocket.lease.ResponderLeaseHandler;
2930
import io.rsocket.test.util.LocalDuplexConnection;
@@ -38,6 +39,7 @@
3839
import org.junit.rules.ExternalResource;
3940
import org.junit.runner.Description;
4041
import org.junit.runners.model.Statement;
42+
import org.mockito.ArgumentCaptor;
4143
import org.reactivestreams.Publisher;
4244
import org.reactivestreams.Subscriber;
4345
import reactor.core.publisher.DirectProcessor;
@@ -86,6 +88,34 @@ public Mono<Payload> requestResponse(Payload payload) {
8688
rule.assertServerError("java.lang.NullPointerException: Deliberate exception.");
8789
}
8890

91+
@Test(timeout = 2000)
92+
public void testHandlerEmitsCustomError() {
93+
rule.setRequestAcceptor(
94+
new AbstractRSocket() {
95+
@Override
96+
public Mono<Payload> requestResponse(Payload payload) {
97+
return Mono.error(
98+
new CustomRSocketException(0x00000501, "Deliberate Custom exception."));
99+
}
100+
});
101+
Subscriber<Payload> subscriber = TestSubscriber.create();
102+
rule.crs.requestResponse(EmptyPayload.INSTANCE).subscribe(subscriber);
103+
ArgumentCaptor<CustomRSocketException> customRSocketExceptionArgumentCaptor =
104+
ArgumentCaptor.forClass(CustomRSocketException.class);
105+
verify(subscriber).onError(customRSocketExceptionArgumentCaptor.capture());
106+
107+
Assert.assertEquals(
108+
"Deliberate Custom exception.",
109+
customRSocketExceptionArgumentCaptor.getValue().getMessage());
110+
Assert.assertEquals(0x00000501, customRSocketExceptionArgumentCaptor.getValue().errorCode());
111+
112+
// Client sees error through normal API
113+
rule.assertNoClientErrors();
114+
115+
rule.assertServerError(
116+
"io.rsocket.exceptions.CustomRSocketException: Deliberate Custom exception.");
117+
}
118+
89119
@Test(timeout = 2000)
90120
public void testStream() throws Exception {
91121
Flux<Payload> responses = rule.crs.requestStream(DefaultPayload.create("Payload In"));

rsocket-core/src/test/java/io/rsocket/SetupRejectionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ void responderRejectSetup() {
3737

3838
ByteBuf sentFrame = transport.awaitSent();
3939
assertThat(FrameHeaderFlyweight.frameType(sentFrame)).isEqualTo(FrameType.ERROR);
40-
RuntimeException error = Exceptions.from(sentFrame);
40+
RuntimeException error = Exceptions.from(0, sentFrame);
4141
assertThat(errorMsg).isEqualTo(error.getMessage());
4242
assertThat(error).isInstanceOf(RejectedSetupException.class);
4343
RSocket acceptorSender = acceptor.senderRSocket().block();

0 commit comments

Comments
 (0)