Skip to content

Commit 6b27495

Browse files
committed
Avoid queueing in UnicastProcessor receivers
Closes gh-887 Signed-off-by: Rossen Stoyanchev <[email protected]>
1 parent 5a3dc4b commit 6b27495

File tree

4 files changed

+16
-7
lines changed

4 files changed

+16
-7
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2018 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -348,7 +348,7 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
348348
}
349349

350350
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
351-
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
351+
final UnicastProcessor<Payload> receiver = UnicastProcessor.create(Queues.<Payload>one().get());
352352
final AtomicBoolean once = new AtomicBoolean();
353353

354354
return Flux.defer(
@@ -456,7 +456,7 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
456456
private Flux<? extends Payload> handleChannel(Payload initialPayload, Flux<Payload> inboundFlux) {
457457
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
458458

459-
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
459+
final UnicastProcessor<Payload> receiver = UnicastProcessor.create(Queues.<Payload>one().get());
460460

461461
return receiver
462462
.transform(

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2018 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,6 +48,7 @@
4848
import reactor.core.Exceptions;
4949
import reactor.core.publisher.*;
5050
import reactor.util.annotation.Nullable;
51+
import reactor.util.concurrent.Queues;
5152

5253
/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
5354
class RSocketResponder implements RSocket {
@@ -537,7 +538,7 @@ protected void hookOnError(Throwable throwable) {
537538
}
538539

539540
private void handleChannel(int streamId, Payload payload, long initialRequestN) {
540-
UnicastProcessor<Payload> frames = UnicastProcessor.create();
541+
UnicastProcessor<Payload> frames = UnicastProcessor.create(Queues.<Payload>one().get());
541542
channelProcessors.put(streamId, frames);
542543

543544
Flux<Payload> payloads =

rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import reactor.core.publisher.DirectProcessor;
5050
import reactor.core.publisher.Flux;
5151
import reactor.core.publisher.Mono;
52+
import reactor.core.scheduler.Schedulers;
5253
import reactor.test.StepVerifier;
5354
import reactor.test.publisher.TestPublisher;
5455

@@ -158,13 +159,13 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
158159
}
159160

160161
@Test(timeout = 2000)
161-
public void testStream() throws Exception {
162+
public void testStream() {
162163
Flux<Payload> responses = rule.crs.requestStream(DefaultPayload.create("Payload In"));
163164
StepVerifier.create(responses).expectNextCount(10).expectComplete().verify();
164165
}
165166

166167
@Test(timeout = 2000)
167-
public void testChannel() throws Exception {
168+
public void testChannel() {
168169
Flux<Payload> requests =
169170
Flux.range(0, 10).map(i -> DefaultPayload.create("streaming in -> " + i));
170171
Flux<Payload> responses = rule.crs.requestChannel(requests);
@@ -543,6 +544,8 @@ public Mono<Payload> requestResponse(Payload payload) {
543544
@Override
544545
public Flux<Payload> requestStream(Payload payload) {
545546
return Flux.range(1, 10)
547+
.subscribeOn(Schedulers.boundedElastic())
548+
.doOnNext(i -> System.out.println("Creating payload " + i))
546549
.map(
547550
i -> DefaultPayload.create("server got -> [" + payload.toString() + "]"));
548551
}
@@ -556,6 +559,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
556559
.subscribe();
557560

558561
return Flux.range(1, 10)
562+
.subscribeOn(Schedulers.boundedElastic())
559563
.map(
560564
payload ->
561565
DefaultPayload.create("server got -> [" + payload.toString() + "]"));

rsocket-examples/src/test/java/io/rsocket/integration/TestingStreaming.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.atomic.AtomicInteger;
3030
import org.junit.Test;
3131
import reactor.core.publisher.Flux;
32+
import reactor.core.scheduler.Schedulers;
3233

3334
public class TestingStreaming {
3435
LocalServerTransport serverTransport = LocalServerTransport.create("test");
@@ -49,6 +50,7 @@ public void testRangeButThrowException() {
4950
}
5051
})
5152
.map(l -> DefaultPayload.create("l -> " + l))
53+
.subscribeOn(Schedulers.boundedElastic())
5254
.cast(Payload.class)))
5355
.bind(serverTransport)
5456
.block();
@@ -71,6 +73,7 @@ public void testRangeOfConsumers() {
7173
payload ->
7274
Flux.range(1, 1000)
7375
.map(l -> DefaultPayload.create("l -> " + l))
76+
.subscribeOn(Schedulers.boundedElastic())
7477
.cast(Payload.class)))
7578
.bind(serverTransport)
7679
.block();
@@ -104,6 +107,7 @@ public void testSingleConsumer() {
104107
payload ->
105108
Flux.range(1, 10_000)
106109
.map(l -> DefaultPayload.create("l -> " + l))
110+
.subscribeOn(Schedulers.boundedElastic())
107111
.cast(Payload.class)))
108112
.bind(serverTransport)
109113
.block();

0 commit comments

Comments
 (0)