Skip to content

Commit f8249eb

Browse files
committed
more improvements
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 48fef8b commit f8249eb

File tree

2 files changed

+18
-4
lines changed

2 files changed

+18
-4
lines changed

rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,13 @@ public void request(long n) {
114114
if (firstLoop) {
115115
firstLoop = false;
116116
try {
117+
// since in all the scenarios where RequestOperator is used, the
118+
// CorePublisher is either UnicastProcessor or UnicastProcessor.next()
119+
// Therefore, we are free to propagate max demand immediately up on the first
120+
// request. UnicastProcessor is only there to allow sending signals from the
121+
// connection to a created stream and does not have to check the correctness
122+
// of flow-control. For more info see
123+
// https://github.com/rsocket/rsocket/blob/master/Protocol.md#handling-the-unexpected
117124
this.s.request(Long.MAX_VALUE);
118125
this.hookOnFirstRequest(n);
119126
} catch (Throwable throwable) {

rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1142,15 +1142,22 @@ public void testWorkaround858() {
11421142
rule.assertHasNoLeaks();
11431143
}
11441144

1145-
@Test
1145+
@ParameterizedTest
1146+
@ValueSource(strings = {"stream", "channel"})
11461147
// see https://github.com/rsocket/rsocket-java/issues/959
1147-
public void testWorkaround959() {
1148-
for (int i = 1; i < 100000; i += 2) {
1148+
public void testWorkaround959(String type) {
1149+
for (int i = 1; i < 20000; i += 2) {
11491150
ByteBuf buffer = rule.alloc().buffer();
11501151
buffer.writeCharSequence("test", CharsetUtil.UTF_8);
11511152

11521153
final AssertSubscriber<Payload> assertSubscriber = new AssertSubscriber<>(3);
1153-
rule.socket.requestStream(ByteBufPayload.create(buffer)).subscribe(assertSubscriber);
1154+
if (type.equals("stream")) {
1155+
rule.socket.requestStream(ByteBufPayload.create(buffer)).subscribe(assertSubscriber);
1156+
} else if (type.equals("channel")) {
1157+
rule.socket
1158+
.requestChannel(Flux.just(ByteBufPayload.create(buffer)))
1159+
.subscribe(assertSubscriber);
1160+
}
11541161

11551162
final ByteBuf payloadFrame =
11561163
PayloadFrameCodec.encode(

0 commit comments

Comments
 (0)