Skip to content

2.x: first step switching to the reduced-allocation architecture #4030

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
*
* The class follows a similar event pattern as Reactive-Streams: onSubscribe (onError|onComplete)?
*/
public class Completable {
public abstract class Completable {
/**
* Callback used for building deferred computations that takes a CompletableSubscriber.
*/
Expand Down Expand Up @@ -386,7 +386,7 @@ public static Completable create(CompletableOnSubscribe onSubscribe) {
try {
// TODO plugin wrapping onSubscribe

return new Completable(onSubscribe);
return new CompletableWrapper(onSubscribe);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Expand Down Expand Up @@ -986,18 +986,6 @@ public void dispose() {
});
}

/** The actual subscription action. */
private final CompletableOnSubscribe onSubscribe;

/**
* Constructs a Completable instance with the given onSubscribe callback.
* @param onSubscribe the callback that will receive CompletableSubscribers when they subscribe,
* not null (not verified)
*/
protected Completable(CompletableOnSubscribe onSubscribe) {
this.onSubscribe = onSubscribe;
}

/**
* Returns a Completable that emits the a terminated event of either this Completable
* or the other Completable whichever fires first.
Expand Down Expand Up @@ -1929,14 +1917,16 @@ public final void subscribe(CompletableSubscriber s) {
try {
// TODO plugin wrapping the subscriber

onSubscribe.accept(s);
subscribeActual(s);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
RxJavaPlugins.onError(ex);
throw toNpe(ex);
}
}

protected abstract void subscribeActual(CompletableSubscriber s);

/**
* Subscribes to this Completable and calls back either the onError or onComplete functions.
Expand Down
27 changes: 7 additions & 20 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import io.reactivex.schedulers.*;
import io.reactivex.subscribers.*;

public class Flowable<T> implements Publisher<T> {
public abstract class Flowable<T> implements Publisher<T> {
/**
* Interface to map/wrap a downstream subscriber to an upstream subscriber.
*
Expand Down Expand Up @@ -385,7 +385,7 @@ public static <T> Flowable<T> concatArray(Publisher<? extends T>... sources) {
public static <T> Flowable<T> create(Publisher<T> onSubscribe) {
Objects.requireNonNull(onSubscribe, "onSubscribe is null");
onSubscribe = RxJavaPlugins.onCreate(onSubscribe);
return new Flowable<T>(onSubscribe);
return fromPublisher(onSubscribe);
}

@BackpressureSupport(BackpressureKind.PASS_THROUGH)
Expand Down Expand Up @@ -497,12 +497,7 @@ public static <T> Flowable<T> fromPublisher(final Publisher<? extends T> publish
}
Objects.requireNonNull(publisher, "publisher is null");

return create(new Publisher<T>() {
@Override
public void subscribe(Subscriber<? super T> s) {
publisher.subscribe(s);
}
});
return new FlowableWrapper<T>(publisher);
}

@BackpressureSupport(BackpressureKind.FULL)
Expand Down Expand Up @@ -1160,12 +1155,6 @@ public static <T, R> Flowable<R> zipIterable(Function<? super Object[], ? extend
return create(new PublisherZip<T, R>(null, sources, zipper, bufferSize, delayError));
}

final Publisher<T> onSubscribe;

protected Flowable(Publisher<T> onSubscribe) {
this.onSubscribe = onSubscribe;
}

@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerKind.NONE)
public final Flowable<Boolean> all(Predicate<? super T> predicate) {
Expand Down Expand Up @@ -2162,7 +2151,7 @@ public final Flowable<T> last(T defaultValue) {
public final <R> Flowable<R> lift(Operator<? extends R, ? super T> lifter) {
Objects.requireNonNull(lifter, "lifter is null");
// using onSubscribe so the fusing has access to the underlying raw Publisher
return create(new PublisherLift<R, T>(onSubscribe, lifter));
return create(new PublisherLift<R, T>(this, lifter));
}

@BackpressureSupport(BackpressureKind.PASS_THROUGH)
Expand Down Expand Up @@ -2939,18 +2928,14 @@ public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super T
@Override
public final void subscribe(Subscriber<? super T> s) {
Objects.requireNonNull(s, "s is null");
subscribeActual(s);
}

private void subscribeActual(Subscriber<? super T> s) {
try {
s = RxJavaPlugins.onSubscribe(s);

if (s == null) {
throw new NullPointerException("Plugin returned null Subscriber");
}

onSubscribe.subscribe(s);
subscribeActual(s);
} catch (NullPointerException e) {
throw e;
} catch (Throwable e) {
Expand All @@ -2965,6 +2950,8 @@ private void subscribeActual(Subscriber<? super T> s) {
}
}

protected abstract void subscribeActual(Subscriber<? super T> s);

@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerKind.CUSTOM)
public final Flowable<T> subscribeOn(Scheduler scheduler) {
Expand Down
21 changes: 10 additions & 11 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
* Observable for delivering a sequence of values without backpressure.
* @param <T>
*/
public class Observable<T> {
public abstract class Observable<T> {

public interface NbpOnSubscribe<T> extends Consumer<Observer<? super T>> {

Expand Down Expand Up @@ -358,7 +358,7 @@ public static <T> Observable<T> concatArray(Observable<? extends T>... sources)
public static <T> Observable<T> create(NbpOnSubscribe<T> onSubscribe) {
Objects.requireNonNull(onSubscribe, "onSubscribe is null");
// TODO plugin wrapper
return new Observable<T>(onSubscribe);
return new ObservableWrapper<T>(onSubscribe);
}

@SchedulerSupport(SchedulerKind.NONE)
Expand Down Expand Up @@ -1089,12 +1089,6 @@ public static <T, R> Observable<R> zipIterable(Function<? super Object[], ? exte
}


protected final NbpOnSubscribe<T> onSubscribe;

protected Observable(NbpOnSubscribe<T> onSubscribe) {
this.onSubscribe = onSubscribe;
}

@SchedulerSupport(SchedulerKind.NONE)
public final Observable<Boolean> all(Predicate<? super T> predicate) {
Objects.requireNonNull(predicate, "predicate is null");
Expand Down Expand Up @@ -2634,10 +2628,15 @@ public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super T
return ls;
}

public final void subscribe(Observer<? super T> subscriber) {
Objects.requireNonNull(subscriber, "subscriber is null");
onSubscribe.accept(subscriber);
public final void subscribe(Observer<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");

// TODO plugin wrappings

subscribeActual(observer);
}

protected abstract void subscribeActual(Observer<? super T> observer);

@SchedulerSupport(SchedulerKind.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
Expand Down
16 changes: 6 additions & 10 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
*
* @param <T> the value type
*/
public class Single<T> {
public abstract class Single<T> {

public interface SingleOnSubscribe<T> extends Consumer<SingleSubscriber<? super T>> {

Expand Down Expand Up @@ -356,7 +356,7 @@ public static <T> Flowable<T> concat(
public static <T> Single<T> create(SingleOnSubscribe<T> onSubscribe) {
Objects.requireNonNull(onSubscribe, "onSubscribe is null");
// TODO plugin wrapper
return new Single<T>(onSubscribe);
return new SingleWrapper<T>(onSubscribe);
}

public static <T> Single<T> defer(final Supplier<? extends Single<? extends T>> singleSupplier) {
Expand Down Expand Up @@ -1003,12 +1003,6 @@ public static <T, R> Single<R> zipArray(Function<? super Object[], ? extends R>
return Flowable.zipArray(zipper, false, 1, sourcePublishers).toSingle();
}

protected final SingleOnSubscribe<T> onSubscribe;

protected Single(SingleOnSubscribe<T> onSubscribe) {
this.onSubscribe = onSubscribe;
}

@SuppressWarnings("unchecked")
public final Single<T> ambWith(Single<? extends T> other) {
Objects.requireNonNull(other, "other is null");
Expand Down Expand Up @@ -1374,7 +1368,7 @@ public void accept(SingleSubscriber<? super R> s) {
throw new NullPointerException("The onLift returned a null subscriber");
}
// TODO plugin wrapper
onSubscribe.accept(sr);
subscribe(sr);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Expand Down Expand Up @@ -1690,9 +1684,11 @@ public void onSuccess(T value) {
public final void subscribe(SingleSubscriber<? super T> subscriber) {
Objects.requireNonNull(subscriber, "subscriber is null");
// TODO plugin wrapper
onSubscribe.accept(subscriber);
subscribeActual(subscriber);
}

protected abstract void subscribeActual(SingleSubscriber<? super T> subscriber);

public final void subscribe(Subscriber<? super T> s) {
toFlowable().subscribe(s);
}
Expand Down
4 changes: 0 additions & 4 deletions src/main/java/io/reactivex/flowables/ConnectableFlowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@
*/
public abstract class ConnectableFlowable<T> extends Flowable<T> {

protected ConnectableFlowable(Publisher<T> onSubscribe) {
super(onSubscribe);
}

/**
* Instructs the {@code ConnectableObservable} to begin emitting the items from its underlying
* {@link Flowable} to its {@link Subscriber}s.
Expand Down
7 changes: 2 additions & 5 deletions src/main/java/io/reactivex/flowables/GroupedFlowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,11 @@
*/
package io.reactivex.flowables;

import org.reactivestreams.Publisher;

import io.reactivex.Flowable;

public class GroupedFlowable<K, T> extends Flowable<T> {
public abstract class GroupedFlowable<K, T> extends Flowable<T> {
final K key;
protected GroupedFlowable(Publisher<T> onSubscribe, K key) {
super(onSubscribe);
protected GroupedFlowable(K key) {
this.key = key;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.reactivex.internal.operators.completable;

import io.reactivex.Completable;

public final class CompletableWrapper extends Completable {

final CompletableOnSubscribe onSubscribe;

public CompletableWrapper(CompletableOnSubscribe onSubscribe) {
this.onSubscribe = onSubscribe;
}

@Override
protected void subscribeActual(CompletableSubscriber s) {
onSubscribe.accept(s);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public final class CachedObservable<T> extends Flowable<T> {
/** The cache and replay state. */
private CacheState<T> state;

private final AtomicBoolean once;
/**
* Creates a cached Observable with a default capacity hint of 16.
* @param <T> the value type
Expand All @@ -55,21 +56,35 @@ public static <T> CachedObservable<T> from(Flowable<? extends T> source, int cap
throw new IllegalArgumentException("capacityHint > 0 required");
}
CacheState<T> state = new CacheState<T>(source, capacityHint);
CachedSubscribe<T> onSubscribe = new CachedSubscribe<T>(state);
return new CachedObservable<T>(onSubscribe, state);
return new CachedObservable<T>(state);
}

/**
* Private constructor because state needs to be shared between the Observable body and
* the onSubscribe function.
* @param onSubscribe
* @param state
*/
private CachedObservable(Publisher<T> onSubscribe, CacheState<T> state) {
super(onSubscribe);
private CachedObservable(CacheState<T> state) {
this.state = state;
this.once = new AtomicBoolean();
}

@Override
protected void subscribeActual(Subscriber<? super T> t) {
// we can connect first because we replay everything anyway
ReplaySubscription<T> rp = new ReplaySubscription<T>(t, state);
state.addProducer(rp);

t.onSubscribe(rp);

// we ensure a single connection here to save an instance field of AtomicBoolean in state.
if (!once.get() && once.compareAndSet(false, true)) {
state.connect();
}

// no need to call rp.replay() here because the very first request will trigger it anyway
}

/**
* Check if this cached observable is connected to its source.
* @return true if already connected
Expand Down Expand Up @@ -222,35 +237,6 @@ void dispatch() {
}
}

/**
* Manages the subscription of child subscribers by setting up a replay producer and
* performs auto-connection of the very first subscription.
* @param <T> the value type emitted
*/
static final class CachedSubscribe<T> extends AtomicBoolean implements Publisher<T> {
/** */
private static final long serialVersionUID = -2817751667698696782L;
final CacheState<T> state;
public CachedSubscribe(CacheState<T> state) {
this.state = state;
}
@Override
public void subscribe(Subscriber<? super T> t) {
// we can connect first because we replay everything anyway
ReplaySubscription<T> rp = new ReplaySubscription<T>(t, state);
state.addProducer(rp);

t.onSubscribe(rp);

// we ensure a single connection here to save an instance field of AtomicBoolean in state.
if (!get() && compareAndSet(false, true)) {
state.connect();
}

// no need to call rp.replay() here because the very first request will trigger it anyway
}
}

/**
* Keeps track of the current request amount and the replay position for a child Subscriber.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.reactivex.internal.operators.flowable;

import org.reactivestreams.*;

import io.reactivex.Flowable;

public final class FlowableWrapper<T> extends Flowable<T> {
final Publisher<? extends T> publisher;

public FlowableWrapper(Publisher<? extends T> publisher) {
this.publisher = publisher;
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
publisher.subscribe(s);
}
}
Loading