Skip to content

Performance Optimization Session #682

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 217 additions & 92 deletions rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoProcessor.java
Original file line number Diff line number Diff line change
@@ -1,175 +1,300 @@
package io.rsocket.internal;

import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.stream.Stream;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Operators;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
import reactor.util.function.Tuple2;

public class UnicastMonoProcessor<O> extends Mono<O>
implements Processor<O, O>, CoreSubscriber<O>, Disposable, Subscription, Scannable {

/**
* Create a {@link UnicastMonoProcessor} that will eagerly request 1 on {@link
* #onSubscribe(Subscription)}, cache and emit the eventual result for 1 or N subscribers.
*
* @param <T> type of the expected value
* @return A {@link UnicastMonoProcessor}.
*/
public static <T> UnicastMonoProcessor<T> create() {
return new UnicastMonoProcessor<>();
}

volatile CoreSubscriber<? super O> actual;

@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<UnicastMonoProcessor, CoreSubscriber> ACTUAL =
AtomicReferenceFieldUpdater.newUpdater(
UnicastMonoProcessor.class, CoreSubscriber.class, "actual");

volatile int once;

@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<UnicastMonoProcessor> ONCE =
AtomicIntegerFieldUpdater.newUpdater(UnicastMonoProcessor.class, "once");

private final MonoProcessor<O> processor;
Throwable error;
volatile boolean terminated;
O value;

@SuppressWarnings("unused")
private volatile int once;
volatile Subscription subscription;
static final AtomicReferenceFieldUpdater<UnicastMonoProcessor, Subscription> UPSTREAM =
AtomicReferenceFieldUpdater.newUpdater(
UnicastMonoProcessor.class, Subscription.class, "subscription");

private UnicastMonoProcessor() {
this.processor = MonoProcessor.create();
}
@Override
public final void cancel() {
if (isTerminated()) {
return;
}

public static <O> UnicastMonoProcessor<O> create() {
return new UnicastMonoProcessor<>();
}
final Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription());
if (s == Operators.cancelledSubscription()) {
return;
}

@Override
public Stream<? extends Scannable> actuals() {
return processor.actuals();
if (s != null) {
s.cancel();
}
}

@Override
public boolean isScanAvailable() {
return processor.isScanAvailable();
}
@SuppressWarnings("unchecked")
public void dispose() {
final Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription());
if (s == Operators.cancelledSubscription()) {
return;
}

@Override
public String name() {
return processor.name();
}
final CancellationException e = new CancellationException("Disposed");
error = e;
value = null;
terminated = true;
if (s != null) {
s.cancel();
}

@Override
public String stepName() {
return processor.stepName();
final CoreSubscriber<? super O> a = this.actual;
ACTUAL.lazySet(this, null);
if (a != null) {
a.onError(e);
}
}

@Override
public Stream<String> steps() {
return processor.steps();
/**
* Return the produced {@link Throwable} error if any or null
*
* @return the produced {@link Throwable} error if any or null
*/
@Nullable
public final Throwable getError() {
return isTerminated() ? error : null;
}

@Override
public Stream<? extends Scannable> parents() {
return processor.parents();
/**
* Indicates whether this {@code UnicastMonoProcessor} has been interrupted via cancellation.
*
* @return {@code true} if this {@code UnicastMonoProcessor} is cancelled, {@code false}
* otherwise.
*/
public boolean isCancelled() {
return isDisposed() && !isTerminated();
}

@Override
@Nullable
public <T> T scan(Attr<T> key) {
return processor.scan(key);
/**
* Indicates whether this {@code UnicastMonoProcessor} has been completed with an error.
*
* @return {@code true} if this {@code UnicastMonoProcessor} was completed with an error, {@code
* false} otherwise.
*/
public final boolean isError() {
return getError() != null;
}

@Override
public <T> T scanOrDefault(Attr<T> key, T defaultValue) {
return processor.scanOrDefault(key, defaultValue);
/**
* Indicates whether this {@code UnicastMonoProcessor} has been terminated by the source producer
* with a success or an error.
*
* @return {@code true} if this {@code UnicastMonoProcessor} is successful, {@code false}
* otherwise.
*/
public final boolean isTerminated() {
return terminated;
}

@Override
public Stream<Tuple2<String, String>> tags() {
return processor.tags();
public boolean isDisposed() {
return subscription == Operators.cancelledSubscription();
}

@Override
public void onSubscribe(Subscription s) {
processor.onSubscribe(s);
public final void onComplete() {
onNext(null);
}

@Override
public void onNext(O o) {
processor.onNext(o);
}
@SuppressWarnings("unchecked")
public final void onError(Throwable cause) {
Objects.requireNonNull(cause, "onError cannot be null");

if (UPSTREAM.getAndSet(this, Operators.cancelledSubscription())
== Operators.cancelledSubscription()) {
Operators.onErrorDropped(cause, currentContext());
return;
}

@Override
public void onError(Throwable t) {
processor.onError(t);
}
error = cause;
value = null;
terminated = true;

@Nullable
public Throwable getError() {
return processor.getError();
final CoreSubscriber<? super O> a = actual;
ACTUAL.lazySet(this, null);
if (a != null) {
a.onError(cause);
}
}

public boolean isCancelled() {
return processor.isCancelled();
}
@Override
@SuppressWarnings("unchecked")
public final void onNext(@Nullable O value) {
final Subscription s;
if ((s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription()))
== Operators.cancelledSubscription()) {
if (value != null) {
Operators.onNextDropped(value, currentContext());
}
return;
}

public boolean isError() {
return processor.isError();
}
this.value = value;
terminated = true;

public boolean isSuccess() {
return processor.isSuccess();
final CoreSubscriber<? super O> a = actual;
ACTUAL.lazySet(this, null);
if (value == null) {
if (a != null) {
a.onComplete();
}
} else {
if (s != null) {
s.cancel();
}

if (a != null) {
a.onNext(value);
a.onComplete();
}
}
}

public boolean isTerminated() {
return processor.isTerminated();
@Override
public final void onSubscribe(Subscription subscription) {
if (Operators.setOnce(UPSTREAM, this, subscription)) {
subscription.request(Long.MAX_VALUE);
}
}

/**
* Returns the value that completed this {@link UnicastMonoProcessor}. Returns {@code null} if the
* {@link UnicastMonoProcessor} has not been completed. If the {@link UnicastMonoProcessor} is
* completed with an error a RuntimeException that wraps the error is thrown.
*
* @return the value that completed the {@link UnicastMonoProcessor}, or {@code null} if it has
* not been completed
* @throws RuntimeException if the {@link UnicastMonoProcessor} was completed with an error
*/
@Nullable
public O peek() {
return processor.peek();
}

public long downstreamCount() {
return processor.downstreamCount();
}

public boolean hasDownstreams() {
return processor.hasDownstreams();
}
if (!isTerminated()) {
return null;
}

@Override
public void onComplete() {
processor.onComplete();
}
if (value != null) {
return value;
}

@Override
public void request(long n) {
processor.request(n);
}
if (error != null) {
RuntimeException re = Exceptions.propagate(error);
re = Exceptions.addSuppressed(re, new Exception("Mono#peek terminated with an error"));
throw re;
}

@Override
public void cancel() {
processor.cancel();
return null;
}

@Override
public void dispose() {
processor.dispose();
public final void request(long n) {
Operators.validate(n);
}

@Override
public Context currentContext() {
return processor.currentContext();
final CoreSubscriber<? super O> a = this.actual;
return a != null ? a.currentContext() : Context.empty();
}

@Override
public boolean isDisposed() {
return processor.isDisposed();
@Nullable
public Object scanUnsafe(Attr key) {
// touch guard
boolean c = isCancelled();

if (key == Attr.TERMINATED) {
return isTerminated();
}
if (key == Attr.PARENT) {
return subscription;
}
if (key == Attr.ERROR) {
return error;
}
if (key == Attr.PREFETCH) {
return Integer.MAX_VALUE;
}
if (key == Attr.CANCELLED) {
return c;
}
return null;
}

@Override
public Object scanUnsafe(Attr key) {
return processor.scanUnsafe(key);
/**
* Return true if any {@link Subscriber} is actively subscribed
*
* @return true if any {@link Subscriber} is actively subscribed
*/
public final boolean hasDownstream() {
return actual != null;
}

@Override
public void subscribe(CoreSubscriber<? super O> actual) {
Objects.requireNonNull(actual, "subscribe");
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
processor.subscribe(actual);
actual.onSubscribe(this);
ACTUAL.lazySet(this, actual);
if (isTerminated()) {
Throwable ex = error;
if (ex != null) {
actual.onError(ex);
} else {
O v = value;
if (v != null) {
actual.onNext(v);
}
actual.onComplete();
}
ACTUAL.lazySet(this, null);
}
} else {
Operators.error(
actual,
Expand Down
Loading