Skip to content

Commit 596f77f

Browse files
authored
2.x: Update Observable's ops to work with ObservableConsumable (#4041)
1 parent 3382fe7 commit 596f77f

34 files changed

+445
-430
lines changed

src/main/java/io/reactivex/Observable.java

Lines changed: 247 additions & 235 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/internal/operators/observable/NbpObservableScalarSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ public T value() {
3838
return value;
3939
}
4040

41-
public <U> ObservableConsumable<U> scalarFlatMap(final Function<? super T, ? extends Observable<? extends U>> mapper) {
41+
public <U> ObservableConsumable<U> scalarFlatMap(final Function<? super T, ? extends ObservableConsumable<? extends U>> mapper) {
4242
return new ObservableConsumable<U>() {
4343
@Override
4444
public void subscribe(Observer<? super U> s) {
45-
Observable<? extends U> other;
45+
ObservableConsumable<? extends U> other;
4646
try {
4747
other = mapper.apply(value);
4848
} catch (Throwable e) {

src/main/java/io/reactivex/internal/operators/observable/NbpOnSubscribeAmb.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,22 @@
2222
import io.reactivex.plugins.RxJavaPlugins;
2323

2424
public final class NbpOnSubscribeAmb<T> implements ObservableConsumable<T> {
25-
final Observable<? extends T>[] sources;
26-
final Iterable<? extends Observable<? extends T>> sourcesIterable;
25+
final ObservableConsumable<? extends T>[] sources;
26+
final Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable;
2727

28-
public NbpOnSubscribeAmb(Observable<? extends T>[] sources, Iterable<? extends Observable<? extends T>> sourcesIterable) {
28+
public NbpOnSubscribeAmb(ObservableConsumable<? extends T>[] sources, Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable) {
2929
this.sources = sources;
3030
this.sourcesIterable = sourcesIterable;
3131
}
3232

3333
@Override
3434
@SuppressWarnings("unchecked")
3535
public void subscribe(Observer<? super T> s) {
36-
Observable<? extends T>[] sources = this.sources;
36+
ObservableConsumable<? extends T>[] sources = this.sources;
3737
int count = 0;
3838
if (sources == null) {
3939
sources = new Observable[8];
40-
for (Observable<? extends T> p : sourcesIterable) {
40+
for (ObservableConsumable<? extends T> p : sourcesIterable) {
4141
if (count == sources.length) {
4242
Observable<? extends T>[] b = new Observable[count + (count >> 2)];
4343
System.arraycopy(sources, 0, b, 0, count);
@@ -74,7 +74,7 @@ public AmbCoordinator(Observer<? super T> actual, int count) {
7474
this.subscribers = new AmbInnerSubscriber[count];
7575
}
7676

77-
public void subscribe(Observable<? extends T>[] sources) {
77+
public void subscribe(ObservableConsumable<? extends T>[] sources) {
7878
AmbInnerSubscriber<T>[] as = subscribers;
7979
int len = as.length;
8080
for (int i = 0; i < len; i++) {

src/main/java/io/reactivex/internal/operators/observable/NbpOnSubscribeCombineLatest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@
2828
import io.reactivex.plugins.RxJavaPlugins;
2929

3030
public final class NbpOnSubscribeCombineLatest<T, R> implements ObservableConsumable<R> {
31-
final Observable<? extends T>[] sources;
32-
final Iterable<? extends Observable<? extends T>> sourcesIterable;
31+
final ObservableConsumable<? extends T>[] sources;
32+
final Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable;
3333
final Function<? super Object[], ? extends R> combiner;
3434
final int bufferSize;
3535
final boolean delayError;
3636

37-
public NbpOnSubscribeCombineLatest(Observable<? extends T>[] sources,
38-
Iterable<? extends Observable<? extends T>> sourcesIterable,
37+
public NbpOnSubscribeCombineLatest(ObservableConsumable<? extends T>[] sources,
38+
Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable,
3939
Function<? super Object[], ? extends R> combiner, int bufferSize,
4040
boolean delayError) {
4141
this.sources = sources;
@@ -49,11 +49,11 @@ public NbpOnSubscribeCombineLatest(Observable<? extends T>[] sources,
4949
@Override
5050
@SuppressWarnings("unchecked")
5151
public void subscribe(Observer<? super R> s) {
52-
Observable<? extends T>[] sources = this.sources;
52+
ObservableConsumable<? extends T>[] sources = this.sources;
5353
int count = 0;
5454
if (sources == null) {
5555
sources = new Observable[8];
56-
for (Observable<? extends T> p : sourcesIterable) {
56+
for (ObservableConsumable<? extends T> p : sourcesIterable) {
5757
if (count == sources.length) {
5858
Observable<? extends T>[] b = new Observable[count + (count >> 2)];
5959
System.arraycopy(sources, 0, b, 0, count);
@@ -109,7 +109,7 @@ public LatestCoordinator(Observer<? super R> actual,
109109
this.queue = new SpscLinkedArrayQueue<Object>(bufferSize);
110110
}
111111

112-
public void subscribe(Observable<? extends T>[] sources) {
112+
public void subscribe(ObservableConsumable<? extends T>[] sources) {
113113
Observer<T>[] as = subscribers;
114114
int len = as.length;
115115
for (int i = 0; i < len; i++) {

src/main/java/io/reactivex/internal/operators/observable/NbpOnSubscribeDefer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@
1818
import io.reactivex.internal.disposables.EmptyDisposable;
1919

2020
public final class NbpOnSubscribeDefer<T> implements ObservableConsumable<T> {
21-
final Supplier<? extends Observable<? extends T>> supplier;
22-
public NbpOnSubscribeDefer(Supplier<? extends Observable<? extends T>> supplier) {
21+
final Supplier<? extends ObservableConsumable<? extends T>> supplier;
22+
public NbpOnSubscribeDefer(Supplier<? extends ObservableConsumable<? extends T>> supplier) {
2323
this.supplier = supplier;
2424
}
2525
@Override
2626
public void subscribe(Observer<? super T> s) {
27-
Observable<? extends T> pub;
27+
ObservableConsumable<? extends T> pub;
2828
try {
2929
pub = supplier.get();
3030
} catch (Throwable t) {

src/main/java/io/reactivex/internal/operators/observable/NbpOnSubscribeDelaySubscriptionOther.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
* @param <U> the other value type, ignored
2525
*/
2626
public final class NbpOnSubscribeDelaySubscriptionOther<T, U> implements ObservableConsumable<T> {
27-
final Observable<? extends T> main;
28-
final Observable<U> other;
27+
final ObservableConsumable<? extends T> main;
28+
final ObservableConsumable<U> other;
2929

30-
public NbpOnSubscribeDelaySubscriptionOther(Observable<? extends T> main, Observable<U> other) {
30+
public NbpOnSubscribeDelaySubscriptionOther(ObservableConsumable<? extends T> main, ObservableConsumable<U> other) {
3131
this.main = main;
3232
this.other = other;
3333
}
@@ -66,7 +66,7 @@ public void onComplete() {
6666
}
6767
done = true;
6868

69-
main.unsafeSubscribe(new Observer<T>() {
69+
main.subscribe(new Observer<T>() {
7070
@Override
7171
public void onSubscribe(Disposable d) {
7272
serial.set(d);
@@ -90,6 +90,6 @@ public void onComplete() {
9090
}
9191
};
9292

93-
other.unsafeSubscribe(otherSubscriber);
93+
other.subscribe(otherSubscriber);
9494
}
9595
}

src/main/java/io/reactivex/internal/operators/observable/NbpOnSubscribeLift.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
package io.reactivex.internal.operators.observable;
1515

1616
import io.reactivex.*;
17-
import io.reactivex.Observable.NbpOperator;
1817
import io.reactivex.plugins.RxJavaPlugins;
1918

2019
/**
@@ -26,13 +25,13 @@
2625
* @param <T> the upstream value type
2726
* @param <R> the downstream parameter type
2827
*/
29-
public final class NbpOnSubscribeLift<R, T> implements ObservableConsumable<R> {
28+
public final class NbpOnSubscribeLift<R, T> extends Observable<R> {
3029
/** The actual operator. */
3130
final NbpOperator<? extends R, ? super T> operator;
3231
/** The source publisher. */
33-
final Observable<? extends T> source;
32+
final ObservableConsumable<? extends T> source;
3433

35-
public NbpOnSubscribeLift(Observable<? extends T> source, NbpOperator<? extends R, ? super T> operator) {
34+
public NbpOnSubscribeLift(ObservableConsumable<? extends T> source, NbpOperator<? extends R, ? super T> operator) {
3635
this.source = source;
3736
this.operator = operator;
3837
}
@@ -49,12 +48,12 @@ public NbpOnSubscribeLift(Observable<? extends T> source, NbpOperator<? extends
4948
* Returns the source of this lift publisher.
5049
* @return the source of this lift publisher
5150
*/
52-
public Observable<? extends T> source() {
51+
public ObservableConsumable<? extends T> source() {
5352
return source;
5453
}
5554

5655
@Override
57-
public void subscribe(Observer<? super R> s) {
56+
public void subscribeActual(Observer<? super R> s) {
5857
try {
5958
if (s == null) {
6059
throw new NullPointerException("Operator " + operator + " received a null Subscriber");

src/main/java/io/reactivex/internal/operators/observable/NbpOnSubscribeRedo.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@
2222
import io.reactivex.subjects.BehaviorSubject;
2323

2424
public final class NbpOnSubscribeRedo<T> implements ObservableConsumable<T> {
25-
final Observable<? extends T> source;
26-
final Function<? super Observable<Try<Optional<Object>>>, ? extends Observable<?>> manager;
25+
final ObservableConsumable<? extends T> source;
26+
final Function<? super Observable<Try<Optional<Object>>>, ? extends ObservableConsumable<?>> manager;
2727

28-
public NbpOnSubscribeRedo(Observable<? extends T> source,
29-
Function<? super Observable<Try<Optional<Object>>>, ? extends Observable<?>> manager) {
28+
public NbpOnSubscribeRedo(ObservableConsumable<? extends T> source,
29+
Function<? super Observable<Try<Optional<Object>>>, ? extends ObservableConsumable<?>> manager) {
3030
this.source = source;
3131
this.manager = manager;
3232
}
@@ -41,7 +41,7 @@ public void subscribe(Observer<? super T> s) {
4141

4242
s.onSubscribe(parent.arbiter);
4343

44-
Observable<?> action = manager.apply(subject);
44+
ObservableConsumable<?> action = manager.apply(subject);
4545

4646
action.subscribe(new NbpToNotificationSubscriber<Object>(new Consumer<Try<Optional<Object>>>() {
4747
@Override
@@ -59,12 +59,12 @@ static final class RedoSubscriber<T> extends AtomicBoolean implements Observer<T
5959
private static final long serialVersionUID = -1151903143112844287L;
6060
final Observer<? super T> actual;
6161
final BehaviorSubject<Try<Optional<Object>>> subject;
62-
final Observable<? extends T> source;
62+
final ObservableConsumable<? extends T> source;
6363
final MultipleAssignmentDisposable arbiter;
6464

6565
final AtomicInteger wip = new AtomicInteger();
6666

67-
public RedoSubscriber(Observer<? super T> actual, BehaviorSubject<Try<Optional<Object>>> subject, Observable<? extends T> source) {
67+
public RedoSubscriber(Observer<? super T> actual, BehaviorSubject<Try<Optional<Object>>> subject, ObservableConsumable<? extends T> source) {
6868
this.actual = actual;
6969
this.subject = subject;
7070
this.source = source;

src/main/java/io/reactivex/internal/operators/observable/NbpOnSubscribeSequenceEqual.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@
2323
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
2424

2525
public final class NbpOnSubscribeSequenceEqual<T> implements ObservableConsumable<Boolean> {
26-
final Observable<? extends T> first;
27-
final Observable<? extends T> second;
26+
final ObservableConsumable<? extends T> first;
27+
final ObservableConsumable<? extends T> second;
2828
final BiPredicate<? super T, ? super T> comparer;
2929
final int bufferSize;
3030

31-
public NbpOnSubscribeSequenceEqual(Observable<? extends T> first, Observable<? extends T> second,
31+
public NbpOnSubscribeSequenceEqual(ObservableConsumable<? extends T> first, ObservableConsumable<? extends T> second,
3232
BiPredicate<? super T, ? super T> comparer, int bufferSize) {
3333
this.first = first;
3434
this.second = second;
@@ -48,14 +48,14 @@ static final class EqualCoordinator<T> extends AtomicInteger implements Disposab
4848
final Observer<? super Boolean> actual;
4949
final BiPredicate<? super T, ? super T> comparer;
5050
final ArrayCompositeResource<Disposable> resources;
51-
final Observable<? extends T> first;
52-
final Observable<? extends T> second;
51+
final ObservableConsumable<? extends T> first;
52+
final ObservableConsumable<? extends T> second;
5353
final EqualSubscriber<T>[] subscribers;
5454

5555
volatile boolean cancelled;
5656

5757
public EqualCoordinator(Observer<? super Boolean> actual, int bufferSize,
58-
Observable<? extends T> first, Observable<? extends T> second,
58+
ObservableConsumable<? extends T> first, ObservableConsumable<? extends T> second,
5959
BiPredicate<? super T, ? super T> comparer) {
6060
this.actual = actual;
6161
this.first = first;

src/main/java/io/reactivex/internal/operators/observable/NbpOnSubscribeUsing.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@
2525

2626
public final class NbpOnSubscribeUsing<T, D> implements ObservableConsumable<T> {
2727
final Supplier<? extends D> resourceSupplier;
28-
final Function<? super D, ? extends Observable<? extends T>> sourceSupplier;
28+
final Function<? super D, ? extends ObservableConsumable<? extends T>> sourceSupplier;
2929
final Consumer<? super D> disposer;
3030
final boolean eager;
3131

3232
public NbpOnSubscribeUsing(Supplier<? extends D> resourceSupplier,
33-
Function<? super D, ? extends Observable<? extends T>> sourceSupplier,
33+
Function<? super D, ? extends ObservableConsumable<? extends T>> sourceSupplier,
3434
Consumer<? super D> disposer,
3535
boolean eager) {
3636
this.resourceSupplier = resourceSupplier;
@@ -50,7 +50,7 @@ public void subscribe(Observer<? super T> s) {
5050
return;
5151
}
5252

53-
Observable<? extends T> source;
53+
ObservableConsumable<? extends T> source;
5454
try {
5555
source = sourceSupplier.apply(resource);
5656
} catch (Throwable e) {

src/main/java/io/reactivex/internal/operators/observable/NbpOnSubscribeZip.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@
2525

2626
public final class NbpOnSubscribeZip<T, R> implements ObservableConsumable<R> {
2727

28-
final Observable<? extends T>[] sources;
29-
final Iterable<? extends Observable<? extends T>> sourcesIterable;
28+
final ObservableConsumable<? extends T>[] sources;
29+
final Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable;
3030
final Function<? super Object[], ? extends R> zipper;
3131
final int bufferSize;
3232
final boolean delayError;
3333

34-
public NbpOnSubscribeZip(Observable<? extends T>[] sources,
35-
Iterable<? extends Observable<? extends T>> sourcesIterable,
34+
public NbpOnSubscribeZip(ObservableConsumable<? extends T>[] sources,
35+
Iterable<? extends ObservableConsumable<? extends T>> sourcesIterable,
3636
Function<? super Object[], ? extends R> zipper,
3737
int bufferSize,
3838
boolean delayError) {
@@ -46,11 +46,11 @@ public NbpOnSubscribeZip(Observable<? extends T>[] sources,
4646
@Override
4747
@SuppressWarnings("unchecked")
4848
public void subscribe(Observer<? super R> s) {
49-
Observable<? extends T>[] sources = this.sources;
49+
ObservableConsumable<? extends T>[] sources = this.sources;
5050
int count = 0;
5151
if (sources == null) {
5252
sources = new Observable[8];
53-
for (Observable<? extends T> p : sourcesIterable) {
53+
for (ObservableConsumable<? extends T> p : sourcesIterable) {
5454
if (count == sources.length) {
5555
Observable<? extends T>[] b = new Observable[count + (count >> 2)];
5656
System.arraycopy(sources, 0, b, 0, count);
@@ -93,7 +93,7 @@ public ZipCoordinator(Observer<? super R> actual,
9393
this.delayError = delayError;
9494
}
9595

96-
public void subscribe(Observable<? extends T>[] sources, int bufferSize) {
96+
public void subscribe(ObservableConsumable<? extends T>[] sources, int bufferSize) {
9797
ZipSubscriber<T, R>[] s = subscribers;
9898
int len = s.length;
9999
for (int i = 0; i < len; i++) {

src/main/java/io/reactivex/internal/operators/observable/NbpOperatorBufferBoundary.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
import java.util.*;
1717
import java.util.concurrent.atomic.AtomicInteger;
1818

19-
import io.reactivex.Observable;
2019
import io.reactivex.Observable.NbpOperator;
20+
import io.reactivex.ObservableConsumable;
2121
import io.reactivex.Observer;
2222
import io.reactivex.disposables.*;
2323
import io.reactivex.functions.*;
@@ -31,11 +31,11 @@
3131

3232
public final class NbpOperatorBufferBoundary<T, U extends Collection<? super T>, Open, Close> implements NbpOperator<U, T> {
3333
final Supplier<U> bufferSupplier;
34-
final Observable<? extends Open> bufferOpen;
35-
final Function<? super Open, ? extends Observable<? extends Close>> bufferClose;
34+
final ObservableConsumable<? extends Open> bufferOpen;
35+
final Function<? super Open, ? extends ObservableConsumable<? extends Close>> bufferClose;
3636

37-
public NbpOperatorBufferBoundary(Observable<? extends Open> bufferOpen,
38-
Function<? super Open, ? extends Observable<? extends Close>> bufferClose, Supplier<U> bufferSupplier) {
37+
public NbpOperatorBufferBoundary(ObservableConsumable<? extends Open> bufferOpen,
38+
Function<? super Open, ? extends ObservableConsumable<? extends Close>> bufferClose, Supplier<U> bufferSupplier) {
3939
this.bufferOpen = bufferOpen;
4040
this.bufferClose = bufferClose;
4141
this.bufferSupplier = bufferSupplier;
@@ -51,8 +51,8 @@ public Observer<? super T> apply(Observer<? super U> t) {
5151

5252
static final class BufferBoundarySubscriber<T, U extends Collection<? super T>, Open, Close>
5353
extends NbpQueueDrainSubscriber<T, U, U> implements Disposable {
54-
final Observable<? extends Open> bufferOpen;
55-
final Function<? super Open, ? extends Observable<? extends Close>> bufferClose;
54+
final ObservableConsumable<? extends Open> bufferOpen;
55+
final Function<? super Open, ? extends ObservableConsumable<? extends Close>> bufferClose;
5656
final Supplier<U> bufferSupplier;
5757
final SetCompositeResource<Disposable> resources;
5858

@@ -63,8 +63,8 @@ static final class BufferBoundarySubscriber<T, U extends Collection<? super T>,
6363
final AtomicInteger windows = new AtomicInteger();
6464

6565
public BufferBoundarySubscriber(Observer<? super U> actual,
66-
Observable<? extends Open> bufferOpen,
67-
Function<? super Open, ? extends Observable<? extends Close>> bufferClose,
66+
ObservableConsumable<? extends Open> bufferOpen,
67+
Function<? super Open, ? extends ObservableConsumable<? extends Close>> bufferClose,
6868
Supplier<U> bufferSupplier) {
6969
super(actual, new MpscLinkedQueue<U>());
7070
this.bufferOpen = bufferOpen;
@@ -164,7 +164,7 @@ void open(Open window) {
164164
return;
165165
}
166166

167-
Observable<? extends Close> p;
167+
ObservableConsumable<? extends Close> p;
168168

169169
try {
170170
p = bufferClose.apply(window);

0 commit comments

Comments
 (0)