Skip to content

Commit 3fe5750

Browse files
OlegDokukarobertroeser
authored andcommitted
Performance Optimization Session (#682)
* imporves UnicastMonoProcessor performance by reducing inner MonoProcessor Signed-off-by: Oleh Dokuka <[email protected]> * provides tests and some minor fixes to UnicastMonoProcessor Signed-off-by: Oleh Dokuka <[email protected]> * fixes format Signed-off-by: Oleh Dokuka <[email protected]>
1 parent c6b3d64 commit 3fe5750

File tree

2 files changed

+878
-92
lines changed

2 files changed

+878
-92
lines changed

rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoProcessor.java

Lines changed: 217 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,175 +1,300 @@
11
package io.rsocket.internal;
22

33
import java.util.Objects;
4+
import java.util.concurrent.CancellationException;
45
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
5-
import java.util.stream.Stream;
6+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
67
import org.reactivestreams.Processor;
8+
import org.reactivestreams.Subscriber;
79
import org.reactivestreams.Subscription;
810
import reactor.core.CoreSubscriber;
911
import reactor.core.Disposable;
12+
import reactor.core.Exceptions;
1013
import reactor.core.Scannable;
1114
import reactor.core.publisher.Mono;
12-
import reactor.core.publisher.MonoProcessor;
1315
import reactor.core.publisher.Operators;
1416
import reactor.util.annotation.Nullable;
1517
import reactor.util.context.Context;
16-
import reactor.util.function.Tuple2;
1718

1819
public class UnicastMonoProcessor<O> extends Mono<O>
1920
implements Processor<O, O>, CoreSubscriber<O>, Disposable, Subscription, Scannable {
2021

22+
/**
23+
* Create a {@link UnicastMonoProcessor} that will eagerly request 1 on {@link
24+
* #onSubscribe(Subscription)}, cache and emit the eventual result for 1 or N subscribers.
25+
*
26+
* @param <T> type of the expected value
27+
* @return A {@link UnicastMonoProcessor}.
28+
*/
29+
public static <T> UnicastMonoProcessor<T> create() {
30+
return new UnicastMonoProcessor<>();
31+
}
32+
33+
volatile CoreSubscriber<? super O> actual;
34+
35+
@SuppressWarnings("rawtypes")
36+
static final AtomicReferenceFieldUpdater<UnicastMonoProcessor, CoreSubscriber> ACTUAL =
37+
AtomicReferenceFieldUpdater.newUpdater(
38+
UnicastMonoProcessor.class, CoreSubscriber.class, "actual");
39+
40+
volatile int once;
41+
2142
@SuppressWarnings("rawtypes")
2243
static final AtomicIntegerFieldUpdater<UnicastMonoProcessor> ONCE =
2344
AtomicIntegerFieldUpdater.newUpdater(UnicastMonoProcessor.class, "once");
2445

25-
private final MonoProcessor<O> processor;
46+
Throwable error;
47+
volatile boolean terminated;
48+
O value;
2649

27-
@SuppressWarnings("unused")
28-
private volatile int once;
50+
volatile Subscription subscription;
51+
static final AtomicReferenceFieldUpdater<UnicastMonoProcessor, Subscription> UPSTREAM =
52+
AtomicReferenceFieldUpdater.newUpdater(
53+
UnicastMonoProcessor.class, Subscription.class, "subscription");
2954

30-
private UnicastMonoProcessor() {
31-
this.processor = MonoProcessor.create();
32-
}
55+
@Override
56+
public final void cancel() {
57+
if (isTerminated()) {
58+
return;
59+
}
3360

34-
public static <O> UnicastMonoProcessor<O> create() {
35-
return new UnicastMonoProcessor<>();
36-
}
61+
final Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription());
62+
if (s == Operators.cancelledSubscription()) {
63+
return;
64+
}
3765

38-
@Override
39-
public Stream<? extends Scannable> actuals() {
40-
return processor.actuals();
66+
if (s != null) {
67+
s.cancel();
68+
}
4169
}
4270

4371
@Override
44-
public boolean isScanAvailable() {
45-
return processor.isScanAvailable();
46-
}
72+
@SuppressWarnings("unchecked")
73+
public void dispose() {
74+
final Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription());
75+
if (s == Operators.cancelledSubscription()) {
76+
return;
77+
}
4778

48-
@Override
49-
public String name() {
50-
return processor.name();
51-
}
79+
final CancellationException e = new CancellationException("Disposed");
80+
error = e;
81+
value = null;
82+
terminated = true;
83+
if (s != null) {
84+
s.cancel();
85+
}
5286

53-
@Override
54-
public String stepName() {
55-
return processor.stepName();
87+
final CoreSubscriber<? super O> a = this.actual;
88+
ACTUAL.lazySet(this, null);
89+
if (a != null) {
90+
a.onError(e);
91+
}
5692
}
5793

58-
@Override
59-
public Stream<String> steps() {
60-
return processor.steps();
94+
/**
95+
* Return the produced {@link Throwable} error if any or null
96+
*
97+
* @return the produced {@link Throwable} error if any or null
98+
*/
99+
@Nullable
100+
public final Throwable getError() {
101+
return isTerminated() ? error : null;
61102
}
62103

63-
@Override
64-
public Stream<? extends Scannable> parents() {
65-
return processor.parents();
104+
/**
105+
* Indicates whether this {@code UnicastMonoProcessor} has been interrupted via cancellation.
106+
*
107+
* @return {@code true} if this {@code UnicastMonoProcessor} is cancelled, {@code false}
108+
* otherwise.
109+
*/
110+
public boolean isCancelled() {
111+
return isDisposed() && !isTerminated();
66112
}
67113

68-
@Override
69-
@Nullable
70-
public <T> T scan(Attr<T> key) {
71-
return processor.scan(key);
114+
/**
115+
* Indicates whether this {@code UnicastMonoProcessor} has been completed with an error.
116+
*
117+
* @return {@code true} if this {@code UnicastMonoProcessor} was completed with an error, {@code
118+
* false} otherwise.
119+
*/
120+
public final boolean isError() {
121+
return getError() != null;
72122
}
73123

74-
@Override
75-
public <T> T scanOrDefault(Attr<T> key, T defaultValue) {
76-
return processor.scanOrDefault(key, defaultValue);
124+
/**
125+
* Indicates whether this {@code UnicastMonoProcessor} has been terminated by the source producer
126+
* with a success or an error.
127+
*
128+
* @return {@code true} if this {@code UnicastMonoProcessor} is successful, {@code false}
129+
* otherwise.
130+
*/
131+
public final boolean isTerminated() {
132+
return terminated;
77133
}
78134

79135
@Override
80-
public Stream<Tuple2<String, String>> tags() {
81-
return processor.tags();
136+
public boolean isDisposed() {
137+
return subscription == Operators.cancelledSubscription();
82138
}
83139

84140
@Override
85-
public void onSubscribe(Subscription s) {
86-
processor.onSubscribe(s);
141+
public final void onComplete() {
142+
onNext(null);
87143
}
88144

89145
@Override
90-
public void onNext(O o) {
91-
processor.onNext(o);
92-
}
146+
@SuppressWarnings("unchecked")
147+
public final void onError(Throwable cause) {
148+
Objects.requireNonNull(cause, "onError cannot be null");
149+
150+
if (UPSTREAM.getAndSet(this, Operators.cancelledSubscription())
151+
== Operators.cancelledSubscription()) {
152+
Operators.onErrorDropped(cause, currentContext());
153+
return;
154+
}
93155

94-
@Override
95-
public void onError(Throwable t) {
96-
processor.onError(t);
97-
}
156+
error = cause;
157+
value = null;
158+
terminated = true;
98159

99-
@Nullable
100-
public Throwable getError() {
101-
return processor.getError();
160+
final CoreSubscriber<? super O> a = actual;
161+
ACTUAL.lazySet(this, null);
162+
if (a != null) {
163+
a.onError(cause);
164+
}
102165
}
103166

104-
public boolean isCancelled() {
105-
return processor.isCancelled();
106-
}
167+
@Override
168+
@SuppressWarnings("unchecked")
169+
public final void onNext(@Nullable O value) {
170+
final Subscription s;
171+
if ((s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription()))
172+
== Operators.cancelledSubscription()) {
173+
if (value != null) {
174+
Operators.onNextDropped(value, currentContext());
175+
}
176+
return;
177+
}
107178

108-
public boolean isError() {
109-
return processor.isError();
110-
}
179+
this.value = value;
180+
terminated = true;
111181

112-
public boolean isSuccess() {
113-
return processor.isSuccess();
182+
final CoreSubscriber<? super O> a = actual;
183+
ACTUAL.lazySet(this, null);
184+
if (value == null) {
185+
if (a != null) {
186+
a.onComplete();
187+
}
188+
} else {
189+
if (s != null) {
190+
s.cancel();
191+
}
192+
193+
if (a != null) {
194+
a.onNext(value);
195+
a.onComplete();
196+
}
197+
}
114198
}
115199

116-
public boolean isTerminated() {
117-
return processor.isTerminated();
200+
@Override
201+
public final void onSubscribe(Subscription subscription) {
202+
if (Operators.setOnce(UPSTREAM, this, subscription)) {
203+
subscription.request(Long.MAX_VALUE);
204+
}
118205
}
119206

207+
/**
208+
* Returns the value that completed this {@link UnicastMonoProcessor}. Returns {@code null} if the
209+
* {@link UnicastMonoProcessor} has not been completed. If the {@link UnicastMonoProcessor} is
210+
* completed with an error a RuntimeException that wraps the error is thrown.
211+
*
212+
* @return the value that completed the {@link UnicastMonoProcessor}, or {@code null} if it has
213+
* not been completed
214+
* @throws RuntimeException if the {@link UnicastMonoProcessor} was completed with an error
215+
*/
120216
@Nullable
121217
public O peek() {
122-
return processor.peek();
123-
}
124-
125-
public long downstreamCount() {
126-
return processor.downstreamCount();
127-
}
128-
129-
public boolean hasDownstreams() {
130-
return processor.hasDownstreams();
131-
}
218+
if (!isTerminated()) {
219+
return null;
220+
}
132221

133-
@Override
134-
public void onComplete() {
135-
processor.onComplete();
136-
}
222+
if (value != null) {
223+
return value;
224+
}
137225

138-
@Override
139-
public void request(long n) {
140-
processor.request(n);
141-
}
226+
if (error != null) {
227+
RuntimeException re = Exceptions.propagate(error);
228+
re = Exceptions.addSuppressed(re, new Exception("Mono#peek terminated with an error"));
229+
throw re;
230+
}
142231

143-
@Override
144-
public void cancel() {
145-
processor.cancel();
232+
return null;
146233
}
147234

148235
@Override
149-
public void dispose() {
150-
processor.dispose();
236+
public final void request(long n) {
237+
Operators.validate(n);
151238
}
152239

153240
@Override
154241
public Context currentContext() {
155-
return processor.currentContext();
242+
final CoreSubscriber<? super O> a = this.actual;
243+
return a != null ? a.currentContext() : Context.empty();
156244
}
157245

158246
@Override
159-
public boolean isDisposed() {
160-
return processor.isDisposed();
247+
@Nullable
248+
public Object scanUnsafe(Attr key) {
249+
// touch guard
250+
boolean c = isCancelled();
251+
252+
if (key == Attr.TERMINATED) {
253+
return isTerminated();
254+
}
255+
if (key == Attr.PARENT) {
256+
return subscription;
257+
}
258+
if (key == Attr.ERROR) {
259+
return error;
260+
}
261+
if (key == Attr.PREFETCH) {
262+
return Integer.MAX_VALUE;
263+
}
264+
if (key == Attr.CANCELLED) {
265+
return c;
266+
}
267+
return null;
161268
}
162269

163-
@Override
164-
public Object scanUnsafe(Attr key) {
165-
return processor.scanUnsafe(key);
270+
/**
271+
* Return true if any {@link Subscriber} is actively subscribed
272+
*
273+
* @return true if any {@link Subscriber} is actively subscribed
274+
*/
275+
public final boolean hasDownstream() {
276+
return actual != null;
166277
}
167278

168279
@Override
169280
public void subscribe(CoreSubscriber<? super O> actual) {
170281
Objects.requireNonNull(actual, "subscribe");
171282
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
172-
processor.subscribe(actual);
283+
actual.onSubscribe(this);
284+
ACTUAL.lazySet(this, actual);
285+
if (isTerminated()) {
286+
Throwable ex = error;
287+
if (ex != null) {
288+
actual.onError(ex);
289+
} else {
290+
O v = value;
291+
if (v != null) {
292+
actual.onNext(v);
293+
}
294+
actual.onComplete();
295+
}
296+
ACTUAL.lazySet(this, null);
297+
}
173298
} else {
174299
Operators.error(
175300
actual,

0 commit comments

Comments
 (0)