Skip to content

Commit 09b0712

Browse files
committed
removes UnboundProcessor optimizations. Not possible today
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent af55072 commit 09b0712

File tree

2 files changed

+11
-6
lines changed

2 files changed

+11
-6
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import reactor.core.publisher.Mono;
4242
import reactor.core.publisher.SignalType;
4343
import reactor.core.publisher.UnicastProcessor;
44-
import reactor.util.concurrent.Queues;
4544

4645
/** Client Side of a RSocket socket. Sends {@link ByteBuf}s to a {@link RSocketServer} */
4746
class RSocketClient implements RSocket {
@@ -226,8 +225,7 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
226225
int streamId = streamIdSupplier.nextStreamId();
227226

228227
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
229-
final UnicastProcessor<Payload> receiver =
230-
UnicastProcessor.create(Queues.<Payload>one().get());
228+
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
231229

232230
receivers.put(streamId, receiver);
233231

@@ -277,8 +275,7 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
277275
return lifecycle.activeFlux(
278276
() -> {
279277
final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
280-
final UnicastProcessor<Payload> receiver =
281-
UnicastProcessor.create(Queues.<Payload>one().get());
278+
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
282279
final int streamId = streamIdSupplier.nextStreamId();
283280

284281
return receiver

rsocket-core/src/test/java/io/rsocket/RSocketTest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ public Mono<Payload> requestResponse(Payload payload) {
8484
rule.assertServerError("java.lang.NullPointerException: Deliberate exception.");
8585
}
8686

87+
@Test(timeout = 2000)
88+
public void testStream() throws Exception {
89+
Flux<Payload> responses = rule.crs.requestStream(DefaultPayload.create("Payload In"));
90+
StepVerifier.create(responses).expectNextCount(10).expectComplete().verify();
91+
}
92+
8793
@Test(timeout = 2000)
8894
public void testChannel() throws Exception {
8995
Flux<Payload> requests =
@@ -136,7 +142,9 @@ public Mono<Payload> requestResponse(Payload payload) {
136142

137143
@Override
138144
public Flux<Payload> requestStream(Payload payload) {
139-
return Flux.never();
145+
return Flux.range(1, 10)
146+
.map(
147+
i -> DefaultPayload.create("server got -> [" + payload.toString() + "]"));
140148
}
141149

142150
@Override

0 commit comments

Comments
 (0)