Skip to content

Commit 5c71082

Browse files
committed
ResumableDuplexConnection: use drain-queue for actions
Signed-off-by: Maksym Ostroverkhov <[email protected]>
1 parent 4fd686c commit 5c71082

File tree

4 files changed

+37
-68
lines changed

4 files changed

+37
-68
lines changed

rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveConnection.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
import org.reactivestreams.Publisher;
3434
import reactor.core.publisher.*;
3535

36-
public class KeepAliveConnection extends DuplexConnectionProxy implements ResumePositionsConnection {
36+
public class KeepAliveConnection extends DuplexConnectionProxy
37+
implements ResumePositionsConnection {
3738

3839
private final MonoProcessor<KeepAliveHandler> keepAliveHandlerReady = MonoProcessor.create();
3940
private final ByteBufAllocator allocator;

rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
import org.slf4j.LoggerFactory;
3131
import reactor.core.publisher.Mono;
3232

33-
public class ClientRSocketSession implements RSocketSession<Mono<? extends ResumePositionsConnection>> {
33+
public class ClientRSocketSession
34+
implements RSocketSession<Mono<? extends ResumePositionsConnection>> {
3435
private static final Logger logger = LoggerFactory.getLogger(ClientRSocketSession.class);
3536

3637
private final ResumableDuplexConnection resumableConnection;
@@ -101,7 +102,8 @@ public ClientRSocketSession(
101102
}
102103

103104
@Override
104-
public ClientRSocketSession continueWith(Mono<? extends ResumePositionsConnection> newConnection) {
105+
public ClientRSocketSession continueWith(
106+
Mono<? extends ResumePositionsConnection> newConnection) {
105107
this.newConnection = newConnection;
106108
return this;
107109
}

rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,17 @@
2222
import io.rsocket.frame.FrameHeaderFlyweight;
2323
import java.nio.channels.ClosedChannelException;
2424
import java.time.Duration;
25+
import java.util.Queue;
2526
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.concurrent.atomic.AtomicInteger;
2628
import java.util.function.Function;
2729
import org.reactivestreams.Publisher;
2830
import org.slf4j.Logger;
2931
import org.slf4j.LoggerFactory;
3032
import reactor.core.Disposable;
3133
import reactor.core.Disposables;
3234
import reactor.core.publisher.*;
35+
import reactor.util.concurrent.Queues;
3336

3437
class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
3538
private static final Logger logger = LoggerFactory.getLogger(ResumableDuplexConnection.class);
@@ -45,7 +48,8 @@ class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
4548
private final FluxProcessor<ByteBuf, ByteBuf> downStreamFrames = ReplayProcessor.create(0);
4649
private final FluxProcessor<ByteBuf, ByteBuf> resumeSaveFrames = EmitterProcessor.create();
4750
private final MonoProcessor<Void> resumeSaveCompleted = MonoProcessor.create();
48-
private final FluxProcessor<Object, Object> actions = UnicastProcessor.create().serialize();
51+
private final Queue<Object> actions = Queues.unboundedMultiproducer().get();
52+
private final AtomicInteger actionsWip = new AtomicInteger();
4953

5054
private final Mono<Void> framesSent;
5155
private final RequestListener downStreamRequestListener = new RequestListener();
@@ -56,7 +60,7 @@ class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
5660
128,
5761
downStreamRequestListener.requests(),
5862
resumeSaveStreamRequestListener.requests(),
59-
actions::onNext);
63+
this::dispatch);
6064

6165
private volatile State state;
6266
private volatile Disposable resumedStreamDisposable = Disposables.disposed();
@@ -92,7 +96,6 @@ class ResumableDuplexConnection implements DuplexConnection, ResumeStateHolder {
9296
.then()
9397
.cache();
9498

95-
dispatch(actions);
9699
reconnect(duplexConnection);
97100
}
98101

@@ -109,7 +112,7 @@ public void reconnect(ResumePositionsConnection connection) {
109112
"{} Resumable duplex connection reconnected with connection: {}", tag, connection);
110113
/*race between sendFrame and doResumeStart may lead to ongoing upstream frames
111114
written before resume complete*/
112-
actions.onNext(new ResumeStart(connection));
115+
dispatch(new ResumeStart(connection));
113116
}
114117
}
115118

@@ -118,7 +121,7 @@ public void reconnect(ResumePositionsConnection connection) {
118121
public void resume(
119122
long remotePos, long remoteImpliedPos, Function<Mono<Long>, Mono<Void>> resumeFrameSent) {
120123
/*race between sendFrame and doResume may lead to duplicate frames on resume store*/
121-
actions.onNext(new Resume(remotePos, remoteImpliedPos, resumeFrameSent));
124+
dispatch(new Resume(remotePos, remoteImpliedPos, resumeFrameSent));
122125
}
123126

124127
@Override
@@ -214,15 +217,18 @@ Flux<Throwable> connectionErrors() {
214217
return connectionErrors;
215218
}
216219

217-
private void dispatch(Flux<?> f) {
218-
f.subscribe(
219-
o -> {
220-
if (o instanceof ByteBuf) {
221-
sendFrame((ByteBuf) o);
222-
} else {
223-
((Runnable) o).run();
224-
}
225-
});
220+
private void dispatch(Object action) {
221+
actions.offer(action);
222+
if (actionsWip.getAndIncrement() == 0) {
223+
do {
224+
Object a = actions.poll();
225+
if (a instanceof ByteBuf) {
226+
sendFrame((ByteBuf) a);
227+
} else {
228+
((Runnable) a).run();
229+
}
230+
} while (actionsWip.decrementAndGet() != 0);
231+
}
226232
}
227233

228234
private void doResumeStart(ResumePositionsConnection connection) {
@@ -272,7 +278,7 @@ private void doResume(
272278
resumableFramesStore
273279
.resumeStream()
274280
.timeout(resumeStreamTimeout)
275-
.doFinally(s -> actions.onNext(new ResumeComplete())))
281+
.doFinally(s -> dispatch(new ResumeComplete())))
276282
.doOnError(err -> dispose()))
277283
.onErrorResume(err -> Mono.empty())
278284
.subscribe();
@@ -299,7 +305,7 @@ private Mono<Void> streamResumedFrames(Flux<ByteBuf> frames) {
299305
s -> {
300306
ResumeFramesSubscriber subscriber =
301307
new ResumeFramesSubscriber(
302-
downStreamRequestListener.requests(), actions::onNext, s::error, s::success);
308+
downStreamRequestListener.requests(), this::dispatch, s::error, s::success);
303309
s.onDispose(subscriber);
304310
resumedStreamDisposable = subscriber;
305311
frames.subscribe(subscriber);

rsocket-core/src/main/java/io/rsocket/resume/UpstreamFramesSubscriber.java

Lines changed: 9 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,12 @@
2626
import org.slf4j.LoggerFactory;
2727
import reactor.core.Disposable;
2828
import reactor.core.publisher.Flux;
29-
import reactor.core.publisher.FluxProcessor;
3029
import reactor.core.publisher.Operators;
31-
import reactor.core.publisher.UnicastProcessor;
3230
import reactor.util.concurrent.Queues;
3331

3432
class UpstreamFramesSubscriber implements Subscriber<ByteBuf>, Disposable {
3533
private static final Logger logger = LoggerFactory.getLogger(UpstreamFramesSubscriber.class);
3634

37-
private final FluxProcessor<Object, Object> actions = UnicastProcessor.create().serialize();
3835
private final AtomicBoolean disposed = new AtomicBoolean();
3936
private final Consumer<ByteBuf> itemConsumer;
4037
private final Disposable downstreamRequestDisposable;
@@ -59,8 +56,6 @@ class UpstreamFramesSubscriber implements Subscriber<ByteBuf>, Disposable {
5956

6057
resumeSaveStreamDisposable =
6158
resumeSaveStreamRequests.subscribe(requestN -> requestN(requestN, 0));
62-
63-
dispatch(actions);
6459
}
6560

6661
@Override
@@ -75,7 +70,7 @@ public void onSubscribe(Subscription s) {
7570

7671
@Override
7772
public void onNext(ByteBuf item) {
78-
actions.onNext(item);
73+
processFrame(item);
7974
}
8075

8176
@Override
@@ -89,11 +84,17 @@ public void onComplete() {
8984
}
9085

9186
public void resumeStart() {
92-
actions.onNext(new ResumeStart());
87+
resumeStarted = true;
9388
}
9489

9590
public void resumeComplete() {
96-
actions.onNext(new ResumeComplete());
91+
ByteBuf frame = framesCache.poll();
92+
while (frame != null) {
93+
itemConsumer.accept(frame);
94+
frame = framesCache.poll();
95+
}
96+
resumeStarted = false;
97+
doRequest();
9798
}
9899

99100
@Override
@@ -113,17 +114,6 @@ public boolean isDisposed() {
113114
return disposed.get();
114115
}
115116

116-
private void dispatch(Flux<?> p) {
117-
p.subscribe(
118-
o -> {
119-
if (o instanceof ByteBuf) {
120-
processFrame(((ByteBuf) o));
121-
} else {
122-
((Runnable) o).run();
123-
}
124-
});
125-
}
126-
127117
private void requestN(long resumeStreamRequest, long downStreamRequest) {
128118
synchronized (this) {
129119
downStreamRequestN = Operators.addCap(downStreamRequestN, downStreamRequest);
@@ -159,41 +149,11 @@ private void releaseCache() {
159149
}
160150
}
161151

162-
private void doResumeStart() {
163-
resumeStarted = true;
164-
}
165-
166-
private void doResumeComplete() {
167-
ByteBuf frame = framesCache.poll();
168-
while (frame != null) {
169-
itemConsumer.accept(frame);
170-
frame = framesCache.poll();
171-
}
172-
resumeStarted = false;
173-
doRequest();
174-
}
175-
176152
private void processFrame(ByteBuf item) {
177153
if (resumeStarted) {
178154
framesCache.offer(item);
179155
} else {
180156
itemConsumer.accept(item);
181157
}
182158
}
183-
184-
private class ResumeStart implements Runnable {
185-
186-
@Override
187-
public void run() {
188-
doResumeStart();
189-
}
190-
}
191-
192-
private class ResumeComplete implements Runnable {
193-
194-
@Override
195-
public void run() {
196-
doResumeComplete();
197-
}
198-
}
199159
}

0 commit comments

Comments
 (0)