Skip to content

Commit 1330373

Browse files
nsk-mironovakarnokd
authored andcommitted
Observable/Flowable/Completable/Single.delay should always call onError on the provided Scheduler (#4522)
* Provide failing test case for Observable/Flowable/Completable/Single.delay * Call Observable/Flowable/Completable/Single onError on proper scheduler * Fix CompletableTest.delayErrorImmediately
1 parent 63c4451 commit 1330373

File tree

9 files changed

+139
-42
lines changed

9 files changed

+139
-42
lines changed

src/main/java/io/reactivex/internal/operators/completable/CompletableDelay.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,12 @@ public void run() {
5757

5858
@Override
5959
public void onError(final Throwable e) {
60-
if (delayError) {
61-
set.add(scheduler.scheduleDirect(new Runnable() {
62-
@Override
63-
public void run() {
64-
s.onError(e);
65-
}
66-
}, delay, unit));
67-
} else {
68-
s.onError(e);
69-
}
60+
set.add(scheduler.scheduleDirect(new Runnable() {
61+
@Override
62+
public void run() {
63+
s.onError(e);
64+
}
65+
}, delayError ? delay : 0, unit));
7066
}
7167

7268
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowableDelay.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -88,20 +88,16 @@ public void run() {
8888

8989
@Override
9090
public void onError(final Throwable t) {
91-
if (delayError) {
92-
w.schedule(new Runnable() {
93-
@Override
94-
public void run() {
95-
try {
96-
actual.onError(t);
97-
} finally {
98-
w.dispose();
99-
}
91+
w.schedule(new Runnable() {
92+
@Override
93+
public void run() {
94+
try {
95+
actual.onError(t);
96+
} finally {
97+
w.dispose();
10098
}
101-
}, delay, unit);
102-
} else {
103-
actual.onError(t);
104-
}
99+
}
100+
}, delayError ? delay : 0, unit);
105101
}
106102

107103
@Override

src/main/java/io/reactivex/internal/operators/observable/ObservableDelay.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -88,20 +88,16 @@ public void run() {
8888

8989
@Override
9090
public void onError(final Throwable t) {
91-
if (delayError) {
92-
w.schedule(new Runnable() {
93-
@Override
94-
public void run() {
95-
try {
96-
actual.onError(t);
97-
} finally {
98-
w.dispose();
99-
}
91+
w.schedule(new Runnable() {
92+
@Override
93+
public void run() {
94+
try {
95+
actual.onError(t);
96+
} finally {
97+
w.dispose();
10098
}
101-
}, delay, unit);
102-
} else {
103-
actual.onError(t);
104-
}
99+
}
100+
}, delayError ? delay : 0, unit);
105101
}
106102

107103
@Override

src/main/java/io/reactivex/internal/operators/single/SingleDelay.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,13 @@ public void run() {
5656
}
5757

5858
@Override
59-
public void onError(Throwable e) {
60-
s.onError(e);
59+
public void onError(final Throwable e) {
60+
sd.replace(scheduler.scheduleDirect(new Runnable() {
61+
@Override
62+
public void run() {
63+
s.onError(e);
64+
}
65+
}, 0, unit));
6166
}
6267

6368
});

src/test/java/io/reactivex/completable/CompletableTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1602,7 +1602,8 @@ public void onComplete() {
16021602

16031603
@Test(timeout = 1000)
16041604
public void delayErrorImmediately() throws InterruptedException {
1605-
Completable c = error.completable.delay(250, TimeUnit.MILLISECONDS);
1605+
final TestScheduler scheduler = new TestScheduler();
1606+
final Completable c = error.completable.delay(250, TimeUnit.MILLISECONDS, scheduler);
16061607

16071608
final AtomicBoolean done = new AtomicBoolean();
16081609
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
@@ -1624,14 +1625,14 @@ public void onComplete() {
16241625
}
16251626
});
16261627

1628+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
1629+
16271630
Assert.assertTrue(error.get().toString(), error.get() instanceof TestException);
16281631
Assert.assertFalse("Already done", done.get());
16291632

1630-
Thread.sleep(100);
1633+
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
16311634

16321635
Assert.assertFalse("Already done", done.get());
1633-
1634-
Thread.sleep(200);
16351636
}
16361637

16371638
@Test(timeout = 1000)

src/test/java/io/reactivex/internal/operators/completable/CompletableDelayTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,18 @@
1313

1414
package io.reactivex.internal.operators.completable;
1515

16+
import java.util.concurrent.CountDownLatch;
1617
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.atomic.AtomicReference;
1719

20+
import io.reactivex.functions.Consumer;
1821
import org.junit.Test;
1922

2023
import io.reactivex.Completable;
2124
import io.reactivex.schedulers.Schedulers;
2225

26+
import static org.junit.Assert.assertNotEquals;
27+
2328
public class CompletableDelayTest {
2429

2530
@Test
@@ -30,4 +35,27 @@ public void delayCustomScheduler() {
3035
.test()
3136
.assertResult();
3237
}
38+
39+
@Test
40+
public void testOnErrorCalledOnScheduler() throws Exception {
41+
final CountDownLatch latch = new CountDownLatch(1);
42+
final AtomicReference<Thread> thread = new AtomicReference<Thread>();
43+
44+
Completable.<String>error(new Exception())
45+
.delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
46+
.doOnError(new Consumer<Throwable>() {
47+
@Override
48+
public void accept(Throwable throwable) throws Exception {
49+
thread.set(Thread.currentThread());
50+
latch.countDown();
51+
}
52+
})
53+
.onErrorComplete()
54+
.subscribe();
55+
56+
latch.await();
57+
58+
assertNotEquals(Thread.currentThread(), thread.get());
59+
}
60+
3361
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableDelayTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414
package io.reactivex.internal.operators.flowable;
1515

1616
import static org.junit.Assert.assertEquals;
17+
import static org.junit.Assert.assertNotEquals;
1718
import static org.mockito.Matchers.*;
1819
import static org.mockito.Mockito.*;
1920

2021
import java.util.*;
2122
import java.util.concurrent.*;
2223
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.concurrent.atomic.AtomicReference;
2325

2426
import org.junit.*;
2527
import org.mockito.InOrder;
@@ -911,4 +913,26 @@ public void testDelaySubscriptionDisposeBeforeTime() {
911913
verify(o, never()).onError(any(Throwable.class));
912914
}
913915

916+
@Test
917+
public void testOnErrorCalledOnScheduler() throws Exception {
918+
final CountDownLatch latch = new CountDownLatch(1);
919+
final AtomicReference<Thread> thread = new AtomicReference<Thread>();
920+
921+
Flowable.<String>error(new Exception())
922+
.delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
923+
.doOnError(new Consumer<Throwable>() {
924+
@Override
925+
public void accept(Throwable throwable) throws Exception {
926+
thread.set(Thread.currentThread());
927+
latch.countDown();
928+
}
929+
})
930+
.onErrorResumeNext(Flowable.<String>empty())
931+
.subscribe();
932+
933+
latch.await();
934+
935+
assertNotEquals(Thread.currentThread(), thread.get());
936+
}
937+
914938
}

src/test/java/io/reactivex/internal/operators/observable/ObservableDelayTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414
package io.reactivex.internal.operators.observable;
1515

1616
import static org.junit.Assert.assertEquals;
17+
import static org.junit.Assert.assertNotEquals;
1718
import static org.mockito.Matchers.*;
1819
import static org.mockito.Mockito.*;
1920

2021
import java.util.*;
2122
import java.util.concurrent.*;
23+
import java.util.concurrent.atomic.AtomicReference;
2224

2325
import org.junit.*;
2426
import org.mockito.InOrder;
@@ -858,4 +860,27 @@ public void delayWithTimeDelayError() throws Exception {
858860
.awaitDone(5, TimeUnit.SECONDS)
859861
.assertFailure(TestException.class, 1);
860862
}
863+
864+
@Test
865+
public void testOnErrorCalledOnScheduler() throws Exception {
866+
final CountDownLatch latch = new CountDownLatch(1);
867+
final AtomicReference<Thread> thread = new AtomicReference<Thread>();
868+
869+
Observable.<String>error(new Exception())
870+
.delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
871+
.doOnError(new Consumer<Throwable>() {
872+
@Override
873+
public void accept(Throwable throwable) throws Exception {
874+
thread.set(Thread.currentThread());
875+
latch.countDown();
876+
}
877+
})
878+
.onErrorResumeNext(Observable.<String>empty())
879+
.subscribe();
880+
881+
latch.await();
882+
883+
assertNotEquals(Thread.currentThread(), thread.get());
884+
}
885+
861886
}

src/test/java/io/reactivex/internal/operators/single/SingleDelayTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,14 @@
1414
package io.reactivex.internal.operators.single;
1515

1616
import static org.junit.Assert.assertEquals;
17+
import static org.junit.Assert.assertNotEquals;
1718

19+
import java.util.concurrent.CountDownLatch;
1820
import java.util.concurrent.TimeUnit;
1921
import java.util.concurrent.atomic.AtomicInteger;
22+
import java.util.concurrent.atomic.AtomicReference;
2023

24+
import io.reactivex.functions.Consumer;
2125
import org.junit.Test;
2226

2327
import io.reactivex.*;
@@ -103,4 +107,26 @@ public void delaySubscriptionTimeCustomScheduler() throws Exception {
103107
.assertResult(1);
104108
}
105109

110+
@Test
111+
public void testOnErrorCalledOnScheduler() throws Exception {
112+
final CountDownLatch latch = new CountDownLatch(1);
113+
final AtomicReference<Thread> thread = new AtomicReference<Thread>();
114+
115+
Single.<String>error(new Exception())
116+
.delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
117+
.doOnError(new Consumer<Throwable>() {
118+
@Override
119+
public void accept(Throwable throwable) throws Exception {
120+
thread.set(Thread.currentThread());
121+
latch.countDown();
122+
}
123+
})
124+
.onErrorResumeNext(Single.just(""))
125+
.subscribe();
126+
127+
latch.await();
128+
129+
assertNotEquals(Thread.currentThread(), thread.get());
130+
}
131+
106132
}

0 commit comments

Comments
 (0)