Skip to content

Commit 0c30b08

Browse files
committed
2.x: fixes, cleanup, coverage 8/31-1
1 parent a856572 commit 0c30b08

File tree

57 files changed

+2398
-855
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+2398
-855
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5086,7 +5086,7 @@ public final T blockingSingle(T defaultItem) {
50865086
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
50875087
*/
50885088
public final Future<T> toFuture() {
5089-
return FlowableToFuture.toFuture(this);
5089+
return subscribeWith(new FutureSubscriber<T>());
50905090
}
50915091

50925092
/**

src/main/java/io/reactivex/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4538,7 +4538,7 @@ public final T blockingSingle(T defaultItem) {
45384538
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
45394539
*/
45404540
public final Future<T> toFuture() {
4541-
return ObservableToFuture.toFuture(this);
4541+
return subscribeWith(new FutureObserver<T>());
45424542
}
45434543

45444544
/**

src/main/java/io/reactivex/disposables/Disposables.java

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,20 @@ private Disposables() {
3737
* executed exactly once when the Disposable is disposed.
3838
* @param run the Runnable to wrap
3939
* @return the new Disposable instance
40+
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
4041
*/
42+
@Deprecated
4143
public static Disposable from(Runnable run) {
44+
return fromRunnable(run);
45+
}
46+
47+
/**
48+
* Construct a Disposable by wrapping a Runnable that is
49+
* executed exactly once when the Disposable is disposed.
50+
* @param run the Runnable to wrap
51+
* @return the new Disposable instance
52+
*/
53+
public static Disposable fromRunnable(Runnable run) {
4254
ObjectHelper.requireNonNull(run, "run is null");
4355
return new RunnableDisposable(run);
4456
}
@@ -48,8 +60,20 @@ public static Disposable from(Runnable run) {
4860
* executed exactly once when the Disposable is disposed.
4961
* @param run the Action to wrap
5062
* @return the new Disposable instance
63+
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
5164
*/
65+
@Deprecated
5266
public static Disposable from(Action run) {
67+
return fromAction(run);
68+
}
69+
70+
/**
71+
* Construct a Disposable by wrapping a Action that is
72+
* executed exactly once when the Disposable is disposed.
73+
* @param run the Action to wrap
74+
* @return the new Disposable instance
75+
*/
76+
public static Disposable fromAction(Action run) {
5377
ObjectHelper.requireNonNull(run, "run is null");
5478
return new ActionDisposable(run);
5579
}
@@ -59,10 +83,11 @@ public static Disposable from(Action run) {
5983
* cancelled exactly once when the Disposable is disposed.
6084
* @param future the Future to wrap
6185
* @return the new Disposable instance
86+
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
6287
*/
88+
@Deprecated
6389
public static Disposable from(Future<?> future) {
64-
ObjectHelper.requireNonNull(future, "future is null");
65-
return from(future, true);
90+
return fromFuture(future, true);
6691
}
6792

6893
/**
@@ -71,8 +96,32 @@ public static Disposable from(Future<?> future) {
7196
* @param future the Runnable to wrap
7297
* @param allowInterrupt if true, the future cancel happens via Future.cancel(true)
7398
* @return the new Disposable instance
99+
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
74100
*/
101+
@Deprecated
75102
public static Disposable from(Future<?> future, boolean allowInterrupt) {
103+
return fromFuture(future, allowInterrupt);
104+
}
105+
106+
/**
107+
* Construct a Disposable by wrapping a Future that is
108+
* cancelled exactly once when the Disposable is disposed.
109+
* @param future the Future to wrap
110+
* @return the new Disposable instance
111+
*/
112+
public static Disposable fromFuture(Future<?> future) {
113+
ObjectHelper.requireNonNull(future, "future is null");
114+
return fromFuture(future, true);
115+
}
116+
117+
/**
118+
* Construct a Disposable by wrapping a Runnable that is
119+
* executed exactly once when the Disposable is disposed.
120+
* @param future the Runnable to wrap
121+
* @param allowInterrupt if true, the future cancel happens via Future.cancel(true)
122+
* @return the new Disposable instance
123+
*/
124+
public static Disposable fromFuture(Future<?> future, boolean allowInterrupt) {
76125
ObjectHelper.requireNonNull(future, "future is null");
77126
return new FutureDisposable(future, allowInterrupt);
78127
}
@@ -82,8 +131,20 @@ public static Disposable from(Future<?> future, boolean allowInterrupt) {
82131
* cancelled exactly once when the Disposable is disposed.
83132
* @param subscription the Runnable to wrap
84133
* @return the new Disposable instance
134+
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
85135
*/
136+
@Deprecated
86137
public static Disposable from(Subscription subscription) {
138+
return fromSubscription(subscription);
139+
}
140+
141+
/**
142+
* Construct a Disposable by wrapping a Subscription that is
143+
* cancelled exactly once when the Disposable is disposed.
144+
* @param subscription the Runnable to wrap
145+
* @return the new Disposable instance
146+
*/
147+
public static Disposable fromSubscription(Subscription subscription) {
87148
ObjectHelper.requireNonNull(subscription, "subscription is null");
88149
return new SubscriptionDisposable(subscription);
89150
}
@@ -93,7 +154,7 @@ public static Disposable from(Subscription subscription) {
93154
* @return a new, non-disposed Disposable instance
94155
*/
95156
public static Disposable empty() {
96-
return from(Functions.EMPTY_RUNNABLE);
157+
return fromRunnable(Functions.EMPTY_RUNNABLE);
97158
}
98159

99160
/**

src/main/java/io/reactivex/internal/functions/Functions.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,12 @@
2323
/**
2424
* Utility methods to convert the Function3..Function9 instances to Function of Object array.
2525
*/
26-
public enum Functions {
27-
;
26+
public final class Functions {
27+
28+
/** Utility class. */
29+
private Functions() {
30+
throw new IllegalStateException("No instances!");
31+
}
2832

2933
@SuppressWarnings("unchecked")
3034
public static <T1, T2, R> Function<Object[], R> toFunction(final BiFunction<? super T1, ? super T2, ? extends R> biFunction) {

src/main/java/io/reactivex/internal/functions/ObjectHelper.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,13 @@
1818
* Utility methods containing the backport of Java 7's Objects utility class.
1919
* <p>Named as such to avoid clash with java.util.Objects.
2020
*/
21-
public enum ObjectHelper {
22-
;
21+
public final class ObjectHelper {
22+
23+
/** Utility class. */
24+
private ObjectHelper() {
25+
throw new IllegalStateException("No instances!");
26+
}
27+
2328
/**
2429
* Verifies if the object is not null and returns it or throws a NullPointerException
2530
* with the given message.

src/main/java/io/reactivex/internal/operators/completable/CompletableFromPublisher.java

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
import org.reactivestreams.*;
1717

1818
import io.reactivex.*;
19-
import io.reactivex.disposables.Disposables;
19+
import io.reactivex.disposables.*;
20+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2021

2122
public final class CompletableFromPublisher<T> extends Completable {
2223

@@ -28,30 +29,56 @@ public CompletableFromPublisher(Publisher<T> flowable) {
2829

2930
@Override
3031
protected void subscribeActual(final CompletableObserver cs) {
31-
flowable.subscribe(new Subscriber<T>() {
32+
flowable.subscribe(new FromPublisherSubscriber<T>(cs));
33+
}
34+
35+
static final class FromPublisherSubscriber<T> implements Subscriber<T>, Disposable {
3236

33-
@Override
34-
public void onComplete() {
35-
cs.onComplete();
37+
final CompletableObserver cs;
38+
39+
Subscription s;
40+
41+
public FromPublisherSubscriber(CompletableObserver actual) {
42+
this.cs = actual;
43+
}
44+
45+
@Override
46+
public void onSubscribe(Subscription s) {
47+
if (SubscriptionHelper.validate(this.s, s)) {
48+
this.s = s;
49+
50+
cs.onSubscribe(this);
51+
52+
s.request(Long.MAX_VALUE);
3653
}
54+
}
3755

38-
@Override
39-
public void onError(Throwable t) {
40-
cs.onError(t);
41-
}
4256

43-
@Override
44-
public void onNext(T t) {
45-
// ignored
46-
}
57+
@Override
58+
public void onNext(T t) {
59+
// ignored
60+
}
4761

48-
@Override
49-
public void onSubscribe(Subscription s) {
50-
cs.onSubscribe(Disposables.from(s));
51-
s.request(Long.MAX_VALUE);
52-
}
53-
54-
});
62+
@Override
63+
public void onError(Throwable t) {
64+
cs.onError(t);
65+
}
66+
67+
@Override
68+
public void onComplete() {
69+
cs.onComplete();
70+
}
71+
72+
@Override
73+
public void dispose() {
74+
s.cancel();
75+
s = SubscriptionHelper.CANCELLED;
76+
}
77+
78+
@Override
79+
public boolean isDisposed() {
80+
return s == SubscriptionHelper.CANCELLED;
81+
}
5582
}
5683

5784
}

0 commit comments

Comments
 (0)