Skip to content

Commit 8d9a54a

Browse files
committed
Avoid queueing in UnicastProcessor receivers
Closes gh-887 Signed-off-by: Rossen Stoyanchev <[email protected]>
1 parent 5a3dc4b commit 8d9a54a

File tree

4 files changed

+13
-7
lines changed

4 files changed

+13
-7
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2018 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -348,7 +348,7 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
348348
}
349349

350350
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
351-
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
351+
final UnicastProcessor<Payload> receiver = UnicastProcessor.create(Queues.<Payload>one().get());
352352
final AtomicBoolean once = new AtomicBoolean();
353353

354354
return Flux.defer(
@@ -456,7 +456,7 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
456456
private Flux<? extends Payload> handleChannel(Payload initialPayload, Flux<Payload> inboundFlux) {
457457
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
458458

459-
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
459+
final UnicastProcessor<Payload> receiver = UnicastProcessor.create(Queues.<Payload>one().get());
460460

461461
return receiver
462462
.transform(

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2018 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,6 +48,7 @@
4848
import reactor.core.Exceptions;
4949
import reactor.core.publisher.*;
5050
import reactor.util.annotation.Nullable;
51+
import reactor.util.concurrent.Queues;
5152

5253
/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
5354
class RSocketResponder implements RSocket {
@@ -537,7 +538,7 @@ protected void hookOnError(Throwable throwable) {
537538
}
538539

539540
private void handleChannel(int streamId, Payload payload, long initialRequestN) {
540-
UnicastProcessor<Payload> frames = UnicastProcessor.create();
541+
UnicastProcessor<Payload> frames = UnicastProcessor.create(Queues.<Payload>one().get());
541542
channelProcessors.put(streamId, frames);
542543

543544
Flux<Payload> payloads =

rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,13 +158,13 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
158158
}
159159

160160
@Test(timeout = 2000)
161-
public void testStream() throws Exception {
161+
public void testStream() {
162162
Flux<Payload> responses = rule.crs.requestStream(DefaultPayload.create("Payload In"));
163163
StepVerifier.create(responses).expectNextCount(10).expectComplete().verify();
164164
}
165165

166166
@Test(timeout = 2000)
167-
public void testChannel() throws Exception {
167+
public void testChannel() {
168168
Flux<Payload> requests =
169169
Flux.range(0, 10).map(i -> DefaultPayload.create("streaming in -> " + i));
170170
Flux<Payload> responses = rule.crs.requestChannel(requests);
@@ -543,6 +543,7 @@ public Mono<Payload> requestResponse(Payload payload) {
543543
@Override
544544
public Flux<Payload> requestStream(Payload payload) {
545545
return Flux.range(1, 10)
546+
.delaySubscription(Duration.ofMillis(100))
546547
.map(
547548
i -> DefaultPayload.create("server got -> [" + payload.toString() + "]"));
548549
}
@@ -556,6 +557,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
556557
.subscribe();
557558

558559
return Flux.range(1, 10)
560+
.delaySubscription(Duration.ofMillis(100))
559561
.map(
560562
payload ->
561563
DefaultPayload.create("server got -> [" + payload.toString() + "]"));

rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public void testRangeButThrowException() {
4949
}
5050
})
5151
.map(l -> DefaultPayload.create("l -> " + l))
52+
.delaySubscription(Duration.ofMillis(100))
5253
.cast(Payload.class)))
5354
.bind(serverTransport)
5455
.block();
@@ -71,6 +72,7 @@ public void testRangeOfConsumers() {
7172
payload ->
7273
Flux.range(1, 1000)
7374
.map(l -> DefaultPayload.create("l -> " + l))
75+
.delaySubscription(Duration.ofMillis(100))
7476
.cast(Payload.class)))
7577
.bind(serverTransport)
7678
.block();
@@ -104,6 +106,7 @@ public void testSingleConsumer() {
104106
payload ->
105107
Flux.range(1, 10_000)
106108
.map(l -> DefaultPayload.create("l -> " + l))
109+
.delaySubscription(Duration.ofMillis(100))
107110
.cast(Payload.class)))
108111
.bind(serverTransport)
109112
.block();

0 commit comments

Comments
 (0)