17
17
18
18
import static org .junit .Assert .*;
19
19
20
+ import java .util .concurrent .CountDownLatch ;
21
+ import java .util .concurrent .TimeUnit ;
20
22
import java .util .concurrent .atomic .AtomicInteger ;
23
+ import java .util .concurrent .atomic .AtomicReference ;
21
24
22
25
import org .junit .Test ;
23
26
24
27
import rx .Observable ;
28
+ import rx .Observer ;
29
+ import rx .Subscription ;
30
+ import rx .subscriptions .Subscriptions ;
25
31
import rx .util .functions .Action1 ;
26
32
import rx .util .functions .Func1 ;
27
33
@@ -181,7 +187,7 @@ public void call(String t) {
181
187
}
182
188
183
189
@ Test
184
- public void testSubscribeWithScheduler1 () {
190
+ public void testSubscribeWithScheduler1 () throws InterruptedException {
185
191
186
192
final AtomicInteger count = new AtomicInteger ();
187
193
@@ -204,16 +210,39 @@ public void call(Integer t) {
204
210
205
211
// now we'll subscribe with a scheduler and it should be async
206
212
213
+ final String currentThreadName = Thread .currentThread ().getName ();
214
+
215
+ // latches for deterministically controlling the test below across threads
216
+ final CountDownLatch latch = new CountDownLatch (5 );
217
+ final CountDownLatch first = new CountDownLatch (1 );
218
+
207
219
o1 .subscribe (new Action1 <Integer >() {
208
220
209
221
@ Override
210
222
public void call (Integer t ) {
223
+ try {
224
+ // we block the first one so we can assert this executes asynchronously with a count
225
+ first .await (1000 , TimeUnit .SECONDS );
226
+ } catch (InterruptedException e ) {
227
+ throw new RuntimeException ("The latch should have released if we are async." , e );
228
+ }
229
+ assertFalse (Thread .currentThread ().getName ().equals (currentThreadName ));
230
+ assertTrue (Thread .currentThread ().getName ().startsWith ("RxComputationThreadPool" ));
211
231
System .out .println ("Thread: " + Thread .currentThread ().getName ());
212
232
System .out .println ("t: " + t );
213
233
count .incrementAndGet ();
234
+ latch .countDown ();
214
235
}
215
236
}, Schedulers .threadPoolForComputation ());
216
237
238
+ // assert we are async
217
239
assertEquals (0 , count .get ());
240
+ // release the latch so it can go forward
241
+ first .countDown ();
242
+
243
+ // wait for all 5 responses
244
+ latch .await ();
245
+ assertEquals (5 , count .get ());
218
246
}
247
+
219
248
}
0 commit comments