Skip to content

Commit a6f9547

Browse files
committed
improves LocalDuplexConnection (#onClose notification + ByteBufs releases)
At the moment, the onClose hook has no "wait until cleaned" logic, which leads to unpredicted behaviors when used with resumability or others scenarios where we need to wait until all the queues are cleaned and there are no other resources in use (e.g. ByteBufs). For that porpuse, this commit adds onFinalizeHook to the UnboundedProcessor so we can now listen when the UnboundedProcessor is finalized and only after that send the onClose signal Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 0283373 commit a6f9547

File tree

3 files changed

+37
-18
lines changed

3 files changed

+37
-18
lines changed

rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public final class UnboundedProcessor extends FluxProcessor<ByteBuf, ByteBuf>
4545

4646
final Queue<ByteBuf> queue;
4747
final Queue<ByteBuf> priorityQueue;
48+
final Runnable onFinalizedHook;
4849

4950
boolean cancelled;
5051
boolean done;
@@ -88,6 +89,11 @@ public final class UnboundedProcessor extends FluxProcessor<ByteBuf, ByteBuf>
8889
boolean outputFused;
8990

9091
public UnboundedProcessor() {
92+
this(() -> {});
93+
}
94+
95+
public UnboundedProcessor(Runnable onFinalizedHook) {
96+
this.onFinalizedHook = onFinalizedHook;
9197
this.queue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE);
9298
this.priorityQueue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE);
9399
}
@@ -793,6 +799,9 @@ static long markTerminatedOrFinalized(UnboundedProcessor instance) {
793799
}
794800

795801
if (STATE.compareAndSet(instance, state, nextState | FLAG_TERMINATED)) {
802+
if (isFinalized(nextState)) {
803+
instance.onFinalizedHook.run();
804+
}
796805
return state;
797806
}
798807
}
@@ -906,6 +915,7 @@ static void clearAndFinalize(UnboundedProcessor instance) {
906915

907916
if (STATE.compareAndSet(
908917
instance, state, (state & ~MAX_WIP_VALUE & ~FLAG_HAS_VALUE) | FLAG_FINALIZED)) {
918+
instance.onFinalizedHook.run();
909919
break;
910920
}
911921
}

rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,17 @@ public Mono<DuplexConnection> connect() {
7777
return Mono.error(new IllegalArgumentException("Could not find server: " + name));
7878
}
7979

80-
UnboundedProcessor in = new UnboundedProcessor();
81-
UnboundedProcessor out = new UnboundedProcessor();
82-
Sinks.Empty<Void> closeSink = Sinks.empty();
80+
Sinks.One<Object> inSink = Sinks.one();
81+
Sinks.One<Object> outSink = Sinks.one();
82+
UnboundedProcessor in = new UnboundedProcessor(() -> inSink.tryEmitValue(inSink));
83+
UnboundedProcessor out = new UnboundedProcessor(() -> outSink.tryEmitValue(outSink));
8384

84-
server.apply(new LocalDuplexConnection(name, allocator, out, in, closeSink)).subscribe();
85+
Mono<Void> onClose = inSink.asMono().zipWith(outSink.asMono()).then();
8586

86-
return Mono.just(
87-
(DuplexConnection) new LocalDuplexConnection(name, allocator, in, out, closeSink));
87+
server.apply(new LocalDuplexConnection(name, allocator, out, in, onClose)).subscribe();
88+
89+
return Mono.<DuplexConnection>just(
90+
new LocalDuplexConnection(name, allocator, in, out, onClose));
8891
});
8992
}
9093
}

rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,9 @@
2727
import org.reactivestreams.Subscription;
2828
import reactor.core.CoreSubscriber;
2929
import reactor.core.Fuseable;
30-
import reactor.core.Scannable;
3130
import reactor.core.publisher.Flux;
3231
import reactor.core.publisher.Mono;
3332
import reactor.core.publisher.Operators;
34-
import reactor.core.publisher.Sinks;
3533

3634
/** An implementation of {@link DuplexConnection} that connects inside the same JVM. */
3735
final class LocalDuplexConnection implements DuplexConnection {
@@ -40,7 +38,7 @@ final class LocalDuplexConnection implements DuplexConnection {
4038
private final ByteBufAllocator allocator;
4139
private final Flux<ByteBuf> in;
4240

43-
private final Sinks.Empty<Void> onClose;
41+
private final Mono<Void> onClose;
4442

4543
private final UnboundedProcessor out;
4644

@@ -58,7 +56,7 @@ final class LocalDuplexConnection implements DuplexConnection {
5856
ByteBufAllocator allocator,
5957
Flux<ByteBuf> in,
6058
UnboundedProcessor out,
61-
Sinks.Empty<Void> onClose) {
59+
Mono<Void> onClose) {
6260
this.address = new LocalSocketAddress(name);
6361
this.allocator = Objects.requireNonNull(allocator, "allocator must not be null");
6462
this.in = Objects.requireNonNull(in, "in must not be null");
@@ -69,24 +67,23 @@ final class LocalDuplexConnection implements DuplexConnection {
6967
@Override
7068
public void dispose() {
7169
out.onComplete();
72-
onClose.tryEmitEmpty();
7370
}
7471

7572
@Override
76-
@SuppressWarnings("ConstantConditions")
7773
public boolean isDisposed() {
78-
return onClose.scan(Scannable.Attr.TERMINATED) || onClose.scan(Scannable.Attr.CANCELLED);
74+
return out.isDisposed();
7975
}
8076

8177
@Override
8278
public Mono<Void> onClose() {
83-
return onClose.asMono();
79+
return onClose;
8480
}
8581

8682
@Override
8783
public Flux<ByteBuf> receive() {
8884
return in.transform(
89-
Operators.<ByteBuf, ByteBuf>lift((__, actual) -> new ByteBufReleaserOperator(actual)));
85+
Operators.<ByteBuf, ByteBuf>lift(
86+
(__, actual) -> new ByteBufReleaserOperator(actual, this)));
9087
}
9188

9289
@Override
@@ -119,11 +116,14 @@ static class ByteBufReleaserOperator
119116
implements CoreSubscriber<ByteBuf>, Subscription, Fuseable.QueueSubscription<ByteBuf> {
120117

121118
final CoreSubscriber<? super ByteBuf> actual;
119+
final LocalDuplexConnection parent;
122120

123121
Subscription s;
124122

125-
public ByteBufReleaserOperator(CoreSubscriber<? super ByteBuf> actual) {
123+
public ByteBufReleaserOperator(
124+
CoreSubscriber<? super ByteBuf> actual, LocalDuplexConnection parent) {
126125
this.actual = actual;
126+
this.parent = parent;
127127
}
128128

129129
@Override
@@ -136,17 +136,22 @@ public void onSubscribe(Subscription s) {
136136

137137
@Override
138138
public void onNext(ByteBuf buf) {
139-
actual.onNext(buf);
140-
buf.release();
139+
try {
140+
actual.onNext(buf);
141+
} finally {
142+
buf.release();
143+
}
141144
}
142145

143146
@Override
144147
public void onError(Throwable t) {
148+
parent.out.onError(t);
145149
actual.onError(t);
146150
}
147151

148152
@Override
149153
public void onComplete() {
154+
parent.out.onComplete();
150155
actual.onComplete();
151156
}
152157

@@ -158,6 +163,7 @@ public void request(long n) {
158163
@Override
159164
public void cancel() {
160165
s.cancel();
166+
parent.out.onComplete();
161167
}
162168

163169
@Override

0 commit comments

Comments
 (0)