Skip to content

1.x: optimize collect, reduce and takeLast(1) #4176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4737,15 +4737,13 @@ public final <R> Observable<R> cast(final Class<R> klass) {
* @see <a href="http://reactivex.io/documentation/operators/reduce.html">ReactiveX operators documentation: Reduce</a>
*/
public final <R> Observable<R> collect(Func0<R> stateFactory, final Action2<R, ? super T> collector) {
Func2<R, T, R> accumulator = InternalObservableUtils.createCollectorCaller(collector);

/*
* Discussion and confirmation of implementation at
* https://github.com/ReactiveX/RxJava/issues/423#issuecomment-27642532
*
* It should use last() not takeLast(1) since it needs to emit an error if the sequence is empty.
*/
return lift(new OperatorScan<R, T>(stateFactory, accumulator)).last();
return create(new OnSubscribeCollect<T, R>(this, stateFactory, collector));
}

/**
Expand Down Expand Up @@ -7899,7 +7897,7 @@ public final Observable<T> reduce(Func2<T, T, T> accumulator) {
*
* It should use last() not takeLast(1) since it needs to emit an error if the sequence is empty.
*/
return scan(accumulator).last();
return create(new OnSubscribeReduce<T>(this, accumulator));
}

/**
Expand Down Expand Up @@ -7947,7 +7945,7 @@ public final Observable<T> reduce(Func2<T, T, T> accumulator) {
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Uncyclopedia: Fold (higher-order function)</a>
*/
public final <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) {
return scan(initialValue, accumulator).takeLast(1);
return create(new OnSubscribeReduceSeed<T, R>(this, initialValue, accumulator));
}

/**
Expand Down Expand Up @@ -10154,7 +10152,7 @@ public final Observable<T> takeLast(final int count) {
if (count == 0) {
return ignoreElements();
} else if (count == 1) {
return lift(OperatorTakeLastOne.<T>instance());
return create(new OnSubscribeTakeLastOne<T>(this));
} else {
return lift(new OperatorTakeLast<T>(count));
}
Expand Down
177 changes: 177 additions & 0 deletions src/main/java/rx/internal/operators/DeferredScalarSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package rx.internal.operators;

import java.util.concurrent.atomic.AtomicInteger;

import rx.*;

/**
* Base class for Subscribers that consume the entire upstream and signal
* zero or one element (or an error) in a backpressure honoring fashion.
* <p>
* Store any temporary value in {@link #value} and indicate there is
* a value available when completing by setting {@link #hasValue}.
* <p.
* Use {@link #subscribeTo(Observable)} to properly setup the link between this and the downstream
* subscriber.
*
* @param <T> the source value type
* @param <R> the result value type
*/
public abstract class DeferredScalarSubscriber<T, R> extends Subscriber<T> {

/** The downstream subscriber. */
protected final Subscriber<? super R> actual;

/** Indicates there is a value available in value. */
protected boolean hasValue;

/** The holder of the single value. */
protected R value;

/** The state, see the constants below. */
final AtomicInteger state;

/** Initial state. */
static final int NO_REQUEST_NO_VALUE = 0;
/** Request came first. */
static final int HAS_REQUEST_NO_VALUE = 1;
/** Value came first. */
static final int NO_REQUEST_HAS_VALUE = 2;
/** Value will be emitted. */
static final int HAS_REQUEST_HAS_VALUE = 3;

public DeferredScalarSubscriber(Subscriber<? super R> actual) {
this.actual = actual;
this.state = new AtomicInteger();
}

@Override
public void onError(Throwable ex) {
value = null;
actual.onError(ex);
}

@Override
public void onCompleted() {
if (hasValue) {
complete(value);
} else {
complete();
}
}

/**
* Signals onCompleted() to the downstream subscriber.
*/
protected final void complete() {
actual.onCompleted();
}

/**
* Atomically switches to the terminal state and emits the value if
* there is a request for it or stores it for retrieval by {@link #downstreamRequest(long)}.
* @param value the value to complete with
*/
protected final void complete(R value) {
Subscriber<? super R> a = actual;
for (;;) {
int s = state.get();

if (s == NO_REQUEST_HAS_VALUE || s == HAS_REQUEST_HAS_VALUE || a.isUnsubscribed()) {
return;
}
if (s == HAS_REQUEST_NO_VALUE) {
a.onNext(value);
if (!a.isUnsubscribed()) {
a.onCompleted();
}
state.lazySet(HAS_REQUEST_HAS_VALUE);
return;
}
this.value = value;
if (state.compareAndSet(NO_REQUEST_NO_VALUE, NO_REQUEST_HAS_VALUE)) {
return;
}
}
}

final void downstreamRequest(long n) {
if (n < 0L) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
if (n != 0L) {
Subscriber<? super R> a = actual;
for (;;) {
int s = state.get();
if (s == HAS_REQUEST_NO_VALUE || s == HAS_REQUEST_HAS_VALUE || a.isUnsubscribed()) {
return;
}
if (s == NO_REQUEST_HAS_VALUE) {
if (state.compareAndSet(NO_REQUEST_HAS_VALUE, HAS_REQUEST_HAS_VALUE)) {
a.onNext(value);
if (!a.isUnsubscribed()) {
a.onCompleted();
}
}
return;
}
if (state.compareAndSet(NO_REQUEST_NO_VALUE, HAS_REQUEST_NO_VALUE)) {
return;
}
}
}
}

@Override
public final void setProducer(Producer p) {
p.request(Long.MAX_VALUE);
}

/**
* Links up with the downstream Subscriber (cancellation, backpressure) and
* subscribes to the source Observable.
* @param source the source Observable
*/
public final void subscribeTo(Observable<? extends T> source) {
setupDownstream();
source.unsafeSubscribe(this);
}

/* test */ final void setupDownstream() {
Subscriber<? super R> a = actual;
a.add(this);
a.setProducer(new InnerProducer(this));
}

/**
* Redirects the downstream request amount bach to the DeferredScalarSubscriber.
*/
static final class InnerProducer implements Producer {
final DeferredScalarSubscriber<?, ?> parent;

public InnerProducer(DeferredScalarSubscriber<?, ?> parent) {
this.parent = parent;
}

@Override
public void request(long n) {
parent.downstreamRequest(n);
}
}
}
76 changes: 76 additions & 0 deletions src/main/java/rx/internal/operators/OnSubscribeCollect.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package rx.internal.operators;

import rx.*;
import rx.Observable.OnSubscribe;
import rx.exceptions.Exceptions;
import rx.functions.*;

public final class OnSubscribeCollect<T, R> implements OnSubscribe<R> {

final Observable<T> source;

final Func0<R> collectionFactory;

final Action2<R, ? super T> collector;

public OnSubscribeCollect(Observable<T> source, Func0<R> collectionFactory, Action2<R, ? super T> collector) {
this.source = source;
this.collectionFactory = collectionFactory;
this.collector = collector;
}

@Override
public void call(Subscriber<? super R> t) {
R initialValue;

try {
initialValue = collectionFactory.call();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
t.onError(ex);
return;
}

new CollectSubscriber<T, R>(t, initialValue, collector).subscribeTo(source);
}

static final class CollectSubscriber<T, R> extends DeferredScalarSubscriber<T, R> {

final Action2<R, ? super T> collector;

public CollectSubscriber(Subscriber<? super R> actual, R initialValue, Action2<R, ? super T> collector) {
super(actual);
this.value = initialValue;
this.hasValue = true;
this.collector = collector;
}

@Override
public void onNext(T t) {
try {
collector.call(value, t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
actual.onError(ex);
}
}

}
}
Loading