23
23
24
24
import io .reactivex .exceptions .TestException ;
25
25
import io .reactivex .functions .Function ;
26
- import io .reactivex .observers .TestObserver ;
26
+ import io .reactivex .observers .* ;
27
27
import io .reactivex .plugins .RxJavaPlugins ;
28
28
import io .reactivex .schedulers .Schedulers ;
29
29
import io .reactivex .subscribers .TestSubscriber ;
30
30
31
31
public class XFlatMapTest {
32
32
33
33
@ Rule
34
- public Retry retry = new Retry (3 , 1000 , true );
34
+ public Retry retry = new Retry (5 , 1000 , true );
35
35
36
36
static final int SLEEP_AFTER_CANCEL = 500 ;
37
37
@@ -40,12 +40,23 @@ public class XFlatMapTest {
40
40
void sleep () throws Exception {
41
41
cb .await ();
42
42
try {
43
+ long before = System .currentTimeMillis ();
43
44
Thread .sleep (5000 );
45
+ throw new IllegalStateException ("Was not interrupted in time?! " + (System .currentTimeMillis () - before ));
44
46
} catch (InterruptedException ex ) {
45
47
// ignored here
46
48
}
47
49
}
48
50
51
+ void beforeCancelSleep (BaseTestConsumer <?, ?> ts ) throws Exception {
52
+ long before = System .currentTimeMillis ();
53
+ Thread .sleep (50 );
54
+ if (System .currentTimeMillis () - before > 100 ) {
55
+ ts .dispose ();
56
+ throw new IllegalStateException ("Overslept?" + (System .currentTimeMillis () - before ));
57
+ }
58
+ }
59
+
49
60
@ Test
50
61
public void flowableFlowable () throws Exception {
51
62
List <Throwable > errors = TestHelper .trackPluginErrors ();
@@ -63,7 +74,7 @@ public Publisher<Integer> apply(Integer v) throws Exception {
63
74
64
75
cb .await ();
65
76
66
- Thread . sleep ( 50 );
77
+ beforeCancelSleep ( ts );
67
78
68
79
ts .cancel ();
69
80
@@ -94,7 +105,7 @@ public Single<Integer> apply(Integer v) throws Exception {
94
105
95
106
cb .await ();
96
107
97
- Thread . sleep ( 50 );
108
+ beforeCancelSleep ( ts );
98
109
99
110
ts .cancel ();
100
111
@@ -125,7 +136,7 @@ public Maybe<Integer> apply(Integer v) throws Exception {
125
136
126
137
cb .await ();
127
138
128
- Thread . sleep ( 50 );
139
+ beforeCancelSleep ( ts );
129
140
130
141
ts .cancel ();
131
142
@@ -156,7 +167,7 @@ public Completable apply(Integer v) throws Exception {
156
167
157
168
cb .await ();
158
169
159
- Thread . sleep ( 50 );
170
+ beforeCancelSleep ( ts );
160
171
161
172
ts .cancel ();
162
173
@@ -188,7 +199,7 @@ public Completable apply(Integer v) throws Exception {
188
199
189
200
cb .await ();
190
201
191
- Thread . sleep ( 50 );
202
+ beforeCancelSleep ( ts );
192
203
193
204
ts .cancel ();
194
205
@@ -219,7 +230,7 @@ public Observable<Integer> apply(Integer v) throws Exception {
219
230
220
231
cb .await ();
221
232
222
- Thread . sleep ( 50 );
233
+ beforeCancelSleep ( ts );
223
234
224
235
ts .cancel ();
225
236
@@ -250,7 +261,7 @@ public Single<Integer> apply(Integer v) throws Exception {
250
261
251
262
cb .await ();
252
263
253
- Thread . sleep ( 50 );
264
+ beforeCancelSleep ( ts );
254
265
255
266
ts .cancel ();
256
267
@@ -281,7 +292,7 @@ public Maybe<Integer> apply(Integer v) throws Exception {
281
292
282
293
cb .await ();
283
294
284
- Thread . sleep ( 50 );
295
+ beforeCancelSleep ( ts );
285
296
286
297
ts .cancel ();
287
298
@@ -312,7 +323,7 @@ public Completable apply(Integer v) throws Exception {
312
323
313
324
cb .await ();
314
325
315
- Thread . sleep ( 50 );
326
+ beforeCancelSleep ( ts );
316
327
317
328
ts .cancel ();
318
329
@@ -344,7 +355,7 @@ public Completable apply(Integer v) throws Exception {
344
355
345
356
cb .await ();
346
357
347
- Thread . sleep ( 50 );
358
+ beforeCancelSleep ( ts );
348
359
349
360
ts .cancel ();
350
361
@@ -375,7 +386,7 @@ public Single<Integer> apply(Integer v) throws Exception {
375
386
376
387
cb .await ();
377
388
378
- Thread . sleep ( 50 );
389
+ beforeCancelSleep ( ts );
379
390
380
391
ts .cancel ();
381
392
@@ -406,7 +417,7 @@ public Maybe<Integer> apply(Integer v) throws Exception {
406
417
407
418
cb .await ();
408
419
409
- Thread . sleep ( 50 );
420
+ beforeCancelSleep ( ts );
410
421
411
422
ts .cancel ();
412
423
@@ -437,7 +448,7 @@ public Completable apply(Integer v) throws Exception {
437
448
438
449
cb .await ();
439
450
440
- Thread . sleep ( 50 );
451
+ beforeCancelSleep ( ts );
441
452
442
453
ts .cancel ();
443
454
@@ -469,7 +480,7 @@ public Completable apply(Integer v) throws Exception {
469
480
470
481
cb .await ();
471
482
472
- Thread . sleep ( 50 );
483
+ beforeCancelSleep ( ts );
473
484
474
485
ts .cancel ();
475
486
@@ -500,7 +511,7 @@ public Single<Integer> apply(Integer v) throws Exception {
500
511
501
512
cb .await ();
502
513
503
- Thread . sleep ( 50 );
514
+ beforeCancelSleep ( ts );
504
515
505
516
ts .cancel ();
506
517
@@ -531,7 +542,7 @@ public Maybe<Integer> apply(Integer v) throws Exception {
531
542
532
543
cb .await ();
533
544
534
- Thread . sleep ( 50 );
545
+ beforeCancelSleep ( ts );
535
546
536
547
ts .cancel ();
537
548
@@ -562,7 +573,7 @@ public Completable apply(Integer v) throws Exception {
562
573
563
574
cb .await ();
564
575
565
- Thread . sleep ( 50 );
576
+ beforeCancelSleep ( ts );
566
577
567
578
ts .cancel ();
568
579
@@ -594,7 +605,7 @@ public Completable apply(Integer v) throws Exception {
594
605
595
606
cb .await ();
596
607
597
- Thread . sleep ( 50 );
608
+ beforeCancelSleep ( ts );
598
609
599
610
ts .cancel ();
600
611
0 commit comments