Skip to content

OnBackpressureLatest: Non-blocking version of the toBlocking().latest() operator. #2923

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 13, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5367,6 +5367,27 @@ public final Observable<T> onBackpressureBlock() {
return onBackpressureBlock(rx.internal.util.RxRingBuffer.SIZE);
}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to
* hold onto the latest value and emit that on request.
* <p>
* Its behavior is logically equivalent to toBlocking().latest() with the exception that
* the downstream is not blocking while requesting more values.
* <p>
* Note that if the upstream Observable does support backpressure, this operator ignores that capability
* and doesn't propagate any backpressure requests from downstream.
* <p>
* Note that due to the nature of how backpressure requests are propagated through subscribeOn/observeOn,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sentence isn't clear to me. I imagine it has something to do with how request(n) generally will happen in batches so onNext emissions will happen in bursts and drop everything but the latest in between in each burst?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason is that the operator basically pairs requests and emission on a 1:1 basis: if one requests 5, it would behave as if one called request(1) five times and thus giving the window for dropping unawaited values.

* requesting more than 1 from downstream doesn't guarantee a continuous delivery of onNext events.
* @return
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final Observable<T> onBackpressureLatest() {
return lift(OperatorOnBackpressureLatest.<T>instance());
}

/**
* Instructs an Observable to pass control to another Observable rather than invoking
* {@link Observer#onError onError} if it encounters an error.
Expand Down
225 changes: 225 additions & 0 deletions src/main/java/rx/internal/operators/OperatorOnBackpressureLatest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.operators;

import java.util.concurrent.atomic.*;

import rx.Observable.Operator;
import rx.*;

/**
* An operator which drops all but the last received value in case the downstream
* doesn't request more.
*/
public final class OperatorOnBackpressureLatest<T> implements Operator<T, T> {
/** Holds a singleton instance initialized on class-loading. */
static final class Holder {
static final OperatorOnBackpressureLatest<Object> INSTANCE = new OperatorOnBackpressureLatest<Object>();
}

/**
* Returns a singleton instance of the OnBackpressureLatest operator since it is stateless.
* @return the single instanceof OperatorOnBackpressureLatest
*/
@SuppressWarnings("unchecked")
public static <T> OperatorOnBackpressureLatest<T> instance() {
return (OperatorOnBackpressureLatest<T>)Holder.INSTANCE;
}

@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
final LatestEmitter<T> producer = new LatestEmitter<T>(child);
LatestSubscriber<T> parent = new LatestSubscriber<T>(producer);
producer.parent = parent;
child.add(parent);
child.add(producer);
child.setProducer(producer);
return parent;
}
/**
* A terminatable producer which emits the latest items on request.
* @param <T>
*/
static final class LatestEmitter<T> extends AtomicLong implements Producer, Subscription, Observer<T> {
/** */
private static final long serialVersionUID = -1364393685005146274L;
final Subscriber<? super T> child;
LatestSubscriber<? super T> parent;
final AtomicReference<Object> value;
/** Written before done, read after done. */
Throwable terminal;
volatile boolean done;
/** Guarded by this. */
boolean emitting;
/** Guarded by this. */
boolean missed;
static final Object EMPTY = new Object();
static final long NOT_REQUESTED = Long.MIN_VALUE / 2;
public LatestEmitter(Subscriber<? super T> child) {
this.child = child;
this.value = new AtomicReference<Object>(EMPTY);
this.lazySet(NOT_REQUESTED); // not
}
@Override
public void request(long n) {
if (n >= 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why doesn't BackpressureUtils apply to this usage?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we have the intermediate state of NOT_REQUESTED. You did the same in Subscriber.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BackpressureUtils didn't exist at that point, so I am considering how to consolidate this type of logic as we keep repeating this type of non-trivial code and it is easy to get wrong.

I'm okay with merging ... we really should spend some time figuring out the core patterns so we can encode the state machine, similar to what BackpressureUtils and AbstractOnSubscribe have started formalizing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use this NOT_REQUESTED only in few places (such as in OperatorPublish) and I'm not yet confident the this request state value exhibits the same behavior and response action every time it needs to be evaluated.

for (;;) {
long r = get();
if (r == Long.MIN_VALUE) {
return;
}
long u;
if (r == NOT_REQUESTED) {
u = n;
} else {
u = r + n;
if (u < 0) {
u = Long.MAX_VALUE;
}
}
if (compareAndSet(r, u)) {
if (r == NOT_REQUESTED) {
parent.requestMore(Long.MAX_VALUE);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So ... we are tracking the requested value for downstream, but upstream we just request MAX_VALUE the first time we receive a request, correct? Is this why BackpressureUtils doesn't work?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partly yes.

}
emit();
return;
}
}
}
}
long produced(long n) {
for (;;) {
long r = get();
if (r < 0) {
return r;
}
long u = r - n;
if (compareAndSet(r, u)) {
return u;
}
}
}
@Override
public boolean isUnsubscribed() {
return get() == Long.MIN_VALUE;
}
@Override
public void unsubscribe() {
if (get() >= 0) {
getAndSet(Long.MIN_VALUE);
}
}

@Override
public void onNext(T t) {
value.lazySet(t); // emit's synchronized block does a full release
emit();
}
@Override
public void onError(Throwable e) {
terminal = e;
done = true;
emit();
}
@Override
public void onCompleted() {
done = true;
emit();
}
void emit() {
synchronized (this) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the time onNext would emit without contention if the consumer is fast. Does lock elision remove the performance impediment of a synchronized on every single onNext attempt?

Just thinking through the fact that this is a bunch of machinery for something that will typically be put in for dropping data when backpressure happens but should otherwise add negligible overhead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using this operator synchronously does trigger lock elision and won't drop any value thus is not useful. Since the emitter-loop is quite simple and the operator is expected to run asynchronously almost always, this can be changed into a queue-drain if so desired. However, since its purpose is to drop elements because the consumer is slow, I don't really see any point in optimizing the emission further and I think a smaller overhead would just drop more values.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any point in optimizing the emission further and I think a smaller overhead would just drop more values.

The scenario I'm considering is where this operator is put in place to handle occasional bursts where generally the consumer is fast enough, but occasionally it is slow. This is a common use of onBackpressureDrop for us in our stream processor. We would use this in the same way.

if (emitting) {
missed = true;
return;
}
emitting = true;
missed = false;
}
boolean skipFinal = false;
try {
for (;;) {
long r = get();
if (r == Long.MIN_VALUE) {
skipFinal = true;
break;
}
Object v = value.get();
if (r > 0 && v != EMPTY) {
@SuppressWarnings("unchecked")
T v2 = (T)v;
child.onNext(v2);
value.compareAndSet(v, EMPTY);
produced(1);
v = EMPTY;
}
if (v == EMPTY && done) {
Throwable e = terminal;
if (e != null) {
child.onError(e);
} else {
child.onCompleted();
}
}
synchronized (this) {
if (!missed) {
emitting = false;
skipFinal = true;
break;
}
missed = false;
}
}
} finally {
if (!skipFinal) {
synchronized (this) {
emitting = false;
}
}
}
}
}
static final class LatestSubscriber<T> extends Subscriber<T> {
private final LatestEmitter<T> producer;

private LatestSubscriber(LatestEmitter<T> producer) {
this.producer = producer;
}

@Override
public void onStart() {
// don't run until the child actually requested to avoid synchronous problems
request(0);
}

@Override
public void onNext(T t) {
producer.onNext(t);
}

@Override
public void onError(Throwable e) {
producer.onError(e);
}

@Override
public void onCompleted() {
producer.onCompleted();
}
void requestMore(long n) {
request(n);
}
}
}
Loading