Skip to content

Commit 4d7eb2e

Browse files
Add unit tests for recursive scheduler usage
These tests came from @mairbek at ReactiveX#229 (comment)
1 parent 60fc9b5 commit 4d7eb2e

File tree

1 file changed

+115
-1
lines changed

1 file changed

+115
-1
lines changed

rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,24 @@
1616
package rx.concurrency;
1717

1818
import static org.junit.Assert.*;
19+
import static org.mockito.Mockito.*;
1920

2021
import java.util.concurrent.CountDownLatch;
2122
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicBoolean;
2224
import java.util.concurrent.atomic.AtomicInteger;
23-
import java.util.concurrent.atomic.AtomicReference;
2425

2526
import org.junit.Test;
2627

2728
import rx.Observable;
2829
import rx.Observer;
30+
import rx.Scheduler;
2931
import rx.Subscription;
32+
import rx.subscriptions.BooleanSubscription;
3033
import rx.subscriptions.Subscriptions;
3134
import rx.util.functions.Action1;
3235
import rx.util.functions.Func1;
36+
import rx.util.functions.Func2;
3337

3438
public class TestSchedulers {
3539

@@ -245,4 +249,114 @@ public void call(Integer t) {
245249
assertEquals(5, count.get());
246250
}
247251

252+
@Test
253+
public void testRecursiveScheduler1() {
254+
Observable<Integer> obs = Observable.create(new Func1<Observer<Integer>, Subscription>() {
255+
@Override
256+
public Subscription call(final Observer<Integer> observer) {
257+
return Schedulers.currentThread().schedule(0, new Func2<Scheduler, Integer, Subscription>() {
258+
@Override
259+
public Subscription call(Scheduler scheduler, Integer i) {
260+
if (i > 42) {
261+
observer.onCompleted();
262+
return Subscriptions.empty();
263+
}
264+
265+
observer.onNext(i);
266+
267+
return scheduler.schedule(i + 1, this);
268+
}
269+
});
270+
}
271+
});
272+
273+
final AtomicInteger lastValue = new AtomicInteger();
274+
obs.forEach(new Action1<Integer>() {
275+
276+
@Override
277+
public void call(Integer v) {
278+
System.out.println("Value: " + v);
279+
lastValue.set(v);
280+
}
281+
});
282+
283+
assertEquals(42, lastValue.get());
284+
}
285+
286+
@Test
287+
public void testRecursiveScheduler2() throws InterruptedException {
288+
// use latches instead of Thread.sleep
289+
final CountDownLatch latch = new CountDownLatch(10);
290+
final CountDownLatch completionLatch = new CountDownLatch(1);
291+
292+
Observable<Integer> obs = Observable.create(new Func1<Observer<Integer>, Subscription>() {
293+
@Override
294+
public Subscription call(final Observer<Integer> observer) {
295+
296+
return Schedulers.threadPoolForComputation().schedule(new BooleanSubscription(), new Func2<Scheduler, BooleanSubscription, Subscription>() {
297+
@Override
298+
public Subscription call(Scheduler scheduler, BooleanSubscription cancel) {
299+
if (cancel.isUnsubscribed()) {
300+
observer.onCompleted();
301+
completionLatch.countDown();
302+
return Subscriptions.empty();
303+
}
304+
305+
observer.onNext(42);
306+
latch.countDown();
307+
308+
try {
309+
Thread.sleep(1);
310+
} catch (InterruptedException e) {
311+
e.printStackTrace();
312+
}
313+
314+
scheduler.schedule(cancel, this);
315+
316+
return cancel;
317+
}
318+
});
319+
}
320+
});
321+
322+
@SuppressWarnings("unchecked")
323+
Observer<Integer> o = mock(Observer.class);
324+
325+
final AtomicInteger count = new AtomicInteger();
326+
final AtomicBoolean completed = new AtomicBoolean(false);
327+
Subscription subscribe = obs.subscribe(new Observer<Integer>() {
328+
@Override
329+
public void onCompleted() {
330+
System.out.println("Completed");
331+
completed.set(true);
332+
}
333+
334+
@Override
335+
public void onError(Exception e) {
336+
System.out.println("Error");
337+
}
338+
339+
@Override
340+
public void onNext(Integer args) {
341+
count.incrementAndGet();
342+
System.out.println(args);
343+
}
344+
});
345+
346+
if (!latch.await(5000, TimeUnit.MILLISECONDS)) {
347+
fail("Timed out waiting on onNext latch");
348+
}
349+
350+
// now unsubscribe and ensure it stops the recursive loop
351+
subscribe.unsubscribe();
352+
System.out.println("unsubscribe");
353+
354+
if (!completionLatch.await(5000, TimeUnit.MILLISECONDS)) {
355+
fail("Timed out waiting on completion latch");
356+
}
357+
358+
assertEquals(10, count.get()); // wondering if this could be 11 in a race condition (which would be okay due to how unsubscribe works ... just it would make this test non-deterministic)
359+
assertTrue(completed.get());
360+
}
361+
248362
}

0 commit comments

Comments
 (0)