-
Notifications
You must be signed in to change notification settings - Fork 7.6k
OnBackpressureLatest: Non-blocking version of the toBlocking().latest() operator. #2923
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,225 @@ | ||
/** | ||
* Copyright 2014 Netflix, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
package rx.internal.operators; | ||
|
||
import java.util.concurrent.atomic.*; | ||
|
||
import rx.Observable.Operator; | ||
import rx.*; | ||
|
||
/** | ||
* An operator which drops all but the last received value in case the downstream | ||
* doesn't request more. | ||
*/ | ||
public final class OperatorOnBackpressureLatest<T> implements Operator<T, T> { | ||
/** Holds a singleton instance initialized on class-loading. */ | ||
static final class Holder { | ||
static final OperatorOnBackpressureLatest<Object> INSTANCE = new OperatorOnBackpressureLatest<Object>(); | ||
} | ||
|
||
/** | ||
* Returns a singleton instance of the OnBackpressureLatest operator since it is stateless. | ||
* @return the single instanceof OperatorOnBackpressureLatest | ||
*/ | ||
@SuppressWarnings("unchecked") | ||
public static <T> OperatorOnBackpressureLatest<T> instance() { | ||
return (OperatorOnBackpressureLatest<T>)Holder.INSTANCE; | ||
} | ||
|
||
@Override | ||
public Subscriber<? super T> call(Subscriber<? super T> child) { | ||
final LatestEmitter<T> producer = new LatestEmitter<T>(child); | ||
LatestSubscriber<T> parent = new LatestSubscriber<T>(producer); | ||
producer.parent = parent; | ||
child.add(parent); | ||
child.add(producer); | ||
child.setProducer(producer); | ||
return parent; | ||
} | ||
/** | ||
* A terminatable producer which emits the latest items on request. | ||
* @param <T> | ||
*/ | ||
static final class LatestEmitter<T> extends AtomicLong implements Producer, Subscription, Observer<T> { | ||
/** */ | ||
private static final long serialVersionUID = -1364393685005146274L; | ||
final Subscriber<? super T> child; | ||
LatestSubscriber<? super T> parent; | ||
final AtomicReference<Object> value; | ||
/** Written before done, read after done. */ | ||
Throwable terminal; | ||
volatile boolean done; | ||
/** Guarded by this. */ | ||
boolean emitting; | ||
/** Guarded by this. */ | ||
boolean missed; | ||
static final Object EMPTY = new Object(); | ||
static final long NOT_REQUESTED = Long.MIN_VALUE / 2; | ||
public LatestEmitter(Subscriber<? super T> child) { | ||
this.child = child; | ||
this.value = new AtomicReference<Object>(EMPTY); | ||
this.lazySet(NOT_REQUESTED); // not | ||
} | ||
@Override | ||
public void request(long n) { | ||
if (n >= 0) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why doesn't BackpressureUtils apply to this usage? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because we have the intermediate state of NOT_REQUESTED. You did the same in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BackpressureUtils didn't exist at that point, so I am considering how to consolidate this type of logic as we keep repeating this type of non-trivial code and it is easy to get wrong. I'm okay with merging ... we really should spend some time figuring out the core patterns so we can encode the state machine, similar to what BackpressureUtils and AbstractOnSubscribe have started formalizing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I use this |
||
for (;;) { | ||
long r = get(); | ||
if (r == Long.MIN_VALUE) { | ||
return; | ||
} | ||
long u; | ||
if (r == NOT_REQUESTED) { | ||
u = n; | ||
} else { | ||
u = r + n; | ||
if (u < 0) { | ||
u = Long.MAX_VALUE; | ||
} | ||
} | ||
if (compareAndSet(r, u)) { | ||
if (r == NOT_REQUESTED) { | ||
parent.requestMore(Long.MAX_VALUE); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So ... we are tracking the requested value for downstream, but upstream we just request MAX_VALUE the first time we receive a request, correct? Is this why BackpressureUtils doesn't work? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Partly yes. |
||
} | ||
emit(); | ||
return; | ||
} | ||
} | ||
} | ||
} | ||
long produced(long n) { | ||
for (;;) { | ||
long r = get(); | ||
if (r < 0) { | ||
return r; | ||
} | ||
long u = r - n; | ||
if (compareAndSet(r, u)) { | ||
return u; | ||
} | ||
} | ||
} | ||
@Override | ||
public boolean isUnsubscribed() { | ||
return get() == Long.MIN_VALUE; | ||
} | ||
@Override | ||
public void unsubscribe() { | ||
if (get() >= 0) { | ||
getAndSet(Long.MIN_VALUE); | ||
} | ||
} | ||
|
||
@Override | ||
public void onNext(T t) { | ||
value.lazySet(t); // emit's synchronized block does a full release | ||
emit(); | ||
} | ||
@Override | ||
public void onError(Throwable e) { | ||
terminal = e; | ||
done = true; | ||
emit(); | ||
} | ||
@Override | ||
public void onCompleted() { | ||
done = true; | ||
emit(); | ||
} | ||
void emit() { | ||
synchronized (this) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Most of the time Just thinking through the fact that this is a bunch of machinery for something that will typically be put in for dropping data when backpressure happens but should otherwise add negligible overhead. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using this operator synchronously does trigger lock elision and won't drop any value thus is not useful. Since the emitter-loop is quite simple and the operator is expected to run asynchronously almost always, this can be changed into a queue-drain if so desired. However, since its purpose is to drop elements because the consumer is slow, I don't really see any point in optimizing the emission further and I think a smaller overhead would just drop more values. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The scenario I'm considering is where this operator is put in place to handle occasional bursts where generally the consumer is fast enough, but occasionally it is slow. This is a common use of |
||
if (emitting) { | ||
missed = true; | ||
return; | ||
} | ||
emitting = true; | ||
missed = false; | ||
} | ||
boolean skipFinal = false; | ||
try { | ||
for (;;) { | ||
long r = get(); | ||
if (r == Long.MIN_VALUE) { | ||
skipFinal = true; | ||
break; | ||
} | ||
Object v = value.get(); | ||
if (r > 0 && v != EMPTY) { | ||
@SuppressWarnings("unchecked") | ||
T v2 = (T)v; | ||
child.onNext(v2); | ||
value.compareAndSet(v, EMPTY); | ||
produced(1); | ||
v = EMPTY; | ||
} | ||
if (v == EMPTY && done) { | ||
Throwable e = terminal; | ||
if (e != null) { | ||
child.onError(e); | ||
} else { | ||
child.onCompleted(); | ||
} | ||
} | ||
synchronized (this) { | ||
if (!missed) { | ||
emitting = false; | ||
skipFinal = true; | ||
break; | ||
} | ||
missed = false; | ||
} | ||
} | ||
} finally { | ||
if (!skipFinal) { | ||
synchronized (this) { | ||
emitting = false; | ||
} | ||
} | ||
} | ||
} | ||
} | ||
static final class LatestSubscriber<T> extends Subscriber<T> { | ||
private final LatestEmitter<T> producer; | ||
|
||
private LatestSubscriber(LatestEmitter<T> producer) { | ||
this.producer = producer; | ||
} | ||
|
||
@Override | ||
public void onStart() { | ||
// don't run until the child actually requested to avoid synchronous problems | ||
request(0); | ||
} | ||
|
||
@Override | ||
public void onNext(T t) { | ||
producer.onNext(t); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable e) { | ||
producer.onError(e); | ||
} | ||
|
||
@Override | ||
public void onCompleted() { | ||
producer.onCompleted(); | ||
} | ||
void requestMore(long n) { | ||
request(n); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sentence isn't clear to me. I imagine it has something to do with how
request(n)
generally will happen in batches soonNext
emissions will happen in bursts and drop everything but the latest in between in each burst?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason is that the operator basically pairs requests and emission on a 1:1 basis: if one requests 5, it would behave as if one called request(1) five times and thus giving the window for dropping unawaited values.