Skip to content

Commit f9881df

Browse files
authored
2.x: fix doOnNext failure not triggering doOnError when fused (#5415)
* 2.x: fix doOnNext failure not triggering doOnError when fused * Remove whitespace
1 parent 84d333e commit f9881df

File tree

4 files changed

+263
-4
lines changed

4 files changed

+263
-4
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnEach.java

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.reactivex.functions.*;
2222
import io.reactivex.internal.fuseable.ConditionalSubscriber;
2323
import io.reactivex.internal.subscribers.*;
24+
import io.reactivex.internal.util.ExceptionHelper;
2425
import io.reactivex.plugins.RxJavaPlugins;
2526

2627
public final class FlowableDoOnEach<T> extends AbstractFlowableWithUpstream<T, T> {
@@ -149,11 +150,33 @@ public int requestFusion(int mode) {
149150
@Nullable
150151
@Override
151152
public T poll() throws Exception {
152-
T v = qs.poll();
153+
T v;
154+
155+
try {
156+
v = qs.poll();
157+
} catch (Throwable ex) {
158+
Exceptions.throwIfFatal(ex);
159+
try {
160+
onError.accept(ex);
161+
} catch (Throwable exc) {
162+
throw new CompositeException(ex, exc);
163+
}
164+
throw ExceptionHelper.<Exception>throwIfThrowable(ex);
165+
}
153166

154167
if (v != null) {
155168
try {
156-
onNext.accept(v);
169+
try {
170+
onNext.accept(v);
171+
} catch (Throwable ex) {
172+
Exceptions.throwIfFatal(ex);
173+
try {
174+
onError.accept(ex);
175+
} catch (Throwable exc) {
176+
throw new CompositeException(ex, exc);
177+
}
178+
throw ExceptionHelper.<Exception>throwIfThrowable(ex);
179+
}
157180
} finally {
158181
onAfterTerminate.run();
159182
}
@@ -282,11 +305,33 @@ public int requestFusion(int mode) {
282305
@Nullable
283306
@Override
284307
public T poll() throws Exception {
285-
T v = qs.poll();
308+
T v;
309+
310+
try {
311+
v = qs.poll();
312+
} catch (Throwable ex) {
313+
Exceptions.throwIfFatal(ex);
314+
try {
315+
onError.accept(ex);
316+
} catch (Throwable exc) {
317+
throw new CompositeException(ex, exc);
318+
}
319+
throw ExceptionHelper.<Exception>throwIfThrowable(ex);
320+
}
286321

287322
if (v != null) {
288323
try {
289-
onNext.accept(v);
324+
try {
325+
onNext.accept(v);
326+
} catch (Throwable ex) {
327+
Exceptions.throwIfFatal(ex);
328+
try {
329+
onError.accept(ex);
330+
} catch (Throwable exc) {
331+
throw new CompositeException(ex, exc);
332+
}
333+
throw ExceptionHelper.<Exception>throwIfThrowable(ex);
334+
}
290335
} finally {
291336
onAfterTerminate.run();
292337
}

src/main/java/io/reactivex/internal/util/ExceptionHelper.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,21 @@ public static List<Throwable> flatten(Throwable t) {
106106
return list;
107107
}
108108

109+
/**
110+
* Workaround for Java 6 not supporting throwing a final Throwable from a catch block.
111+
* @param <E> the generic exception type
112+
* @param e the Throwable error to return or throw
113+
* @return the Throwable e if it is a subclass of Exception
114+
* @throws E the generic exception thrown
115+
*/
116+
@SuppressWarnings("unchecked")
117+
public static <E extends Throwable> Exception throwIfThrowable(Throwable e) throws E {
118+
if (e instanceof Exception) {
119+
return (Exception)e;
120+
}
121+
throw (E)e;
122+
}
123+
109124
static final class Termination extends Throwable {
110125

111126
private static final long serialVersionUID = -4649703670690200604L;

src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnEachTest.java

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import io.reactivex.*;
2828
import io.reactivex.exceptions.*;
29+
import io.reactivex.flowables.ConnectableFlowable;
2930
import io.reactivex.functions.*;
3031
import io.reactivex.internal.functions.Functions;
3132
import io.reactivex.internal.fuseable.*;
@@ -732,4 +733,197 @@ public Flowable<Object> apply(Flowable<Object> o) throws Exception {
732733
}
733734
});
734735
}
736+
737+
@Test
738+
public void doOnNextDoOnErrorFused() {
739+
ConnectableFlowable<Integer> co = Flowable.just(1)
740+
.doOnNext(new Consumer<Integer>() {
741+
@Override
742+
public void accept(Integer v) throws Exception {
743+
throw new TestException("First");
744+
}
745+
})
746+
.doOnError(new Consumer<Throwable>() {
747+
@Override
748+
public void accept(Throwable e) throws Exception {
749+
throw new TestException("Second");
750+
}
751+
})
752+
.publish();
753+
754+
TestSubscriber<Integer> ts = co.test();
755+
co.connect();
756+
757+
ts.assertFailure(CompositeException.class);
758+
759+
TestHelper.assertError(ts, 0, TestException.class, "First");
760+
TestHelper.assertError(ts, 1, TestException.class, "Second");
761+
}
762+
763+
@Test
764+
public void doOnNextDoOnErrorCombinedFused() {
765+
ConnectableFlowable<Integer> co = Flowable.just(1)
766+
.compose(new FlowableTransformer<Integer, Integer>() {
767+
@Override
768+
public Publisher<Integer> apply(Flowable<Integer> v) {
769+
return new FlowableDoOnEach<Integer>(v,
770+
new Consumer<Integer>() {
771+
@Override
772+
public void accept(Integer v) throws Exception {
773+
throw new TestException("First");
774+
}
775+
},
776+
new Consumer<Throwable>() {
777+
@Override
778+
public void accept(Throwable e) throws Exception {
779+
throw new TestException("Second");
780+
}
781+
},
782+
Functions.EMPTY_ACTION
783+
,
784+
Functions.EMPTY_ACTION
785+
);
786+
}
787+
})
788+
.publish();
789+
790+
TestSubscriber<Integer> ts = co.test();
791+
co.connect();
792+
793+
ts.assertFailure(CompositeException.class);
794+
795+
TestHelper.assertError(ts, 0, TestException.class, "First");
796+
TestHelper.assertError(ts, 1, TestException.class, "Second");
797+
}
798+
799+
@Test
800+
public void doOnNextDoOnErrorFused2() {
801+
ConnectableFlowable<Integer> co = Flowable.just(1)
802+
.doOnNext(new Consumer<Integer>() {
803+
@Override
804+
public void accept(Integer v) throws Exception {
805+
throw new TestException("First");
806+
}
807+
})
808+
.doOnError(new Consumer<Throwable>() {
809+
@Override
810+
public void accept(Throwable e) throws Exception {
811+
throw new TestException("Second");
812+
}
813+
})
814+
.doOnError(new Consumer<Throwable>() {
815+
@Override
816+
public void accept(Throwable e) throws Exception {
817+
throw new TestException("Third");
818+
}
819+
})
820+
.publish();
821+
822+
TestSubscriber<Integer> ts = co.test();
823+
co.connect();
824+
825+
ts.assertFailure(CompositeException.class);
826+
827+
TestHelper.assertError(ts, 0, TestException.class, "First");
828+
TestHelper.assertError(ts, 1, TestException.class, "Second");
829+
TestHelper.assertError(ts, 2, TestException.class, "Third");
830+
}
831+
832+
@Test
833+
public void doOnNextDoOnErrorFusedConditional() {
834+
ConnectableFlowable<Integer> co = Flowable.just(1)
835+
.doOnNext(new Consumer<Integer>() {
836+
@Override
837+
public void accept(Integer v) throws Exception {
838+
throw new TestException("First");
839+
}
840+
})
841+
.doOnError(new Consumer<Throwable>() {
842+
@Override
843+
public void accept(Throwable e) throws Exception {
844+
throw new TestException("Second");
845+
}
846+
})
847+
.filter(Functions.alwaysTrue())
848+
.publish();
849+
850+
TestSubscriber<Integer> ts = co.test();
851+
co.connect();
852+
853+
ts.assertFailure(CompositeException.class);
854+
855+
TestHelper.assertError(ts, 0, TestException.class, "First");
856+
TestHelper.assertError(ts, 1, TestException.class, "Second");
857+
}
858+
859+
@Test
860+
public void doOnNextDoOnErrorFusedConditional2() {
861+
ConnectableFlowable<Integer> co = Flowable.just(1)
862+
.doOnNext(new Consumer<Integer>() {
863+
@Override
864+
public void accept(Integer v) throws Exception {
865+
throw new TestException("First");
866+
}
867+
})
868+
.doOnError(new Consumer<Throwable>() {
869+
@Override
870+
public void accept(Throwable e) throws Exception {
871+
throw new TestException("Second");
872+
}
873+
})
874+
.doOnError(new Consumer<Throwable>() {
875+
@Override
876+
public void accept(Throwable e) throws Exception {
877+
throw new TestException("Third");
878+
}
879+
})
880+
.filter(Functions.alwaysTrue())
881+
.publish();
882+
883+
TestSubscriber<Integer> ts = co.test();
884+
co.connect();
885+
886+
ts.assertFailure(CompositeException.class);
887+
888+
TestHelper.assertError(ts, 0, TestException.class, "First");
889+
TestHelper.assertError(ts, 1, TestException.class, "Second");
890+
TestHelper.assertError(ts, 2, TestException.class, "Third");
891+
}
892+
893+
@Test
894+
public void doOnNextDoOnErrorCombinedFusedConditional() {
895+
ConnectableFlowable<Integer> co = Flowable.just(1)
896+
.compose(new FlowableTransformer<Integer, Integer>() {
897+
@Override
898+
public Publisher<Integer> apply(Flowable<Integer> v) {
899+
return new FlowableDoOnEach<Integer>(v,
900+
new Consumer<Integer>() {
901+
@Override
902+
public void accept(Integer v) throws Exception {
903+
throw new TestException("First");
904+
}
905+
},
906+
new Consumer<Throwable>() {
907+
@Override
908+
public void accept(Throwable e) throws Exception {
909+
throw new TestException("Second");
910+
}
911+
},
912+
Functions.EMPTY_ACTION
913+
,
914+
Functions.EMPTY_ACTION
915+
);
916+
}
917+
})
918+
.filter(Functions.alwaysTrue())
919+
.publish();
920+
921+
TestSubscriber<Integer> ts = co.test();
922+
co.connect();
923+
924+
ts.assertFailure(CompositeException.class);
925+
926+
TestHelper.assertError(ts, 0, TestException.class, "First");
927+
TestHelper.assertError(ts, 1, TestException.class, "Second");
928+
}
735929
}

src/test/java/io/reactivex/internal/util/ExceptionHelperTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,9 @@ public void run() {
4646
TestHelper.race(r, r, Schedulers.single());
4747
}
4848
}
49+
50+
@Test(expected = InternalError.class)
51+
public void throwIfThrowable() throws Exception {
52+
ExceptionHelper.<Exception>throwIfThrowable(new InternalError());
53+
}
4954
}

0 commit comments

Comments
 (0)