|
1 | 1 | package io.rsocket.internal;
|
2 | 2 |
|
3 | 3 | import java.util.Objects;
|
4 |
| -import java.util.concurrent.CancellationException; |
5 | 4 | import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
6 |
| -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; |
| 5 | +import java.util.stream.Stream; |
7 | 6 | import org.reactivestreams.Processor;
|
8 |
| -import org.reactivestreams.Subscriber; |
9 | 7 | import org.reactivestreams.Subscription;
|
10 | 8 | import reactor.core.CoreSubscriber;
|
11 | 9 | import reactor.core.Disposable;
|
12 |
| -import reactor.core.Exceptions; |
13 | 10 | import reactor.core.Scannable;
|
14 | 11 | import reactor.core.publisher.Mono;
|
| 12 | +import reactor.core.publisher.MonoProcessor; |
15 | 13 | import reactor.core.publisher.Operators;
|
16 | 14 | import reactor.util.annotation.Nullable;
|
17 | 15 | import reactor.util.context.Context;
|
| 16 | +import reactor.util.function.Tuple2; |
18 | 17 |
|
19 | 18 | public class UnicastMonoProcessor<O> extends Mono<O>
|
20 | 19 | implements Processor<O, O>, CoreSubscriber<O>, Disposable, Subscription, Scannable {
|
21 | 20 |
|
22 |
| - /** |
23 |
| - * Create a {@link UnicastMonoProcessor} that will eagerly request 1 on {@link |
24 |
| - * #onSubscribe(Subscription)}, cache and emit the eventual result for 1 or N subscribers. |
25 |
| - * |
26 |
| - * @param <T> type of the expected value |
27 |
| - * @return A {@link UnicastMonoProcessor}. |
28 |
| - */ |
29 |
| - public static <T> UnicastMonoProcessor<T> create() { |
30 |
| - return new UnicastMonoProcessor<>(); |
31 |
| - } |
32 |
| - |
33 |
| - volatile CoreSubscriber<? super O> actual; |
34 |
| - |
35 |
| - @SuppressWarnings("rawtypes") |
36 |
| - static final AtomicReferenceFieldUpdater<UnicastMonoProcessor, CoreSubscriber> ACTUAL = |
37 |
| - AtomicReferenceFieldUpdater.newUpdater( |
38 |
| - UnicastMonoProcessor.class, CoreSubscriber.class, "actual"); |
39 |
| - |
40 |
| - volatile int once; |
41 |
| - |
42 | 21 | @SuppressWarnings("rawtypes")
|
43 | 22 | static final AtomicIntegerFieldUpdater<UnicastMonoProcessor> ONCE =
|
44 | 23 | AtomicIntegerFieldUpdater.newUpdater(UnicastMonoProcessor.class, "once");
|
45 | 24 |
|
46 |
| - Throwable error; |
47 |
| - volatile boolean terminated; |
48 |
| - O value; |
| 25 | + private final MonoProcessor<O> processor; |
49 | 26 |
|
50 |
| - volatile Subscription subscription; |
51 |
| - static final AtomicReferenceFieldUpdater<UnicastMonoProcessor, Subscription> UPSTREAM = |
52 |
| - AtomicReferenceFieldUpdater.newUpdater( |
53 |
| - UnicastMonoProcessor.class, Subscription.class, "subscription"); |
| 27 | + @SuppressWarnings("unused") |
| 28 | + private volatile int once; |
54 | 29 |
|
55 |
| - @Override |
56 |
| - public final void cancel() { |
57 |
| - if (isTerminated()) { |
58 |
| - return; |
59 |
| - } |
| 30 | + private UnicastMonoProcessor() { |
| 31 | + this.processor = MonoProcessor.create(); |
| 32 | + } |
60 | 33 |
|
61 |
| - final Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription()); |
62 |
| - if (s == Operators.cancelledSubscription()) { |
63 |
| - return; |
64 |
| - } |
| 34 | + public static <O> UnicastMonoProcessor<O> create() { |
| 35 | + return new UnicastMonoProcessor<>(); |
| 36 | + } |
65 | 37 |
|
66 |
| - if (s != null) { |
67 |
| - s.cancel(); |
68 |
| - } |
| 38 | + @Override |
| 39 | + public Stream<? extends Scannable> actuals() { |
| 40 | + return processor.actuals(); |
69 | 41 | }
|
70 | 42 |
|
71 | 43 | @Override
|
72 |
| - @SuppressWarnings("unchecked") |
73 |
| - public void dispose() { |
74 |
| - final Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription()); |
75 |
| - if (s == Operators.cancelledSubscription()) { |
76 |
| - return; |
77 |
| - } |
| 44 | + public boolean isScanAvailable() { |
| 45 | + return processor.isScanAvailable(); |
| 46 | + } |
78 | 47 |
|
79 |
| - final CancellationException e = new CancellationException("Disposed"); |
80 |
| - error = e; |
81 |
| - value = null; |
82 |
| - terminated = true; |
83 |
| - if (s != null) { |
84 |
| - s.cancel(); |
85 |
| - } |
| 48 | + @Override |
| 49 | + public String name() { |
| 50 | + return processor.name(); |
| 51 | + } |
86 | 52 |
|
87 |
| - final CoreSubscriber<? super O> a = this.actual; |
88 |
| - ACTUAL.lazySet(this, null); |
89 |
| - if (a != null) { |
90 |
| - a.onError(e); |
91 |
| - } |
| 53 | + @Override |
| 54 | + public String stepName() { |
| 55 | + return processor.stepName(); |
92 | 56 | }
|
93 | 57 |
|
94 |
| - /** |
95 |
| - * Return the produced {@link Throwable} error if any or null |
96 |
| - * |
97 |
| - * @return the produced {@link Throwable} error if any or null |
98 |
| - */ |
99 |
| - @Nullable |
100 |
| - public final Throwable getError() { |
101 |
| - return isTerminated() ? error : null; |
| 58 | + @Override |
| 59 | + public Stream<String> steps() { |
| 60 | + return processor.steps(); |
102 | 61 | }
|
103 | 62 |
|
104 |
| - /** |
105 |
| - * Indicates whether this {@code UnicastMonoProcessor} has been interrupted via cancellation. |
106 |
| - * |
107 |
| - * @return {@code true} if this {@code UnicastMonoProcessor} is cancelled, {@code false} |
108 |
| - * otherwise. |
109 |
| - */ |
110 |
| - public boolean isCancelled() { |
111 |
| - return isDisposed() && !isTerminated(); |
| 63 | + @Override |
| 64 | + public Stream<? extends Scannable> parents() { |
| 65 | + return processor.parents(); |
112 | 66 | }
|
113 | 67 |
|
114 |
| - /** |
115 |
| - * Indicates whether this {@code UnicastMonoProcessor} has been completed with an error. |
116 |
| - * |
117 |
| - * @return {@code true} if this {@code UnicastMonoProcessor} was completed with an error, {@code |
118 |
| - * false} otherwise. |
119 |
| - */ |
120 |
| - public final boolean isError() { |
121 |
| - return getError() != null; |
| 68 | + @Override |
| 69 | + @Nullable |
| 70 | + public <T> T scan(Attr<T> key) { |
| 71 | + return processor.scan(key); |
122 | 72 | }
|
123 | 73 |
|
124 |
| - /** |
125 |
| - * Indicates whether this {@code UnicastMonoProcessor} has been terminated by the source producer |
126 |
| - * with a success or an error. |
127 |
| - * |
128 |
| - * @return {@code true} if this {@code UnicastMonoProcessor} is successful, {@code false} |
129 |
| - * otherwise. |
130 |
| - */ |
131 |
| - public final boolean isTerminated() { |
132 |
| - return terminated; |
| 74 | + @Override |
| 75 | + public <T> T scanOrDefault(Attr<T> key, T defaultValue) { |
| 76 | + return processor.scanOrDefault(key, defaultValue); |
133 | 77 | }
|
134 | 78 |
|
135 | 79 | @Override
|
136 |
| - public boolean isDisposed() { |
137 |
| - return subscription == Operators.cancelledSubscription(); |
| 80 | + public Stream<Tuple2<String, String>> tags() { |
| 81 | + return processor.tags(); |
138 | 82 | }
|
139 | 83 |
|
140 | 84 | @Override
|
141 |
| - public final void onComplete() { |
142 |
| - onNext(null); |
| 85 | + public void onSubscribe(Subscription s) { |
| 86 | + processor.onSubscribe(s); |
143 | 87 | }
|
144 | 88 |
|
145 | 89 | @Override
|
146 |
| - @SuppressWarnings("unchecked") |
147 |
| - public final void onError(Throwable cause) { |
148 |
| - Objects.requireNonNull(cause, "onError cannot be null"); |
149 |
| - |
150 |
| - if (UPSTREAM.getAndSet(this, Operators.cancelledSubscription()) |
151 |
| - == Operators.cancelledSubscription()) { |
152 |
| - Operators.onErrorDropped(cause, currentContext()); |
153 |
| - return; |
154 |
| - } |
| 90 | + public void onNext(O o) { |
| 91 | + processor.onNext(o); |
| 92 | + } |
155 | 93 |
|
156 |
| - error = cause; |
157 |
| - value = null; |
158 |
| - terminated = true; |
| 94 | + @Override |
| 95 | + public void onError(Throwable t) { |
| 96 | + processor.onError(t); |
| 97 | + } |
159 | 98 |
|
160 |
| - final CoreSubscriber<? super O> a = actual; |
161 |
| - ACTUAL.lazySet(this, null); |
162 |
| - if (a != null) { |
163 |
| - a.onError(cause); |
164 |
| - } |
| 99 | + @Nullable |
| 100 | + public Throwable getError() { |
| 101 | + return processor.getError(); |
165 | 102 | }
|
166 | 103 |
|
167 |
| - @Override |
168 |
| - @SuppressWarnings("unchecked") |
169 |
| - public final void onNext(@Nullable O value) { |
170 |
| - final Subscription s; |
171 |
| - if ((s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription())) |
172 |
| - == Operators.cancelledSubscription()) { |
173 |
| - if (value != null) { |
174 |
| - Operators.onNextDropped(value, currentContext()); |
175 |
| - } |
176 |
| - return; |
177 |
| - } |
| 104 | + public boolean isCancelled() { |
| 105 | + return processor.isCancelled(); |
| 106 | + } |
178 | 107 |
|
179 |
| - this.value = value; |
180 |
| - terminated = true; |
| 108 | + public boolean isError() { |
| 109 | + return processor.isError(); |
| 110 | + } |
181 | 111 |
|
182 |
| - final CoreSubscriber<? super O> a = actual; |
183 |
| - ACTUAL.lazySet(this, null); |
184 |
| - if (value == null) { |
185 |
| - if (a != null) { |
186 |
| - a.onComplete(); |
187 |
| - } |
188 |
| - } else { |
189 |
| - if (s != null) { |
190 |
| - s.cancel(); |
191 |
| - } |
192 |
| - |
193 |
| - if (a != null) { |
194 |
| - a.onNext(value); |
195 |
| - a.onComplete(); |
196 |
| - } |
197 |
| - } |
| 112 | + public boolean isSuccess() { |
| 113 | + return processor.isSuccess(); |
198 | 114 | }
|
199 | 115 |
|
200 |
| - @Override |
201 |
| - public final void onSubscribe(Subscription subscription) { |
202 |
| - if (Operators.setOnce(UPSTREAM, this, subscription)) { |
203 |
| - subscription.request(Long.MAX_VALUE); |
204 |
| - } |
| 116 | + public boolean isTerminated() { |
| 117 | + return processor.isTerminated(); |
205 | 118 | }
|
206 | 119 |
|
207 |
| - /** |
208 |
| - * Returns the value that completed this {@link UnicastMonoProcessor}. Returns {@code null} if the |
209 |
| - * {@link UnicastMonoProcessor} has not been completed. If the {@link UnicastMonoProcessor} is |
210 |
| - * completed with an error a RuntimeException that wraps the error is thrown. |
211 |
| - * |
212 |
| - * @return the value that completed the {@link UnicastMonoProcessor}, or {@code null} if it has |
213 |
| - * not been completed |
214 |
| - * @throws RuntimeException if the {@link UnicastMonoProcessor} was completed with an error |
215 |
| - */ |
216 | 120 | @Nullable
|
217 | 121 | public O peek() {
|
218 |
| - if (!isTerminated()) { |
219 |
| - return null; |
220 |
| - } |
| 122 | + return processor.peek(); |
| 123 | + } |
221 | 124 |
|
222 |
| - if (value != null) { |
223 |
| - return value; |
224 |
| - } |
| 125 | + public long downstreamCount() { |
| 126 | + return processor.downstreamCount(); |
| 127 | + } |
225 | 128 |
|
226 |
| - if (error != null) { |
227 |
| - RuntimeException re = Exceptions.propagate(error); |
228 |
| - re = Exceptions.addSuppressed(re, new Exception("Mono#peek terminated with an error")); |
229 |
| - throw re; |
230 |
| - } |
| 129 | + public boolean hasDownstreams() { |
| 130 | + return processor.hasDownstreams(); |
| 131 | + } |
231 | 132 |
|
232 |
| - return null; |
| 133 | + @Override |
| 134 | + public void onComplete() { |
| 135 | + processor.onComplete(); |
233 | 136 | }
|
234 | 137 |
|
235 | 138 | @Override
|
236 |
| - public final void request(long n) { |
237 |
| - Operators.validate(n); |
| 139 | + public void request(long n) { |
| 140 | + processor.request(n); |
238 | 141 | }
|
239 | 142 |
|
240 | 143 | @Override
|
241 |
| - public Context currentContext() { |
242 |
| - final CoreSubscriber<? super O> a = this.actual; |
243 |
| - return a != null ? a.currentContext() : Context.empty(); |
| 144 | + public void cancel() { |
| 145 | + processor.cancel(); |
244 | 146 | }
|
245 | 147 |
|
246 | 148 | @Override
|
247 |
| - @Nullable |
248 |
| - public Object scanUnsafe(Attr key) { |
249 |
| - // touch guard |
250 |
| - boolean c = isCancelled(); |
| 149 | + public void dispose() { |
| 150 | + processor.dispose(); |
| 151 | + } |
251 | 152 |
|
252 |
| - if (key == Attr.TERMINATED) { |
253 |
| - return isTerminated(); |
254 |
| - } |
255 |
| - if (key == Attr.PARENT) { |
256 |
| - return subscription; |
257 |
| - } |
258 |
| - if (key == Attr.ERROR) { |
259 |
| - return error; |
260 |
| - } |
261 |
| - if (key == Attr.PREFETCH) { |
262 |
| - return Integer.MAX_VALUE; |
263 |
| - } |
264 |
| - if (key == Attr.CANCELLED) { |
265 |
| - return c; |
266 |
| - } |
267 |
| - return null; |
| 153 | + @Override |
| 154 | + public Context currentContext() { |
| 155 | + return processor.currentContext(); |
| 156 | + } |
| 157 | + |
| 158 | + @Override |
| 159 | + public boolean isDisposed() { |
| 160 | + return processor.isDisposed(); |
268 | 161 | }
|
269 | 162 |
|
270 |
| - /** |
271 |
| - * Return true if any {@link Subscriber} is actively subscribed |
272 |
| - * |
273 |
| - * @return true if any {@link Subscriber} is actively subscribed |
274 |
| - */ |
275 |
| - public final boolean hasDownstream() { |
276 |
| - return actual != null; |
| 163 | + @Override |
| 164 | + public Object scanUnsafe(Attr key) { |
| 165 | + return processor.scanUnsafe(key); |
277 | 166 | }
|
278 | 167 |
|
279 | 168 | @Override
|
280 | 169 | public void subscribe(CoreSubscriber<? super O> actual) {
|
281 | 170 | Objects.requireNonNull(actual, "subscribe");
|
282 | 171 | if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
|
283 |
| - actual.onSubscribe(this); |
284 |
| - ACTUAL.lazySet(this, actual); |
285 |
| - if (isTerminated()) { |
286 |
| - Throwable ex = error; |
287 |
| - if (ex != null) { |
288 |
| - actual.onError(ex); |
289 |
| - } else { |
290 |
| - O v = value; |
291 |
| - if (v != null) { |
292 |
| - actual.onNext(v); |
293 |
| - } |
294 |
| - actual.onComplete(); |
295 |
| - } |
296 |
| - ACTUAL.lazySet(this, null); |
297 |
| - } |
| 172 | + processor.subscribe(actual); |
298 | 173 | } else {
|
299 | 174 | Operators.error(
|
300 | 175 | actual,
|
|
0 commit comments