Skip to content

Commit f8c33a9

Browse files
JakeWhartonakarnokd
authored andcommitted
Move error consumer helper to internal API. (#4386)
1 parent ae8e4d4 commit f8c33a9

File tree

5 files changed

+19
-27
lines changed

5 files changed

+19
-27
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5410,7 +5410,7 @@ public final void blockingSubscribe() {
54105410
* @since 2.0
54115411
*/
54125412
public final void blockingSubscribe(Consumer<? super T> onNext) {
5413-
FlowableBlockingSubscribe.subscribe(this, onNext, RxJavaPlugins.errorConsumer(), Functions.EMPTY_ACTION);
5413+
FlowableBlockingSubscribe.subscribe(this, onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
54145414
}
54155415

54165416
/**
@@ -8496,7 +8496,7 @@ public final Disposable forEach(Consumer<? super T> onNext) {
84968496
@BackpressureSupport(BackpressureKind.NONE)
84978497
@SchedulerSupport(SchedulerSupport.NONE)
84988498
public final Disposable forEachWhile(Predicate<? super T> onNext) {
8499-
return forEachWhile(onNext, RxJavaPlugins.errorConsumer(), Functions.EMPTY_ACTION);
8499+
return forEachWhile(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
85008500
}
85018501

85028502
/**
@@ -11837,7 +11837,7 @@ public final Flowable<T> startWithArray(T... values) {
1183711837
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
1183811838
@SchedulerSupport(SchedulerSupport.NONE)
1183911839
public final Disposable subscribe() {
11840-
return subscribe(Functions.emptyConsumer(), RxJavaPlugins.errorConsumer(),
11840+
return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER,
1184111841
Functions.EMPTY_ACTION, FlowableInternalHelper.requestMax());
1184211842
}
1184311843

@@ -11864,7 +11864,7 @@ public final Disposable subscribe() {
1186411864
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
1186511865
@SchedulerSupport(SchedulerSupport.NONE)
1186611866
public final Disposable subscribe(Consumer<? super T> onNext) {
11867-
return subscribe(onNext, RxJavaPlugins.errorConsumer(),
11867+
return subscribe(onNext, Functions.ERROR_CONSUMER,
1186811868
Functions.EMPTY_ACTION, FlowableInternalHelper.requestMax());
1186911869
}
1187011870

src/main/java/io/reactivex/Observable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4870,7 +4870,7 @@ public final void blockingSubscribe() {
48704870
* @since 2.0
48714871
*/
48724872
public final void blockingSubscribe(Consumer<? super T> onNext) {
4873-
ObservableBlockingSubscribe.subscribe(this, onNext, RxJavaPlugins.errorConsumer(), Functions.EMPTY_ACTION);
4873+
ObservableBlockingSubscribe.subscribe(this, onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
48744874
}
48754875

48764876
/**
@@ -7478,7 +7478,7 @@ public final Disposable forEach(Consumer<? super T> onNext) {
74787478
*/
74797479
@SchedulerSupport(SchedulerSupport.NONE)
74807480
public final Disposable forEachWhile(Predicate<? super T> onNext) {
7481-
return forEachWhile(onNext, RxJavaPlugins.errorConsumer(), Functions.EMPTY_ACTION);
7481+
return forEachWhile(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION);
74827482
}
74837483

74847484
/**
@@ -10043,7 +10043,7 @@ public final Observable<T> startWithArray(T... values) {
1004310043
*/
1004410044
@SchedulerSupport(SchedulerSupport.NONE)
1004510045
public final Disposable subscribe() {
10046-
return subscribe(Functions.emptyConsumer(), RxJavaPlugins.errorConsumer(), Functions.EMPTY_ACTION, Functions.emptyConsumer());
10046+
return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
1004710047
}
1004810048

1004910049
/**
@@ -10065,7 +10065,7 @@ public final Disposable subscribe() {
1006510065
*/
1006610066
@SchedulerSupport(SchedulerSupport.NONE)
1006710067
public final Disposable subscribe(Consumer<? super T> onNext) {
10068-
return subscribe(onNext, RxJavaPlugins.errorConsumer(), Functions.EMPTY_ACTION, Functions.emptyConsumer());
10068+
return subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
1006910069
}
1007010070

1007110071
/**

src/main/java/io/reactivex/Single.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2511,7 +2511,7 @@ public final void safeSubscribe(Subscriber<? super T> s) {
25112511
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
25122512
*/
25132513
public final Disposable subscribe() {
2514-
return subscribe(Functions.emptyConsumer(), RxJavaPlugins.errorConsumer());
2514+
return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER);
25152515
}
25162516

25172517
/**
@@ -2553,7 +2553,7 @@ public final Disposable subscribe(final BiConsumer<? super T, ? super Throwable>
25532553
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
25542554
*/
25552555
public final Disposable subscribe(Consumer<? super T> onSuccess) {
2556-
return subscribe(onSuccess, RxJavaPlugins.errorConsumer());
2556+
return subscribe(onSuccess, Functions.ERROR_CONSUMER);
25572557
}
25582558

25592559
/**

src/main/java/io/reactivex/internal/functions/Functions.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.reactivex.*;
1919
import io.reactivex.functions.*;
20+
import io.reactivex.plugins.RxJavaPlugins;
2021
import io.reactivex.schedulers.Timed;
2122

2223
/**
@@ -183,7 +184,14 @@ public void accept(Object v) { }
183184
public static <T> Consumer<T> emptyConsumer() {
184185
return (Consumer<T>)EMPTY_CONSUMER;
185186
}
186-
187+
188+
public static final Consumer<Throwable> ERROR_CONSUMER = new Consumer<Throwable>() {
189+
@Override
190+
public void accept(Throwable error) {
191+
RxJavaPlugins.onError(error);
192+
}
193+
};
194+
187195
public static final LongConsumer EMPTY_LONGCONSUMER = new LongConsumer() {
188196
@Override
189197
public void accept(long v) { }

src/main/java/io/reactivex/plugins/RxJavaPlugins.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -747,22 +747,6 @@ public static Completable onAssembly(Completable source) {
747747
return source;
748748
}
749749

750-
/** Singleton consumer that calls RxJavaPlugins.onError. */
751-
static final Consumer<Throwable> CONSUME_BY_RXJAVA_PLUGIN = new Consumer<Throwable>() {
752-
@Override
753-
public void accept(Throwable e) {
754-
RxJavaPlugins.onError(e);
755-
}
756-
};
757-
758-
/**
759-
* Returns a consumer which relays the received Throwable to RxJavaPlugins.onError().
760-
* @return the consumer
761-
*/
762-
public static Consumer<Throwable> errorConsumer() {
763-
return CONSUME_BY_RXJAVA_PLUGIN;
764-
}
765-
766750
/**
767751
* Wraps the call to the function in try-catch and propagates thrown
768752
* checked exceptions as runtimeexception.

0 commit comments

Comments
 (0)