Skip to content

2.x: Update Observable's ops to work with ObservableConsumable #4041

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
482 changes: 247 additions & 235 deletions src/main/java/io/reactivex/Observable.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ public T value() {
return value;
}

public <U> ObservableConsumable<U> scalarFlatMap(final Function<? super T, ? extends Observable<? extends U>> mapper) {
public <U> ObservableConsumable<U> scalarFlatMap(final Function<? super T, ? extends ObservableConsumable<? extends U>> mapper) {
return new ObservableConsumable<U>() {
@Override
public void subscribe(Observer<? super U> s) {
Observable<? extends U> other;
ObservableConsumable<? extends U> other;
try {
other = mapper.apply(value);
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@
import io.reactivex.plugins.RxJavaPlugins;

public final class NbpOnSubscribeAmb<T> implements ObservableConsumable<T> {
final Observable<? extends T>[] sources;
final Iterable<? extends Observable<? extends T>> sourcesIterable;
final ObservableConsumable<? extends T>[] sources;
final Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable;

public NbpOnSubscribeAmb(Observable<? extends T>[] sources, Iterable<? extends Observable<? extends T>> sourcesIterable) {
public NbpOnSubscribeAmb(ObservableConsumable<? extends T>[] sources, Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
}

@Override
@SuppressWarnings("unchecked")
public void subscribe(Observer<? super T> s) {
Observable<? extends T>[] sources = this.sources;
ObservableConsumable<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new Observable[8];
for (Observable<? extends T> p : sourcesIterable) {
for (ObservableConsumable<? extends T> p : sourcesIterable) {
if (count == sources.length) {
Observable<? extends T>[] b = new Observable[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
Expand Down Expand Up @@ -74,7 +74,7 @@ public AmbCoordinator(Observer<? super T> actual, int count) {
this.subscribers = new AmbInnerSubscriber[count];
}

public void subscribe(Observable<? extends T>[] sources) {
public void subscribe(ObservableConsumable<? extends T>[] sources) {
AmbInnerSubscriber<T>[] as = subscribers;
int len = as.length;
for (int i = 0; i < len; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
import io.reactivex.plugins.RxJavaPlugins;

public final class NbpOnSubscribeCombineLatest<T, R> implements ObservableConsumable<R> {
final Observable<? extends T>[] sources;
final Iterable<? extends Observable<? extends T>> sourcesIterable;
final ObservableConsumable<? extends T>[] sources;
final Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable;
final Function<? super Object[], ? extends R> combiner;
final int bufferSize;
final boolean delayError;

public NbpOnSubscribeCombineLatest(Observable<? extends T>[] sources,
Iterable<? extends Observable<? extends T>> sourcesIterable,
public NbpOnSubscribeCombineLatest(ObservableConsumable<? extends T>[] sources,
Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable,
Function<? super Object[], ? extends R> combiner, int bufferSize,
boolean delayError) {
this.sources = sources;
Expand All @@ -49,11 +49,11 @@ public NbpOnSubscribeCombineLatest(Observable<? extends T>[] sources,
@Override
@SuppressWarnings("unchecked")
public void subscribe(Observer<? super R> s) {
Observable<? extends T>[] sources = this.sources;
ObservableConsumable<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new Observable[8];
for (Observable<? extends T> p : sourcesIterable) {
for (ObservableConsumable<? extends T> p : sourcesIterable) {
if (count == sources.length) {
Observable<? extends T>[] b = new Observable[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
Expand Down Expand Up @@ -109,7 +109,7 @@ public LatestCoordinator(Observer<? super R> actual,
this.queue = new SpscLinkedArrayQueue<Object>(bufferSize);
}

public void subscribe(Observable<? extends T>[] sources) {
public void subscribe(ObservableConsumable<? extends T>[] sources) {
Observer<T>[] as = subscribers;
int len = as.length;
for (int i = 0; i < len; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
import io.reactivex.internal.disposables.EmptyDisposable;

public final class NbpOnSubscribeDefer<T> implements ObservableConsumable<T> {
final Supplier<? extends Observable<? extends T>> supplier;
public NbpOnSubscribeDefer(Supplier<? extends Observable<? extends T>> supplier) {
final Supplier<? extends ObservableConsumable<? extends T>> supplier;
public NbpOnSubscribeDefer(Supplier<? extends ObservableConsumable<? extends T>> supplier) {
this.supplier = supplier;
}
@Override
public void subscribe(Observer<? super T> s) {
Observable<? extends T> pub;
ObservableConsumable<? extends T> pub;
try {
pub = supplier.get();
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
* @param <U> the other value type, ignored
*/
public final class NbpOnSubscribeDelaySubscriptionOther<T, U> implements ObservableConsumable<T> {
final Observable<? extends T> main;
final Observable<U> other;
final ObservableConsumable<? extends T> main;
final ObservableConsumable<U> other;

public NbpOnSubscribeDelaySubscriptionOther(Observable<? extends T> main, Observable<U> other) {
public NbpOnSubscribeDelaySubscriptionOther(ObservableConsumable<? extends T> main, ObservableConsumable<U> other) {
this.main = main;
this.other = other;
}
Expand Down Expand Up @@ -66,7 +66,7 @@ public void onComplete() {
}
done = true;

main.unsafeSubscribe(new Observer<T>() {
main.subscribe(new Observer<T>() {
@Override
public void onSubscribe(Disposable d) {
serial.set(d);
Expand All @@ -90,6 +90,6 @@ public void onComplete() {
}
};

other.unsafeSubscribe(otherSubscriber);
other.subscribe(otherSubscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.reactivex.internal.operators.observable;

import io.reactivex.*;
import io.reactivex.Observable.NbpOperator;
import io.reactivex.plugins.RxJavaPlugins;

/**
Expand All @@ -26,13 +25,13 @@
* @param <T> the upstream value type
* @param <R> the downstream parameter type
*/
public final class NbpOnSubscribeLift<R, T> implements ObservableConsumable<R> {
public final class NbpOnSubscribeLift<R, T> extends Observable<R> {
/** The actual operator. */
final NbpOperator<? extends R, ? super T> operator;
/** The source publisher. */
final Observable<? extends T> source;
final ObservableConsumable<? extends T> source;

public NbpOnSubscribeLift(Observable<? extends T> source, NbpOperator<? extends R, ? super T> operator) {
public NbpOnSubscribeLift(ObservableConsumable<? extends T> source, NbpOperator<? extends R, ? super T> operator) {
this.source = source;
this.operator = operator;
}
Expand All @@ -49,12 +48,12 @@ public NbpOnSubscribeLift(Observable<? extends T> source, NbpOperator<? extends
* Returns the source of this lift publisher.
* @return the source of this lift publisher
*/
public Observable<? extends T> source() {
public ObservableConsumable<? extends T> source() {
return source;
}

@Override
public void subscribe(Observer<? super R> s) {
public void subscribeActual(Observer<? super R> s) {
try {
if (s == null) {
throw new NullPointerException("Operator " + operator + " received a null Subscriber");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import io.reactivex.subjects.BehaviorSubject;

public final class NbpOnSubscribeRedo<T> implements ObservableConsumable<T> {
final Observable<? extends T> source;
final Function<? super Observable<Try<Optional<Object>>>, ? extends Observable<?>> manager;
final ObservableConsumable<? extends T> source;
final Function<? super Observable<Try<Optional<Object>>>, ? extends ObservableConsumable<?>> manager;

public NbpOnSubscribeRedo(Observable<? extends T> source,
Function<? super Observable<Try<Optional<Object>>>, ? extends Observable<?>> manager) {
public NbpOnSubscribeRedo(ObservableConsumable<? extends T> source,
Function<? super Observable<Try<Optional<Object>>>, ? extends ObservableConsumable<?>> manager) {
this.source = source;
this.manager = manager;
}
Expand All @@ -41,7 +41,7 @@ public void subscribe(Observer<? super T> s) {

s.onSubscribe(parent.arbiter);

Observable<?> action = manager.apply(subject);
ObservableConsumable<?> action = manager.apply(subject);

action.subscribe(new NbpToNotificationSubscriber<Object>(new Consumer<Try<Optional<Object>>>() {
@Override
Expand All @@ -59,12 +59,12 @@ static final class RedoSubscriber<T> extends AtomicBoolean implements Observer<T
private static final long serialVersionUID = -1151903143112844287L;
final Observer<? super T> actual;
final BehaviorSubject<Try<Optional<Object>>> subject;
final Observable<? extends T> source;
final ObservableConsumable<? extends T> source;
final MultipleAssignmentDisposable arbiter;

final AtomicInteger wip = new AtomicInteger();

public RedoSubscriber(Observer<? super T> actual, BehaviorSubject<Try<Optional<Object>>> subject, Observable<? extends T> source) {
public RedoSubscriber(Observer<? super T> actual, BehaviorSubject<Try<Optional<Object>>> subject, ObservableConsumable<? extends T> source) {
this.actual = actual;
this.subject = subject;
this.source = source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import io.reactivex.internal.queue.SpscLinkedArrayQueue;

public final class NbpOnSubscribeSequenceEqual<T> implements ObservableConsumable<Boolean> {
final Observable<? extends T> first;
final Observable<? extends T> second;
final ObservableConsumable<? extends T> first;
final ObservableConsumable<? extends T> second;
final BiPredicate<? super T, ? super T> comparer;
final int bufferSize;

public NbpOnSubscribeSequenceEqual(Observable<? extends T> first, Observable<? extends T> second,
public NbpOnSubscribeSequenceEqual(ObservableConsumable<? extends T> first, ObservableConsumable<? extends T> second,
BiPredicate<? super T, ? super T> comparer, int bufferSize) {
this.first = first;
this.second = second;
Expand All @@ -48,14 +48,14 @@ static final class EqualCoordinator<T> extends AtomicInteger implements Disposab
final Observer<? super Boolean> actual;
final BiPredicate<? super T, ? super T> comparer;
final ArrayCompositeResource<Disposable> resources;
final Observable<? extends T> first;
final Observable<? extends T> second;
final ObservableConsumable<? extends T> first;
final ObservableConsumable<? extends T> second;
final EqualSubscriber<T>[] subscribers;

volatile boolean cancelled;

public EqualCoordinator(Observer<? super Boolean> actual, int bufferSize,
Observable<? extends T> first, Observable<? extends T> second,
ObservableConsumable<? extends T> first, ObservableConsumable<? extends T> second,
BiPredicate<? super T, ? super T> comparer) {
this.actual = actual;
this.first = first;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@

public final class NbpOnSubscribeUsing<T, D> implements ObservableConsumable<T> {
final Supplier<? extends D> resourceSupplier;
final Function<? super D, ? extends Observable<? extends T>> sourceSupplier;
final Function<? super D, ? extends ObservableConsumable<? extends T>> sourceSupplier;
final Consumer<? super D> disposer;
final boolean eager;

public NbpOnSubscribeUsing(Supplier<? extends D> resourceSupplier,
Function<? super D, ? extends Observable<? extends T>> sourceSupplier,
Function<? super D, ? extends ObservableConsumable<? extends T>> sourceSupplier,
Consumer<? super D> disposer,
boolean eager) {
this.resourceSupplier = resourceSupplier;
Expand All @@ -50,7 +50,7 @@ public void subscribe(Observer<? super T> s) {
return;
}

Observable<? extends T> source;
ObservableConsumable<? extends T> source;
try {
source = sourceSupplier.apply(resource);
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@

public final class NbpOnSubscribeZip<T, R> implements ObservableConsumable<R> {

final Observable<? extends T>[] sources;
final Iterable<? extends Observable<? extends T>> sourcesIterable;
final ObservableConsumable<? extends T>[] sources;
final Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable;
final Function<? super Object[], ? extends R> zipper;
final int bufferSize;
final boolean delayError;

public NbpOnSubscribeZip(Observable<? extends T>[] sources,
Iterable<? extends Observable<? extends T>> sourcesIterable,
public NbpOnSubscribeZip(ObservableConsumable<? extends T>[] sources,
Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable,
Function<? super Object[], ? extends R> zipper,
int bufferSize,
boolean delayError) {
Expand All @@ -46,11 +46,11 @@ public NbpOnSubscribeZip(Observable<? extends T>[] sources,
@Override
@SuppressWarnings("unchecked")
public void subscribe(Observer<? super R> s) {
Observable<? extends T>[] sources = this.sources;
ObservableConsumable<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new Observable[8];
for (Observable<? extends T> p : sourcesIterable) {
for (ObservableConsumable<? extends T> p : sourcesIterable) {
if (count == sources.length) {
Observable<? extends T>[] b = new Observable[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
Expand Down Expand Up @@ -93,7 +93,7 @@ public ZipCoordinator(Observer<? super R> actual,
this.delayError = delayError;
}

public void subscribe(Observable<? extends T>[] sources, int bufferSize) {
public void subscribe(ObservableConsumable<? extends T>[] sources, int bufferSize) {
ZipSubscriber<T, R>[] s = subscribers;
int len = s.length;
for (int i = 0; i < len; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

import io.reactivex.Observable;
import io.reactivex.Observable.NbpOperator;
import io.reactivex.ObservableConsumable;
import io.reactivex.Observer;
import io.reactivex.disposables.*;
import io.reactivex.functions.*;
Expand All @@ -31,11 +31,11 @@

public final class NbpOperatorBufferBoundary<T, U extends Collection<? super T>, Open, Close> implements NbpOperator<U, T> {
final Supplier<U> bufferSupplier;
final Observable<? extends Open> bufferOpen;
final Function<? super Open, ? extends Observable<? extends Close>> bufferClose;
final ObservableConsumable<? extends Open> bufferOpen;
final Function<? super Open, ? extends ObservableConsumable<? extends Close>> bufferClose;

public NbpOperatorBufferBoundary(Observable<? extends Open> bufferOpen,
Function<? super Open, ? extends Observable<? extends Close>> bufferClose, Supplier<U> bufferSupplier) {
public NbpOperatorBufferBoundary(ObservableConsumable<? extends Open> bufferOpen,
Function<? super Open, ? extends ObservableConsumable<? extends Close>> bufferClose, Supplier<U> bufferSupplier) {
this.bufferOpen = bufferOpen;
this.bufferClose = bufferClose;
this.bufferSupplier = bufferSupplier;
Expand All @@ -51,8 +51,8 @@ public Observer<? super T> apply(Observer<? super U> t) {

static final class BufferBoundarySubscriber<T, U extends Collection<? super T>, Open, Close>
extends NbpQueueDrainSubscriber<T, U, U> implements Disposable {
final Observable<? extends Open> bufferOpen;
final Function<? super Open, ? extends Observable<? extends Close>> bufferClose;
final ObservableConsumable<? extends Open> bufferOpen;
final Function<? super Open, ? extends ObservableConsumable<? extends Close>> bufferClose;
final Supplier<U> bufferSupplier;
final SetCompositeResource<Disposable> resources;

Expand All @@ -63,8 +63,8 @@ static final class BufferBoundarySubscriber<T, U extends Collection<? super T>,
final AtomicInteger windows = new AtomicInteger();

public BufferBoundarySubscriber(Observer<? super U> actual,
Observable<? extends Open> bufferOpen,
Function<? super Open, ? extends Observable<? extends Close>> bufferClose,
ObservableConsumable<? extends Open> bufferOpen,
Function<? super Open, ? extends ObservableConsumable<? extends Close>> bufferClose,
Supplier<U> bufferSupplier) {
super(actual, new MpscLinkedQueue<U>());
this.bufferOpen = bufferOpen;
Expand Down Expand Up @@ -164,7 +164,7 @@ void open(Open window) {
return;
}

Observable<? extends Close> p;
ObservableConsumable<? extends Close> p;

try {
p = bufferClose.apply(window);
Expand Down
Loading