Skip to content

Commit ea8c56c

Browse files
committed
imporves UnicastMonoProcessor performance by reducing inner MonoProcessor
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 88ee909 commit ea8c56c

File tree

1 file changed

+223
-91
lines changed

1 file changed

+223
-91
lines changed

rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoProcessor.java

Lines changed: 223 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1,175 +1,307 @@
11
package io.rsocket.internal;
22

33
import java.util.Objects;
4+
import java.util.concurrent.CancellationException;
45
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
5-
import java.util.stream.Stream;
6+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
67
import org.reactivestreams.Processor;
8+
import org.reactivestreams.Publisher;
9+
import org.reactivestreams.Subscriber;
710
import org.reactivestreams.Subscription;
811
import reactor.core.CoreSubscriber;
912
import reactor.core.Disposable;
13+
import reactor.core.Exceptions;
1014
import reactor.core.Scannable;
1115
import reactor.core.publisher.Mono;
1216
import reactor.core.publisher.MonoProcessor;
1317
import reactor.core.publisher.Operators;
1418
import reactor.util.annotation.Nullable;
1519
import reactor.util.context.Context;
16-
import reactor.util.function.Tuple2;
1720

1821
public class UnicastMonoProcessor<O> extends Mono<O>
1922
implements Processor<O, O>, CoreSubscriber<O>, Disposable, Subscription, Scannable {
2023

24+
/**
25+
* Create a {@link MonoProcessor} that will eagerly request 1 on {@link
26+
* #onSubscribe(Subscription)}, cache and emit the eventual result for 1 or N subscribers.
27+
*
28+
* @param <T> type of the expected value
29+
* @return A {@link MonoProcessor}.
30+
*/
31+
public static <T> UnicastMonoProcessor<T> create() {
32+
return new UnicastMonoProcessor<>();
33+
}
34+
35+
volatile CoreSubscriber<? super O> actual;
36+
37+
@SuppressWarnings("rawtypes")
38+
static final AtomicReferenceFieldUpdater<UnicastMonoProcessor, CoreSubscriber> ACTUAL =
39+
AtomicReferenceFieldUpdater.newUpdater(
40+
UnicastMonoProcessor.class, CoreSubscriber.class, "actual");
41+
42+
volatile int once;
43+
2144
@SuppressWarnings("rawtypes")
2245
static final AtomicIntegerFieldUpdater<UnicastMonoProcessor> ONCE =
2346
AtomicIntegerFieldUpdater.newUpdater(UnicastMonoProcessor.class, "once");
2447

25-
private final MonoProcessor<O> processor;
48+
Publisher<? extends O> source;
2649

27-
@SuppressWarnings("unused")
28-
private volatile int once;
29-
30-
private UnicastMonoProcessor() {
31-
this.processor = MonoProcessor.create();
32-
}
50+
Throwable error;
51+
volatile boolean terminated;
52+
O value;
3353

34-
public static <O> UnicastMonoProcessor<O> create() {
35-
return new UnicastMonoProcessor<>();
36-
}
54+
volatile Subscription subscription;
55+
static final AtomicReferenceFieldUpdater<UnicastMonoProcessor, Subscription> UPSTREAM =
56+
AtomicReferenceFieldUpdater.newUpdater(
57+
UnicastMonoProcessor.class, Subscription.class, "subscription");
3758

3859
@Override
39-
public Stream<? extends Scannable> actuals() {
40-
return processor.actuals();
41-
}
60+
public final void cancel() {
61+
if (isTerminated()) {
62+
return;
63+
}
4264

43-
@Override
44-
public boolean isScanAvailable() {
45-
return processor.isScanAvailable();
46-
}
65+
final Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription());
66+
if (s == Operators.cancelledSubscription()) {
67+
return;
68+
}
4769

48-
@Override
49-
public String name() {
50-
return processor.name();
70+
source = null;
71+
if (s != null) {
72+
s.cancel();
73+
}
5174
}
5275

5376
@Override
54-
public String stepName() {
55-
return processor.stepName();
56-
}
77+
@SuppressWarnings("unchecked")
78+
public void dispose() {
79+
final Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription());
80+
if (s == Operators.cancelledSubscription()) {
81+
return;
82+
}
5783

58-
@Override
59-
public Stream<String> steps() {
60-
return processor.steps();
61-
}
84+
final CancellationException e = new CancellationException("Disposed");
85+
error = e;
86+
value = null;
87+
source = null;
88+
terminated = true;
89+
if (s != null) {
90+
s.cancel();
91+
}
6292

63-
@Override
64-
public Stream<? extends Scannable> parents() {
65-
return processor.parents();
93+
final CoreSubscriber<? super O> a = this.actual;
94+
ACTUAL.lazySet(this, null);
95+
if (a != null) {
96+
a.onError(e);
97+
}
6698
}
6799

68-
@Override
100+
/**
101+
* Return the produced {@link Throwable} error if any or null
102+
*
103+
* @return the produced {@link Throwable} error if any or null
104+
*/
69105
@Nullable
70-
public <T> T scan(Attr<T> key) {
71-
return processor.scan(key);
106+
public final Throwable getError() {
107+
return isTerminated() ? error : null;
72108
}
73109

74-
@Override
75-
public <T> T scanOrDefault(Attr<T> key, T defaultValue) {
76-
return processor.scanOrDefault(key, defaultValue);
110+
/**
111+
* Indicates whether this {@code MonoProcessor} has been interrupted via cancellation.
112+
*
113+
* @return {@code true} if this {@code MonoProcessor} is cancelled, {@code false} otherwise.
114+
*/
115+
public boolean isCancelled() {
116+
return isDisposed() && !isTerminated();
77117
}
78118

79-
@Override
80-
public Stream<Tuple2<String, String>> tags() {
81-
return processor.tags();
119+
/**
120+
* Indicates whether this {@code MonoProcessor} has been completed with an error.
121+
*
122+
* @return {@code true} if this {@code MonoProcessor} was completed with an error, {@code false}
123+
* otherwise.
124+
*/
125+
public final boolean isError() {
126+
return getError() != null;
82127
}
83128

84-
@Override
85-
public void onSubscribe(Subscription s) {
86-
processor.onSubscribe(s);
129+
/**
130+
* Indicates whether this {@code MonoProcessor} has been terminated by the source producer with a
131+
* success or an error.
132+
*
133+
* @return {@code true} if this {@code MonoProcessor} is successful, {@code false} otherwise.
134+
*/
135+
public final boolean isTerminated() {
136+
return terminated;
87137
}
88138

89139
@Override
90-
public void onNext(O o) {
91-
processor.onNext(o);
140+
public boolean isDisposed() {
141+
return subscription == Operators.cancelledSubscription();
92142
}
93143

94144
@Override
95-
public void onError(Throwable t) {
96-
processor.onError(t);
145+
public final void onComplete() {
146+
onNext(null);
97147
}
98148

99-
@Nullable
100-
public Throwable getError() {
101-
return processor.getError();
102-
}
149+
@Override
150+
@SuppressWarnings("unchecked")
151+
public final void onError(Throwable cause) {
152+
Objects.requireNonNull(cause, "onError cannot be null");
153+
154+
if (UPSTREAM.getAndSet(this, Operators.cancelledSubscription())
155+
== Operators.cancelledSubscription()) {
156+
Operators.onErrorDropped(cause, currentContext());
157+
return;
158+
}
103159

104-
public boolean isCancelled() {
105-
return processor.isCancelled();
106-
}
160+
error = cause;
161+
value = null;
162+
source = null;
163+
terminated = true;
107164

108-
public boolean isError() {
109-
return processor.isError();
165+
final CoreSubscriber<? super O> a = actual;
166+
ACTUAL.lazySet(this, null);
167+
if (a != null) {
168+
a.onError(cause);
169+
}
110170
}
111171

112-
public boolean isSuccess() {
113-
return processor.isSuccess();
172+
@Override
173+
@SuppressWarnings("unchecked")
174+
public final void onNext(@Nullable O value) {
175+
final Subscription s;
176+
if ((s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription()))
177+
== Operators.cancelledSubscription()) {
178+
if (value != null) {
179+
Operators.onNextDropped(value, currentContext());
180+
}
181+
return;
182+
}
183+
184+
this.value = value;
185+
final Publisher<? extends O> parent = source;
186+
source = null;
187+
terminated = true;
188+
189+
final CoreSubscriber<? super O> a = actual;
190+
ACTUAL.lazySet(this, null);
191+
if (value == null) {
192+
if (a != null) {
193+
a.onComplete();
194+
}
195+
} else {
196+
if (s != null && !(parent instanceof Mono)) {
197+
s.cancel();
198+
}
199+
200+
if (a != null) {
201+
a.onNext(value);
202+
a.onComplete();
203+
}
204+
}
114205
}
115206

116-
public boolean isTerminated() {
117-
return processor.isTerminated();
207+
@Override
208+
public final void onSubscribe(Subscription subscription) {
209+
if (Operators.setOnce(UPSTREAM, this, subscription)) {
210+
subscription.request(Long.MAX_VALUE);
211+
}
118212
}
119213

214+
/**
215+
* Returns the value that completed this {@link MonoProcessor}. Returns {@code null} if the {@link
216+
* MonoProcessor} has not been completed. If the {@link MonoProcessor} is completed with an error
217+
* a RuntimeException that wraps the error is thrown.
218+
*
219+
* @return the value that completed the {@link MonoProcessor}, or {@code null} if it has not been
220+
* completed
221+
* @throws RuntimeException if the {@link MonoProcessor} was completed with an error
222+
*/
120223
@Nullable
121224
public O peek() {
122-
return processor.peek();
123-
}
124-
125-
public long downstreamCount() {
126-
return processor.downstreamCount();
127-
}
128-
129-
public boolean hasDownstreams() {
130-
return processor.hasDownstreams();
131-
}
225+
if (!isTerminated()) {
226+
return null;
227+
}
132228

133-
@Override
134-
public void onComplete() {
135-
processor.onComplete();
136-
}
229+
if (value != null) {
230+
return value;
231+
}
137232

138-
@Override
139-
public void request(long n) {
140-
processor.request(n);
141-
}
233+
if (error != null) {
234+
RuntimeException re = Exceptions.propagate(error);
235+
re = Exceptions.addSuppressed(re, new Exception("Mono#peek terminated with an error"));
236+
throw re;
237+
}
142238

143-
@Override
144-
public void cancel() {
145-
processor.cancel();
239+
return null;
146240
}
147241

148242
@Override
149-
public void dispose() {
150-
processor.dispose();
243+
public final void request(long n) {
244+
Operators.validate(n);
151245
}
152246

153247
@Override
154248
public Context currentContext() {
155-
return processor.currentContext();
249+
final CoreSubscriber<? super O> a = this.actual;
250+
return a != null ? a.currentContext() : Context.empty();
156251
}
157252

158253
@Override
159-
public boolean isDisposed() {
160-
return processor.isDisposed();
254+
@Nullable
255+
public Object scanUnsafe(Attr key) {
256+
// touch guard
257+
boolean c = isCancelled();
258+
259+
if (key == Attr.TERMINATED) {
260+
return isTerminated();
261+
}
262+
if (key == Attr.PARENT) {
263+
return subscription;
264+
}
265+
if (key == Attr.ERROR) {
266+
return error;
267+
}
268+
if (key == Attr.PREFETCH) {
269+
return Integer.MAX_VALUE;
270+
}
271+
if (key == Attr.CANCELLED) {
272+
return c;
273+
}
274+
return null;
161275
}
162276

163-
@Override
164-
public Object scanUnsafe(Attr key) {
165-
return processor.scanUnsafe(key);
277+
/**
278+
* Return true if any {@link Subscriber} is actively subscribed
279+
*
280+
* @return true if any {@link Subscriber} is actively subscribed
281+
*/
282+
public final boolean hasDownstream() {
283+
return actual != null;
166284
}
167285

168286
@Override
169287
public void subscribe(CoreSubscriber<? super O> actual) {
170288
Objects.requireNonNull(actual, "subscribe");
171289
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
172-
processor.subscribe(actual);
290+
actual.onSubscribe(this);
291+
ACTUAL.lazySet(this, actual);
292+
if (isTerminated()) {
293+
Throwable ex = error;
294+
if (ex != null) {
295+
actual.onError(ex);
296+
} else {
297+
O v = value;
298+
if (v != null) {
299+
actual.onNext(v);
300+
}
301+
actual.onComplete();
302+
}
303+
ACTUAL.lazySet(this, null);
304+
}
173305
} else {
174306
Operators.error(
175307
actual,

0 commit comments

Comments
 (0)