Skip to content

Commit f97c50d

Browse files
authored
2.x: cleanup, behavior clarifications, fixes, coverage 8/28-1 (#4435)
1 parent 25e78c5 commit f97c50d

File tree

56 files changed

+3237
-234
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+3237
-234
lines changed

src/main/java/io/reactivex/disposables/RunnableDisposable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,9 @@ final class RunnableDisposable extends ReferenceDisposable<Runnable> {
2727
protected void onDisposed(Runnable value) {
2828
value.run();
2929
}
30+
31+
@Override
32+
public String toString() {
33+
return "RunnableDisposable(disposed=" + isDisposed() + ", " + get() + ")";
34+
}
3035
}

src/main/java/io/reactivex/disposables/SerialDisposable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import java.util.concurrent.atomic.AtomicReference;
1717

18-
import io.reactivex.internal.disposables.DisposableHelper;
18+
import io.reactivex.internal.disposables.*;
1919

2020
/**
2121
* A Disposable container that allows atomically updating/replacing the contained

src/main/java/io/reactivex/internal/disposables/DisposableHelper.java

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,13 @@
2222
/**
2323
* Utility methods for working with Disposables atomically.
2424
*/
25-
public enum DisposableHelper {
26-
;
27-
28-
/**
29-
* Marker instance compared by identity for indicating a previously referenced
30-
* {@link Disposable} was disposed. DO NOT USE this instance as an arbitrary, empty disposable!
25+
public enum DisposableHelper implements Disposable {
26+
/**
27+
* The singleton instance representing a terminal, disposed state;
28+
* Don't leak it!
3129
*/
32-
public static final Disposable DISPOSED = Disposed.INSTANCE;
30+
DISPOSED
31+
;
3332

3433
public static boolean isDisposed(Disposable d) {
3534
return d == DISPOSED;
@@ -82,9 +81,10 @@ public static boolean replace(AtomicReference<Disposable> field, Disposable d) {
8281

8382
public static boolean dispose(AtomicReference<Disposable> field) {
8483
Disposable current = field.get();
85-
if (current != DISPOSED) {
86-
current = field.getAndSet(DISPOSED);
87-
if (current != DISPOSED) {
84+
Disposable d = DISPOSED;
85+
if (current != d) {
86+
current = field.getAndSet(d);
87+
if (current != d) {
8888
if (current != null) {
8989
current.dispose();
9090
}
@@ -121,18 +121,13 @@ public static void reportDisposableSet() {
121121
RxJavaPlugins.onError(new IllegalStateException("Disposable already set!"));
122122
}
123123

124-
enum Disposed implements Disposable {
125-
INSTANCE;
126-
127-
@Override
128-
public void dispose() {
129-
// deliberately no-op
130-
}
131-
132-
@Override
133-
public boolean isDisposed() {
134-
return true;
135-
}
124+
@Override
125+
public void dispose() {
126+
// deliberately no-op
127+
}
128+
129+
@Override
130+
public boolean isDisposed() {
131+
return true;
136132
}
137-
138133
}

src/main/java/io/reactivex/internal/disposables/ListCompositeDisposable.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public ListCompositeDisposable(Disposable... resources) {
4242

4343
public ListCompositeDisposable(Iterable<? extends Disposable> resources) {
4444
ObjectHelper.requireNonNull(resources, "resources is null");
45+
this.resources = new LinkedList<Disposable>();
4546
for (Disposable d : resources) {
4647
ObjectHelper.requireNonNull(d, "Disposable item is null");
4748
this.resources.add(d);

src/main/java/io/reactivex/internal/functions/Functions.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,11 @@ public R apply(Object[] a) throws Exception {
148148
public Object apply(Object v) {
149149
return v;
150150
}
151+
152+
@Override
153+
public String toString() {
154+
return "IdentityFunction";
155+
}
151156
};
152157

153158
/**
@@ -163,16 +168,31 @@ public static <T> Function<T, T> identity() {
163168
public static final Runnable EMPTY_RUNNABLE = new Runnable() {
164169
@Override
165170
public void run() { }
171+
172+
@Override
173+
public String toString() {
174+
return "EmptyRunnable";
175+
}
166176
};
167177

168178
public static final Action EMPTY_ACTION = new Action() {
169179
@Override
170180
public void run() { }
181+
182+
@Override
183+
public String toString() {
184+
return "EmptyAction";
185+
}
171186
};
172187

173188
static final Consumer<Object> EMPTY_CONSUMER = new Consumer<Object>() {
174189
@Override
175190
public void accept(Object v) { }
191+
192+
@Override
193+
public String toString() {
194+
return "EmptyConsumer";
195+
}
176196
};
177197

178198
/**

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferBoundarySupplier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import io.reactivex.disposables.Disposable;
2323
import io.reactivex.exceptions.Exceptions;
24-
import io.reactivex.internal.disposables.DisposableHelper;
24+
import io.reactivex.internal.disposables.*;
2525
import io.reactivex.internal.queue.MpscLinkedQueue;
2626
import io.reactivex.internal.subscribers.flowable.QueueDrainSubscriber;
2727
import io.reactivex.internal.subscriptions.*;

src/main/java/io/reactivex/internal/operators/flowable/FlowableBufferTimed.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919

2020
import org.reactivestreams.*;
2121

22-
import io.reactivex.*;
22+
import io.reactivex.Scheduler;
2323
import io.reactivex.Scheduler.Worker;
2424
import io.reactivex.disposables.Disposable;
2525
import io.reactivex.exceptions.Exceptions;
26-
import io.reactivex.internal.disposables.DisposableHelper;
26+
import io.reactivex.internal.disposables.*;
2727
import io.reactivex.internal.queue.MpscLinkedQueue;
2828
import io.reactivex.internal.subscribers.flowable.QueueDrainSubscriber;
2929
import io.reactivex.internal.subscriptions.*;

src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818

1919
import org.reactivestreams.*;
2020

21-
import io.reactivex.*;
21+
import io.reactivex.Scheduler;
2222
import io.reactivex.Scheduler.Worker;
2323
import io.reactivex.disposables.Disposable;
24-
import io.reactivex.internal.disposables.DisposableHelper;
24+
import io.reactivex.internal.disposables.*;
2525
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2626
import io.reactivex.internal.util.BackpressureHelper;
2727
import io.reactivex.plugins.RxJavaPlugins;

src/main/java/io/reactivex/internal/operators/flowable/FlowableInterval.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.reactivex.*;
2222
import io.reactivex.disposables.Disposable;
2323
import io.reactivex.exceptions.MissingBackpressureException;
24-
import io.reactivex.internal.disposables.DisposableHelper;
24+
import io.reactivex.internal.disposables.*;
2525
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2626
import io.reactivex.internal.util.BackpressureHelper;
2727

src/main/java/io/reactivex/internal/operators/flowable/FlowableIntervalRange.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import io.reactivex.*;
2222
import io.reactivex.disposables.Disposable;
23-
import io.reactivex.internal.disposables.DisposableHelper;
23+
import io.reactivex.internal.disposables.*;
2424
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2525
import io.reactivex.internal.util.BackpressureHelper;
2626

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import io.reactivex.Scheduler.Worker;
2525
import io.reactivex.disposables.Disposable;
2626
import io.reactivex.exceptions.Exceptions;
27-
import io.reactivex.internal.disposables.DisposableHelper;
27+
import io.reactivex.internal.disposables.*;
2828
import io.reactivex.internal.fuseable.SimpleQueue;
2929
import io.reactivex.internal.queue.MpscLinkedQueue;
3030
import io.reactivex.internal.subscribers.flowable.QueueDrainSubscriber;

src/main/java/io/reactivex/internal/operators/observable/ObservableDebounce.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.reactivex.disposables.Disposable;
2020
import io.reactivex.exceptions.Exceptions;
2121
import io.reactivex.functions.Function;
22-
import io.reactivex.internal.disposables.DisposableHelper;
22+
import io.reactivex.internal.disposables.*;
2323
import io.reactivex.observers.*;
2424
import io.reactivex.plugins.RxJavaPlugins;
2525

src/main/java/io/reactivex/internal/operators/observable/ObservableDebounceTimed.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.reactivex.*;
2020
import io.reactivex.Scheduler.Worker;
2121
import io.reactivex.disposables.Disposable;
22-
import io.reactivex.internal.disposables.DisposableHelper;
22+
import io.reactivex.internal.disposables.*;
2323
import io.reactivex.observers.SerializedObserver;
2424
import io.reactivex.plugins.RxJavaPlugins;
2525

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import io.reactivex.disposables.Disposable;
2323
import io.reactivex.exceptions.*;
2424
import io.reactivex.functions.Function;
25-
import io.reactivex.internal.disposables.DisposableHelper;
25+
import io.reactivex.internal.disposables.*;
2626
import io.reactivex.internal.fuseable.*;
2727
import io.reactivex.internal.queue.*;
2828

src/main/java/io/reactivex/internal/operators/observable/ObservableInterval.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import io.reactivex.*;
2020
import io.reactivex.disposables.Disposable;
21-
import io.reactivex.internal.disposables.DisposableHelper;
21+
import io.reactivex.internal.disposables.*;
2222

2323
public final class ObservableInterval extends Observable<Long> {
2424
final Scheduler scheduler;

src/main/java/io/reactivex/internal/operators/observable/ObservableIntervalRange.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import io.reactivex.*;
2020
import io.reactivex.disposables.Disposable;
21-
import io.reactivex.internal.disposables.DisposableHelper;
21+
import io.reactivex.internal.disposables.*;
2222

2323
public final class ObservableIntervalRange extends Observable<Long> {
2424
final Scheduler scheduler;

src/main/java/io/reactivex/internal/operators/observable/ObservableSampleWithObservable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import io.reactivex.*;
1919
import io.reactivex.disposables.Disposable;
20-
import io.reactivex.internal.disposables.DisposableHelper;
20+
import io.reactivex.internal.disposables.*;
2121
import io.reactivex.observers.SerializedObserver;
2222

2323
public final class ObservableSampleWithObservable<T> extends AbstractObservableWithUpstream<T, T> {

src/main/java/io/reactivex/internal/operators/observable/ObservableThrottleFirstTimed.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.reactivex.*;
2020
import io.reactivex.Scheduler.Worker;
2121
import io.reactivex.disposables.Disposable;
22-
import io.reactivex.internal.disposables.DisposableHelper;
22+
import io.reactivex.internal.disposables.*;
2323
import io.reactivex.observers.SerializedObserver;
2424
import io.reactivex.plugins.RxJavaPlugins;
2525

src/main/java/io/reactivex/internal/operators/observable/ObserverResourceWrapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import io.reactivex.Observer;
1919
import io.reactivex.disposables.Disposable;
20-
import io.reactivex.internal.disposables.DisposableHelper;
20+
import io.reactivex.internal.disposables.*;
2121

2222
public final class ObserverResourceWrapper<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
2323
/** */

src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import io.reactivex.*;
2222
import io.reactivex.disposables.Disposable;
23-
import io.reactivex.internal.disposables.DisposableHelper;
23+
import io.reactivex.internal.disposables.*;
2424
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2525
import io.reactivex.plugins.RxJavaPlugins;
2626

src/main/java/io/reactivex/internal/subscribers/completable/CallbackCompletableObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.reactivex.disposables.Disposable;
2020
import io.reactivex.exceptions.Exceptions;
2121
import io.reactivex.functions.*;
22-
import io.reactivex.internal.disposables.DisposableHelper;
22+
import io.reactivex.internal.disposables.*;
2323
import io.reactivex.plugins.RxJavaPlugins;
2424

2525
public final class CallbackCompletableObserver

src/main/java/io/reactivex/internal/subscribers/completable/EmptyCompletableObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import io.reactivex.CompletableObserver;
1919
import io.reactivex.disposables.Disposable;
20-
import io.reactivex.internal.disposables.DisposableHelper;
20+
import io.reactivex.internal.disposables.*;
2121
import io.reactivex.plugins.RxJavaPlugins;
2222

2323
public final class EmptyCompletableObserver

src/main/java/io/reactivex/internal/subscribers/flowable/ForEachWhileSubscriber.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import org.reactivestreams.*;
1919

2020
import io.reactivex.disposables.Disposable;
21-
import io.reactivex.exceptions.Exceptions;
21+
import io.reactivex.exceptions.*;
2222
import io.reactivex.functions.*;
2323
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2424
import io.reactivex.plugins.RxJavaPlugins;
@@ -85,7 +85,7 @@ public void onError(Throwable t) {
8585
onError.accept(t);
8686
} catch (Throwable ex) {
8787
Exceptions.throwIfFatal(ex);
88-
RxJavaPlugins.onError(ex);
88+
RxJavaPlugins.onError(new CompositeException(t, ex));
8989
}
9090
}
9191

src/main/java/io/reactivex/internal/subscribers/observable/BlockingObserver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
import java.util.Queue;
1717
import java.util.concurrent.atomic.AtomicReference;
1818

19-
import io.reactivex.*;
19+
import io.reactivex.Observer;
2020
import io.reactivex.disposables.Disposable;
21-
import io.reactivex.internal.disposables.DisposableHelper;
21+
import io.reactivex.internal.disposables.*;
2222
import io.reactivex.internal.util.NotificationLite;
2323

2424
public final class BlockingObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

0 commit comments

Comments
 (0)