17
17
18
18
import static org .junit .Assert .assertEquals ;
19
19
20
+ import java .util .*;
20
21
import java .util .concurrent .CountDownLatch ;
21
22
import java .util .concurrent .atomic .AtomicInteger ;
22
23
23
24
import org .junit .Test ;
24
25
25
- import rx .Producer ;
26
- import rx .Scheduler ;
26
+ import rx .*;
27
27
import rx .exceptions .MissingBackpressureException ;
28
28
import rx .functions .Action0 ;
29
29
import rx .observers .TestSubscriber ;
@@ -36,18 +36,25 @@ protected RxRingBuffer createRingBuffer() {
36
36
return new RxRingBuffer ();
37
37
}
38
38
39
+ @ Test (timeout = 20000 )
40
+ public void testConcurrencyLoop () throws InterruptedException {
41
+ for (int i = 0 ; i < 50 ; i ++) {
42
+ testConcurrency ();
43
+ }
44
+ }
45
+
39
46
/**
40
47
* Single producer, 2 consumers. The request() ensures it gets scheduled back on the same Producer thread.
41
48
*/
42
- @ Test
49
+ @ Test ( timeout = 10000 )
43
50
public void testConcurrency () throws InterruptedException {
44
51
final RxRingBuffer b = createRingBuffer ();
45
- final CountDownLatch emitLatch = new CountDownLatch (255 );
46
- final CountDownLatch drainLatch = new CountDownLatch (2 );
52
+ final CountDownLatch emitLatch = new CountDownLatch (127 );
53
+ int drainers = 3 ;
54
+ final CountDownLatch drainLatch = new CountDownLatch (drainers );
47
55
48
56
final Scheduler .Worker w1 = Schedulers .newThread ().createWorker ();
49
- Scheduler .Worker w2 = Schedulers .newThread ().createWorker ();
50
- Scheduler .Worker w3 = Schedulers .newThread ().createWorker ();
57
+ List <Scheduler .Worker > drainerWorkers = new ArrayList <Scheduler .Worker >();
51
58
52
59
final AtomicInteger emit = new AtomicInteger ();
53
60
final AtomicInteger poll = new AtomicInteger ();
@@ -110,7 +117,12 @@ public void call() {
110
117
ts .requestMore (emitted );
111
118
emitted = 0 ;
112
119
} else {
113
- if (emitLatch .getCount () == 0 ) {
120
+ try {
121
+ Thread .sleep (1 );
122
+ } catch (InterruptedException ex ) {
123
+ // ignored
124
+ }
125
+ if (emitLatch .getCount () == 0 && b .isEmpty ()) {
114
126
// this works with SynchronizedQueue, if changing to a non-blocking Queue
115
127
// then this will likely need to change like the SpmcTest version
116
128
drainLatch .countDown ();
@@ -124,14 +136,18 @@ public void call() {
124
136
125
137
};
126
138
127
- w2 .schedule (drainer );
128
- w3 .schedule (drainer );
139
+ for (int i = 0 ; i < drainers ; i ++) {
140
+ Scheduler .Worker w = Schedulers .newThread ().createWorker ();
141
+ w .schedule (drainer );
142
+ drainerWorkers .add (w );
143
+ }
129
144
130
145
emitLatch .await ();
131
146
drainLatch .await ();
132
147
133
- w2 .unsubscribe ();
134
- w3 .unsubscribe ();
148
+ for (Scheduler .Worker w : drainerWorkers ) {
149
+ w .unsubscribe ();
150
+ }
135
151
w1 .unsubscribe (); // put this one last as unsubscribing from it can cause Exceptions to be throw in w2/w3
136
152
137
153
System .out .println ("emit: " + emit .get () + " poll: " + poll .get ());
0 commit comments