Skip to content

Commit 5ee6c38

Browse files
committed
fixes incorrect request propagation
Initially that issue was hidden because both sides uses limitRate which does prefetch 256 elements in advance so it is almost impossible to track underflow in request Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 91e894a commit 5ee6c38

File tree

2 files changed

+30
-0
lines changed

2 files changed

+30
-0
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,10 @@ protected void hookOnSubscribe(Subscription subscription) {
382382
protected void hookOnNext(Payload payload) {
383383
if (first) {
384384
// need to skip first since we have already sent it
385+
// no need to release it since it was released earlier on the request establishment
386+
// phase
385387
first = false;
388+
request(1);
386389
return;
387390
}
388391
if (!PayloadValidationUtils.isValid(mtu, payload)) {

rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.rsocket.test.util.TestSubscriber;
3535
import io.rsocket.util.DefaultPayload;
3636
import io.rsocket.util.EmptyPayload;
37+
import java.time.Duration;
3738
import java.util.ArrayList;
3839
import java.util.concurrent.atomic.AtomicReference;
3940
import org.assertj.core.api.Assertions;
@@ -120,6 +121,32 @@ public Mono<Payload> requestResponse(Payload payload) {
120121
rule.assertServerError("CustomRSocketException (0x501): Deliberate Custom exception.");
121122
}
122123

124+
@Test(timeout = 2000)
125+
public void testRequestPropagatesCorrectlyForRequestChannel() {
126+
rule.setRequestAcceptor(
127+
new AbstractRSocket() {
128+
@Override
129+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
130+
return Flux.from(payloads)
131+
// specifically limits request to 3 in order to prevent 256 request from limitRate
132+
// hidden on the responder side
133+
.limitRequest(3);
134+
}
135+
});
136+
137+
Flux.range(0, 3)
138+
.map(i -> DefaultPayload.create("" + i))
139+
.as(rule.crs::requestChannel)
140+
.as(publisher -> StepVerifier.create(publisher, 3))
141+
.expectSubscription()
142+
.expectNextCount(3)
143+
.expectComplete()
144+
.verify(Duration.ofMillis(5000));
145+
146+
rule.assertNoClientErrors();
147+
rule.assertNoServerErrors();
148+
}
149+
123150
@Test(timeout = 2000)
124151
public void testStream() throws Exception {
125152
Flux<Payload> responses = rule.crs.requestStream(DefaultPayload.create("Payload In"));

0 commit comments

Comments
 (0)