Skip to content

Commit 1efddd1

Browse files
committed
better performance with resume on
Signed-off-by: Maksym Ostroverkhov <[email protected]>
1 parent 900a593 commit 1efddd1

File tree

3 files changed

+33
-20
lines changed

3 files changed

+33
-20
lines changed

rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030
public class InMemoryResumableFramesStore implements ResumableFramesStore {
3131
private static final Logger logger = LoggerFactory.getLogger(InMemoryResumableFramesStore.class);
32-
private static final int SAVE_REQUEST_SIZE = 256;
32+
private static final long SAVE_REQUEST_SIZE = Long.MAX_VALUE;
3333

3434
private final MonoProcessor<Void> disposed = MonoProcessor.create();
3535
private volatile long position;
@@ -180,12 +180,12 @@ private static Queue<ByteBuf> cachedFramesQueue(int size) {
180180
}
181181

182182
private class FramesSubscriber implements Subscriber<ByteBuf> {
183-
private final int firstRequestSize;
184-
private final int refillSize;
183+
private final long firstRequestSize;
184+
private final long refillSize;
185185
private int received;
186186
private Subscription s;
187187

188-
public FramesSubscriber(int requestSize) {
188+
public FramesSubscriber(long requestSize) {
189189
this.firstRequestSize = requestSize;
190190
this.refillSize = firstRequestSize / 2;
191191
}
@@ -199,7 +199,7 @@ public void onSubscribe(Subscription s) {
199199
@Override
200200
public void onNext(ByteBuf byteBuf) {
201201
saveFrame(byteBuf);
202-
if (++received == refillSize) {
202+
if (firstRequestSize != Long.MAX_VALUE && ++received == refillSize) {
203203
received = 0;
204204
s.request(refillSize);
205205
}

rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import io.rsocket.Closeable;
2121
import io.rsocket.DuplexConnection;
2222
import io.rsocket.frame.FrameHeaderFlyweight;
23-
import io.rsocket.internal.UnboundedProcessor;
2423
import java.nio.channels.ClosedChannelException;
2524
import java.time.Duration;
2625
import java.util.concurrent.atomic.AtomicBoolean;
@@ -47,8 +46,7 @@ class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
4746
private final FluxProcessor<ByteBuf, ByteBuf> downStreamFrames = ReplayProcessor.create(0);
4847
private final FluxProcessor<ByteBuf, ByteBuf> resumeSaveFrames = EmitterProcessor.create();
4948
private final MonoProcessor<Void> resumeSaveCompleted = MonoProcessor.create();
50-
51-
private final UnboundedProcessor<Object> actions = new UnboundedProcessor<>();
49+
private final FluxProcessor<Object, Object> actions = UnicastProcessor.create().serialize();
5250

5351
private final Mono<Void> framesSent;
5452
private final RequestListener downStreamRequestListener = new RequestListener();
@@ -97,12 +95,7 @@ class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
9795
.then()
9896
.cache();
9997

100-
Flux<Object> acts = actions.publish().autoConnect(4);
101-
acts.ofType(ByteBuf.class).subscribe(this::sendFrame);
102-
acts.ofType(ResumeStart.class).subscribe(ResumeStart::run);
103-
acts.ofType(Resume.class).subscribe(Resume::run);
104-
acts.ofType(ResumeComplete.class).subscribe(ResumeComplete::run);
105-
98+
dispatch(actions);
10699
reconnect(duplexConnection);
107100
}
108101

@@ -225,6 +218,17 @@ Flux<Throwable> connectionErrors() {
225218
return connectionErrors;
226219
}
227220

221+
private void dispatch(Flux<?> f) {
222+
f.subscribe(
223+
o -> {
224+
if (o instanceof ByteBuf) {
225+
sendFrame((ByteBuf) o);
226+
} else {
227+
((Runnable) o).run();
228+
}
229+
});
230+
}
231+
228232
private void doResumeStart(ResumeAwareConnection connection) {
229233
state = State.RESUME_STARTED;
230234
resumedStreamDisposable.dispose();

rsocket-core/src/main/java/io/rsocket/resume/UpstreamFramesSubscriber.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.rsocket.resume;
1818

1919
import io.netty.buffer.ByteBuf;
20-
import io.rsocket.internal.UnboundedProcessor;
2120
import java.util.Queue;
2221
import java.util.concurrent.atomic.AtomicBoolean;
2322
import java.util.function.Consumer;
@@ -27,13 +26,15 @@
2726
import org.slf4j.LoggerFactory;
2827
import reactor.core.Disposable;
2928
import reactor.core.publisher.Flux;
29+
import reactor.core.publisher.FluxProcessor;
3030
import reactor.core.publisher.Operators;
31+
import reactor.core.publisher.UnicastProcessor;
3132
import reactor.util.concurrent.Queues;
3233

3334
class UpstreamFramesSubscriber implements Subscriber<ByteBuf>, Disposable {
3435
private static final Logger logger = LoggerFactory.getLogger(UpstreamFramesSubscriber.class);
3536

36-
private final UnboundedProcessor<Object> actions = new UnboundedProcessor<>();
37+
private final FluxProcessor<Object, Object> actions = UnicastProcessor.create().serialize();
3738
private final AtomicBoolean disposed = new AtomicBoolean();
3839
private final Consumer<ByteBuf> itemConsumer;
3940
private final Disposable downstreamRequestDisposable;
@@ -59,10 +60,7 @@ class UpstreamFramesSubscriber implements Subscriber<ByteBuf>, Disposable {
5960
resumeSaveStreamDisposable =
6061
resumeSaveStreamRequests.subscribe(requestN -> requestN(requestN, 0));
6162

62-
Flux<Object> acts = actions.publish().autoConnect(3);
63-
acts.ofType(ByteBuf.class).subscribe(this::processFrame);
64-
acts.ofType(ResumeStart.class).subscribe(ResumeStart::run);
65-
acts.ofType(ResumeComplete.class).subscribe(ResumeComplete::run);
63+
dispatch(actions);
6664
}
6765

6866
@Override
@@ -115,6 +113,17 @@ public boolean isDisposed() {
115113
return disposed.get();
116114
}
117115

116+
private void dispatch(Flux<?> p) {
117+
p.subscribe(
118+
o -> {
119+
if (o instanceof ByteBuf) {
120+
processFrame(((ByteBuf) o));
121+
} else {
122+
((Runnable) o).run();
123+
}
124+
});
125+
}
126+
118127
private void requestN(long resumeStreamRequest, long downStreamRequest) {
119128
synchronized (this) {
120129
downStreamRequestN = Operators.addCap(downStreamRequestN, downStreamRequest);

0 commit comments

Comments
 (0)