Skip to content

Commit 0601f88

Browse files
authored
fixes OverflowException if UnicstProcessr request and onNext race (#985)
1 parent 769ab2d commit 0601f88

File tree

2 files changed

+54
-1
lines changed

2 files changed

+54
-1
lines changed

rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ public Context currentContext() {
9393

9494
@Override
9595
public void request(long n) {
96-
this.s.request(n);
9796
if (!firstRequest) {
9897
try {
9998
this.hookOnRemainingRequests(n);
@@ -115,6 +114,15 @@ public void request(long n) {
115114
if (firstLoop) {
116115
firstLoop = false;
117116
try {
117+
// since in all the scenarios where RequestOperator is used, the
118+
// CorePublisher is either UnicastProcessor or UnicastProcessor.next()
119+
// we are free to propagate unbounded demand to that publisher right after
120+
// the first request happens. UnicastProcessor is only there to allow sending signals from
121+
// the
122+
// connection to a real subscriber and does not have to check the real demand
123+
// For more info see
124+
// https://github.com/rsocket/rsocket/blob/master/Protocol.md#handling-the-unexpected
125+
this.s.request(Long.MAX_VALUE);
118126
this.hookOnFirstRequest(n);
119127
} catch (Throwable throwable) {
120128
onError(throwable);

rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1142,6 +1142,51 @@ public void testWorkaround858() {
11421142
rule.assertHasNoLeaks();
11431143
}
11441144

1145+
@ParameterizedTest
1146+
@ValueSource(strings = {"stream", "channel"})
1147+
// see https://github.com/rsocket/rsocket-java/issues/959
1148+
public void testWorkaround959(String type) {
1149+
for (int i = 1; i < 20000; i += 2) {
1150+
ByteBuf buffer = rule.alloc().buffer();
1151+
buffer.writeCharSequence("test", CharsetUtil.UTF_8);
1152+
1153+
final AssertSubscriber<Payload> assertSubscriber = new AssertSubscriber<>(3);
1154+
if (type.equals("stream")) {
1155+
rule.socket.requestStream(ByteBufPayload.create(buffer)).subscribe(assertSubscriber);
1156+
} else if (type.equals("channel")) {
1157+
rule.socket
1158+
.requestChannel(Flux.just(ByteBufPayload.create(buffer)))
1159+
.subscribe(assertSubscriber);
1160+
}
1161+
1162+
final ByteBuf payloadFrame =
1163+
PayloadFrameCodec.encode(
1164+
rule.alloc(), i, false, false, true, Unpooled.EMPTY_BUFFER, Unpooled.EMPTY_BUFFER);
1165+
1166+
RaceTestUtils.race(
1167+
() -> {
1168+
rule.connection.addToReceivedBuffer(payloadFrame.copy());
1169+
rule.connection.addToReceivedBuffer(payloadFrame.copy());
1170+
rule.connection.addToReceivedBuffer(payloadFrame);
1171+
},
1172+
() -> {
1173+
assertSubscriber.request(1);
1174+
assertSubscriber.request(1);
1175+
assertSubscriber.request(1);
1176+
});
1177+
1178+
Assertions.assertThat(rule.connection.getSent()).allMatch(ByteBuf::release);
1179+
1180+
Assertions.assertThat(rule.socket.isDisposed()).isFalse();
1181+
1182+
assertSubscriber.values().forEach(ReferenceCountUtil::safeRelease);
1183+
assertSubscriber.assertNoError();
1184+
1185+
rule.connection.clearSendReceiveBuffers();
1186+
rule.assertHasNoLeaks();
1187+
}
1188+
}
1189+
11451190
public static class ClientSocketRule extends AbstractSocketRule<RSocketRequester> {
11461191
@Override
11471192
protected RSocketRequester newRSocket() {

0 commit comments

Comments
 (0)