Skip to content

Commit 111a3b8

Browse files
committed
provides prioritized sending for zero-stream frames; refactors benchmarks
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 08cefe0 commit 111a3b8

File tree

5 files changed

+3
-464
lines changed

5 files changed

+3
-464
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketResponder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ class RSocketResponder implements ResponderRSocket {
8585
.subscribe(null, this::handleSendProcessorError);
8686

8787
Disposable receiveDisposable = connection.receive().subscribe(this::handleFrame, errorConsumer);
88-
Disposable sendLeaseDisposable = leaseHandler.send(sendProcessor::onNext);
88+
Disposable sendLeaseDisposable = leaseHandler.send(sendProcessor::onNextPrioritized);
8989

9090
this.connection
9191
.onClose()

rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import io.netty.util.ReferenceCounted;
2020
import io.rsocket.internal.jctools.queues.MpscUnboundedArrayQueue;
21-
import io.rsocket.internal.jctools.queues.SpscUnboundedArrayQueue;
2221
import java.util.Objects;
2322
import java.util.Queue;
2423
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -69,12 +68,12 @@ public final class UnboundedProcessor<T> extends FluxProcessor<T, T>
6968

7069
public UnboundedProcessor() {
7170
this.queue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE);
72-
this.priorityQueue = new SpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE);
71+
this.priorityQueue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE);
7372
}
7473

7574
@Override
7675
public int getBufferSize() {
77-
return Queues.capacity(this.queue);
76+
return Integer.MAX_VALUE;
7877
}
7978

8079
@Override

rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseSpscLinkedArrayQueue.java

Lines changed: 0 additions & 367 deletions
This file was deleted.

0 commit comments

Comments
 (0)