Skip to content

Commit 8bf04e9

Browse files
authored
2.x: add tryOnError to create/XEmitter API (#5344)
* 2.x: add tryOnError to create/XEmitter API * Fix indentation.
1 parent ea6c7de commit 8bf04e9

File tree

15 files changed

+349
-40
lines changed

15 files changed

+349
-40
lines changed

src/main/java/io/reactivex/CompletableEmitter.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,19 @@ public interface CompletableEmitter {
5757
* @return true if the downstream disposed the sequence
5858
*/
5959
boolean isDisposed();
60+
61+
/**
62+
* Attempts to emit the specified {@code Throwable} error if the downstream
63+
* hasn't cancelled the sequence or is otherwise terminated, returning false
64+
* if the emission is not allowed to happen due to lifecycle restrictions.
65+
* <p>
66+
* Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
67+
* if the error could not be delivered.
68+
* @param t the throwable error to signal if possible
69+
* @return true if successful, false if the downstream is not able to accept further
70+
* events
71+
* @since 2.1.1 - experimental
72+
*/
73+
@Experimental
74+
boolean tryOnError(@NonNull Throwable t);
6075
}

src/main/java/io/reactivex/FlowableEmitter.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,19 @@ public interface FlowableEmitter<T> extends Emitter<T> {
6565
*/
6666
@NonNull
6767
FlowableEmitter<T> serialize();
68+
69+
/**
70+
* Attempts to emit the specified {@code Throwable} error if the downstream
71+
* hasn't cancelled the sequence or is otherwise terminated, returning false
72+
* if the emission is not allowed to happen due to lifecycle restrictions.
73+
* <p>
74+
* Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
75+
* if the error could not be delivered.
76+
* @param t the throwable error to signal if possible
77+
* @return true if successful, false if the downstream is not able to accept further
78+
* events
79+
* @since 2.1.1 - experimental
80+
*/
81+
@Experimental
82+
boolean tryOnError(@NonNull Throwable t);
6883
}

src/main/java/io/reactivex/MaybeEmitter.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,19 @@ public interface MaybeEmitter<T> {
6565
* @return true if the downstream cancelled the sequence
6666
*/
6767
boolean isDisposed();
68+
69+
/**
70+
* Attempts to emit the specified {@code Throwable} error if the downstream
71+
* hasn't cancelled the sequence or is otherwise terminated, returning false
72+
* if the emission is not allowed to happen due to lifecycle restrictions.
73+
* <p>
74+
* Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
75+
* if the error could not be delivered.
76+
* @param t the throwable error to signal if possible
77+
* @return true if successful, false if the downstream is not able to accept further
78+
* events
79+
* @since 2.1.1 - experimental
80+
*/
81+
@Experimental
82+
boolean tryOnError(@NonNull Throwable t);
6883
}

src/main/java/io/reactivex/ObservableEmitter.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,19 @@ public interface ObservableEmitter<T> extends Emitter<T> {
5656
*/
5757
@NonNull
5858
ObservableEmitter<T> serialize();
59+
60+
/**
61+
* Attempts to emit the specified {@code Throwable} error if the downstream
62+
* hasn't cancelled the sequence or is otherwise terminated, returning false
63+
* if the emission is not allowed to happen due to lifecycle restrictions.
64+
* <p>
65+
* Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
66+
* if the error could not be delivered.
67+
* @param t the throwable error to signal if possible
68+
* @return true if successful, false if the downstream is not able to accept further
69+
* events
70+
* @since 2.1.1 - experimental
71+
*/
72+
@Experimental
73+
boolean tryOnError(@NonNull Throwable t);
5974
}

src/main/java/io/reactivex/SingleEmitter.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,19 @@ public interface SingleEmitter<T> {
6060
* @return true if the downstream cancelled the sequence
6161
*/
6262
boolean isDisposed();
63+
64+
/**
65+
* Attempts to emit the specified {@code Throwable} error if the downstream
66+
* hasn't cancelled the sequence or is otherwise terminated, returning false
67+
* if the emission is not allowed to happen due to lifecycle restrictions.
68+
* <p>
69+
* Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
70+
* if the error could not be delivered.
71+
* @param t the throwable error to signal if possible
72+
* @return true if successful, false if the downstream is not able to accept further
73+
* events
74+
* @since 2.1.1 - experimental
75+
*/
76+
@Experimental
77+
boolean tryOnError(@NonNull Throwable t);
6378
}

src/main/java/io/reactivex/internal/operators/completable/CompletableCreate.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ public void onComplete() {
7373

7474
@Override
7575
public void onError(Throwable t) {
76+
if (!tryOnError(t)) {
77+
RxJavaPlugins.onError(t);
78+
}
79+
}
80+
81+
@Override
82+
public boolean tryOnError(Throwable t) {
7683
if (t == null) {
7784
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
7885
}
@@ -86,10 +93,10 @@ public void onError(Throwable t) {
8693
d.dispose();
8794
}
8895
}
89-
return;
96+
return true;
9097
}
9198
}
92-
RxJavaPlugins.onError(t);
99+
return false;
93100
}
94101

95102
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java

Lines changed: 49 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -129,21 +129,27 @@ public void onNext(T t) {
129129

130130
@Override
131131
public void onError(Throwable t) {
132-
if (emitter.isCancelled() || done) {
133-
RxJavaPlugins.onError(t);
134-
return;
135-
}
136-
if (t == null) {
137-
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
138-
}
139-
if (error.addThrowable(t)) {
140-
done = true;
141-
drain();
142-
} else {
132+
if (!tryOnError(t)) {
143133
RxJavaPlugins.onError(t);
144134
}
145135
}
146136

137+
@Override
138+
public boolean tryOnError(Throwable t) {
139+
if (emitter.isCancelled() || done) {
140+
return false;
141+
}
142+
if (t == null) {
143+
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
144+
}
145+
if (error.addThrowable(t)) {
146+
done = true;
147+
drain();
148+
return true;
149+
}
150+
return false;
151+
}
152+
147153
@Override
148154
public void onComplete() {
149155
if (emitter.isCancelled() || done) {
@@ -245,6 +251,10 @@ abstract static class BaseEmitter<T>
245251

246252
@Override
247253
public void onComplete() {
254+
complete();
255+
}
256+
257+
protected void complete() {
248258
if (isCancelled()) {
249259
return;
250260
}
@@ -256,19 +266,30 @@ public void onComplete() {
256266
}
257267

258268
@Override
259-
public void onError(Throwable e) {
269+
public final void onError(Throwable e) {
270+
if (!tryOnError(e)) {
271+
RxJavaPlugins.onError(e);
272+
}
273+
}
274+
275+
@Override
276+
public boolean tryOnError(Throwable e) {
277+
return error(e);
278+
}
279+
280+
protected boolean error(Throwable e) {
260281
if (e == null) {
261282
e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
262283
}
263284
if (isCancelled()) {
264-
RxJavaPlugins.onError(e);
265-
return;
285+
return false;
266286
}
267287
try {
268288
actual.onError(e);
269289
} finally {
270290
serial.dispose();
271291
}
292+
return true;
272293
}
273294

274295
@Override
@@ -446,10 +467,9 @@ public void onNext(T t) {
446467
}
447468

448469
@Override
449-
public void onError(Throwable e) {
470+
public boolean tryOnError(Throwable e) {
450471
if (done || isCancelled()) {
451-
RxJavaPlugins.onError(e);
452-
return;
472+
return false;
453473
}
454474

455475
if (e == null) {
@@ -459,6 +479,7 @@ public void onError(Throwable e) {
459479
error = e;
460480
done = true;
461481
drain();
482+
return true;
462483
}
463484

464485
@Override
@@ -507,9 +528,9 @@ void drain() {
507528
if (d && empty) {
508529
Throwable ex = error;
509530
if (ex != null) {
510-
super.onError(ex);
531+
error(ex);
511532
} else {
512-
super.onComplete();
533+
complete();
513534
}
514535
return;
515536
}
@@ -536,9 +557,9 @@ void drain() {
536557
if (d && empty) {
537558
Throwable ex = error;
538559
if (ex != null) {
539-
super.onError(ex);
560+
error(ex);
540561
} else {
541-
super.onComplete();
562+
complete();
542563
}
543564
return;
544565
}
@@ -589,17 +610,17 @@ public void onNext(T t) {
589610
}
590611

591612
@Override
592-
public void onError(Throwable e) {
613+
public boolean tryOnError(Throwable e) {
593614
if (done || isCancelled()) {
594-
RxJavaPlugins.onError(e);
595-
return;
615+
return false;
596616
}
597617
if (e == null) {
598618
onError(new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."));
599619
}
600620
error = e;
601621
done = true;
602622
drain();
623+
return true;
603624
}
604625

605626
@Override
@@ -648,9 +669,9 @@ void drain() {
648669
if (d && empty) {
649670
Throwable ex = error;
650671
if (ex != null) {
651-
super.onError(ex);
672+
error(ex);
652673
} else {
653-
super.onComplete();
674+
complete();
654675
}
655676
return;
656677
}
@@ -677,9 +698,9 @@ void drain() {
677698
if (d && empty) {
678699
Throwable ex = error;
679700
if (ex != null) {
680-
super.onError(ex);
701+
error(ex);
681702
} else {
682-
super.onComplete();
703+
complete();
683704
}
684705
return;
685706
}

src/main/java/io/reactivex/internal/operators/maybe/MaybeCreate.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ public void onSuccess(T value) {
8484

8585
@Override
8686
public void onError(Throwable t) {
87+
if (!tryOnError(t)) {
88+
RxJavaPlugins.onError(t);
89+
}
90+
}
91+
92+
@Override
93+
public boolean tryOnError(Throwable t) {
8794
if (t == null) {
8895
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
8996
}
@@ -97,10 +104,10 @@ public void onError(Throwable t) {
97104
d.dispose();
98105
}
99106
}
100-
return;
107+
return true;
101108
}
102109
}
103-
RxJavaPlugins.onError(t);
110+
return false;
104111
}
105112

106113
@Override

src/main/java/io/reactivex/internal/operators/observable/ObservableCreate.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,13 @@ public void onNext(T t) {
7070

7171
@Override
7272
public void onError(Throwable t) {
73+
if (!tryOnError(t)) {
74+
RxJavaPlugins.onError(t);
75+
}
76+
}
77+
78+
@Override
79+
public boolean tryOnError(Throwable t) {
7380
if (t == null) {
7481
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
7582
}
@@ -79,9 +86,9 @@ public void onError(Throwable t) {
7986
} finally {
8087
dispose();
8188
}
82-
} else {
83-
RxJavaPlugins.onError(t);
89+
return true;
8490
}
91+
return false;
8592
}
8693

8794
@Override
@@ -174,19 +181,25 @@ public void onNext(T t) {
174181

175182
@Override
176183
public void onError(Throwable t) {
177-
if (emitter.isDisposed() || done) {
184+
if (!tryOnError(t)) {
178185
RxJavaPlugins.onError(t);
179-
return;
186+
}
187+
}
188+
189+
@Override
190+
public boolean tryOnError(Throwable t) {
191+
if (emitter.isDisposed() || done) {
192+
return false;
180193
}
181194
if (t == null) {
182195
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
183196
}
184197
if (error.addThrowable(t)) {
185198
done = true;
186199
drain();
187-
} else {
188-
RxJavaPlugins.onError(t);
200+
return true;
189201
}
202+
return false;
190203
}
191204

192205
@Override

0 commit comments

Comments
 (0)