15
15
*/
16
16
package rx .internal .operators ;
17
17
18
- import static org .junit .Assert .assertEquals ;
19
- import static org .junit .Assert .fail ;
18
+ import static org .junit .Assert .*;
20
19
import static org .mockito .Matchers .any ;
21
20
import static org .mockito .Mockito .*;
22
21
23
- import java .util .concurrent .CountDownLatch ;
24
- import java .util .concurrent .ExecutorService ;
25
- import java .util .concurrent .Executors ;
26
- import java .util .concurrent .TimeUnit ;
27
- import java .util .concurrent .atomic .AtomicBoolean ;
28
- import java .util .concurrent .atomic .AtomicInteger ;
29
- import java .util .concurrent .atomic .AtomicLong ;
22
+ import java .util .*;
23
+ import java .util .concurrent .*;
24
+ import java .util .concurrent .atomic .*;
30
25
31
26
import org .junit .Test ;
32
- import org .mockito .InOrder ;
33
- import org .mockito .Mockito ;
27
+ import org .mockito .*;
34
28
35
- import rx .Observable ;
29
+ import rx .* ;
36
30
import rx .Observable .OnSubscribe ;
31
+ import rx .Observable ;
37
32
import rx .Observer ;
38
- import rx .Producer ;
39
- import rx .Subscriber ;
40
- import rx .Subscription ;
41
- import rx .functions .Action0 ;
42
- import rx .functions .Action1 ;
43
- import rx .functions .Func1 ;
44
- import rx .functions .Func2 ;
33
+ import rx .functions .*;
45
34
import rx .internal .util .RxRingBuffer ;
46
35
import rx .observables .GroupedObservable ;
47
36
import rx .observers .TestSubscriber ;
@@ -722,7 +711,10 @@ public void testRetryWithBackpressureParallel() throws InterruptedException {
722
711
int ncpu = Runtime .getRuntime ().availableProcessors ();
723
712
ExecutorService exec = Executors .newFixedThreadPool (Math .max (ncpu / 2 , 1 ));
724
713
final AtomicInteger timeouts = new AtomicInteger ();
725
- final AtomicInteger data = new AtomicInteger ();
714
+ final Map <Integer , List <String >> data = new ConcurrentHashMap <Integer , List <String >>();
715
+ final Map <Integer , List <Throwable >> exceptions = new ConcurrentHashMap <Integer , List <Throwable >>();
716
+ final Map <Integer , Integer > completions = new ConcurrentHashMap <Integer , Integer >();
717
+
726
718
int m = 2000 ;
727
719
final CountDownLatch cdl = new CountDownLatch (m );
728
720
for (int i = 0 ; i < m ; i ++) {
@@ -737,13 +729,13 @@ public void run() {
737
729
origin .retry ().observeOn (Schedulers .computation ()).unsafeSubscribe (ts );
738
730
ts .awaitTerminalEvent (2 , TimeUnit .SECONDS );
739
731
if (ts .getOnNextEvents ().size () != NUM_RETRIES + 2 ) {
740
- data .incrementAndGet ( );
732
+ data .put ( j , ts . getOnNextEvents () );
741
733
}
742
734
if (ts .getOnErrorEvents ().size () != 0 ) {
743
- data . incrementAndGet ( );
735
+ exceptions . put ( j , ts . getOnErrorEvents () );
744
736
}
745
737
if (ts .getOnCompletedEvents ().size () != 1 ) {
746
- data . incrementAndGet ( );
738
+ completions . put ( j , ts . getOnCompletedEvents (). size () );
747
739
}
748
740
} catch (Throwable t ) {
749
741
timeouts .incrementAndGet ();
@@ -756,8 +748,15 @@ public void run() {
756
748
exec .shutdown ();
757
749
cdl .await ();
758
750
assertEquals (0 , timeouts .get ());
759
- assertEquals (0 , data .get ());
760
-
751
+ if (data .size () > 0 ) {
752
+ fail ("Data content mismatch: " + data );
753
+ }
754
+ if (exceptions .size () > 0 ) {
755
+ fail ("Exceptions received: " + exceptions );
756
+ }
757
+ if (completions .size () > 0 ) {
758
+ fail ("Multiple completions received: " + completions );
759
+ }
761
760
}
762
761
@ Test (timeout = 3000 )
763
762
public void testIssue1900 () throws InterruptedException {
0 commit comments