Skip to content

Commit 4597784

Browse files
Fix non-deterministic unit test
1 parent 839942d commit 4597784

File tree

1 file changed

+30
-1
lines changed

1 file changed

+30
-1
lines changed

rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,17 @@
1717

1818
import static org.junit.Assert.*;
1919

20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.TimeUnit;
2022
import java.util.concurrent.atomic.AtomicInteger;
23+
import java.util.concurrent.atomic.AtomicReference;
2124

2225
import org.junit.Test;
2326

2427
import rx.Observable;
28+
import rx.Observer;
29+
import rx.Subscription;
30+
import rx.subscriptions.Subscriptions;
2531
import rx.util.functions.Action1;
2632
import rx.util.functions.Func1;
2733

@@ -181,7 +187,7 @@ public void call(String t) {
181187
}
182188

183189
@Test
184-
public void testSubscribeWithScheduler1() {
190+
public void testSubscribeWithScheduler1() throws InterruptedException {
185191

186192
final AtomicInteger count = new AtomicInteger();
187193

@@ -204,16 +210,39 @@ public void call(Integer t) {
204210

205211
// now we'll subscribe with a scheduler and it should be async
206212

213+
final String currentThreadName = Thread.currentThread().getName();
214+
215+
// latches for deterministically controlling the test below across threads
216+
final CountDownLatch latch = new CountDownLatch(5);
217+
final CountDownLatch first = new CountDownLatch(1);
218+
207219
o1.subscribe(new Action1<Integer>() {
208220

209221
@Override
210222
public void call(Integer t) {
223+
try {
224+
// we block the first one so we can assert this executes asynchronously with a count
225+
first.await(1000, TimeUnit.SECONDS);
226+
} catch (InterruptedException e) {
227+
throw new RuntimeException("The latch should have released if we are async.", e);
228+
}
229+
assertFalse(Thread.currentThread().getName().equals(currentThreadName));
230+
assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool"));
211231
System.out.println("Thread: " + Thread.currentThread().getName());
212232
System.out.println("t: " + t);
213233
count.incrementAndGet();
234+
latch.countDown();
214235
}
215236
}, Schedulers.threadPoolForComputation());
216237

238+
// assert we are async
217239
assertEquals(0, count.get());
240+
// release the latch so it can go forward
241+
first.countDown();
242+
243+
// wait for all 5 responses
244+
latch.await();
245+
assertEquals(5, count.get());
218246
}
247+
219248
}

0 commit comments

Comments
 (0)