|
18 | 18 | import static org.junit.Assert.*;
|
19 | 19 |
|
20 | 20 | import java.util.*;
|
| 21 | +import java.util.concurrent.TimeUnit; |
21 | 22 |
|
22 | 23 | import org.junit.Test;
|
23 | 24 |
|
24 | 25 | import rx.*;
|
| 26 | +import rx.Observer; |
25 | 27 | import rx.Scheduler.Worker;
|
26 | 28 | import rx.Observable;
|
27 | 29 | import rx.functions.*;
|
| 30 | +import rx.observers.Observers; |
| 31 | +import rx.observers.TestSubscriber; |
28 | 32 | import rx.subscriptions.CompositeSubscription;
|
29 | 33 |
|
30 | 34 | public class TrampolineSchedulerTest extends AbstractSchedulerTests {
|
@@ -95,6 +99,47 @@ public void call() {
|
95 | 99 | }
|
96 | 100 | }
|
97 | 101 |
|
| 102 | + /** |
| 103 | + * This is a regression test for #1702. Concurrent work scheduling that is improperly synchronized can cause an |
| 104 | + * action to be added or removed onto the priority queue during a poll, which can result in NPEs during queue |
| 105 | + * sifting. While it is difficult to isolate the issue directly, we can easily trigger the behavior by spamming the |
| 106 | + * trampoline with enqueue requests from multiple threads concurrently. |
| 107 | + */ |
| 108 | + @Test |
| 109 | + public void testTrampolineWorkerHandlesConcurrentScheduling() { |
| 110 | + final Worker trampolineWorker = Schedulers.trampoline().createWorker(); |
| 111 | + final Observer<Subscription> observer = Observers.empty(); |
| 112 | + final TestSubscriber<Subscription> ts = new TestSubscriber<Subscription>(observer); |
| 113 | + |
| 114 | + // Spam the trampoline with actions. |
| 115 | + Observable.range(0, 50) |
| 116 | + .flatMap(new Func1<Integer, Observable<Subscription>>() { |
| 117 | + |
| 118 | + @Override |
| 119 | + public Observable<Subscription> call(Integer count) { |
| 120 | + return Observable.interval(1, TimeUnit.MICROSECONDS).map( |
| 121 | + new Func1<Long, Subscription>() { |
| 122 | + |
| 123 | + @Override |
| 124 | + public Subscription call(Long count) { |
| 125 | + return trampolineWorker.schedule(new Action0() { |
| 126 | + |
| 127 | + @Override |
| 128 | + public void call() {} |
| 129 | + |
| 130 | + }); |
| 131 | + } |
| 132 | + |
| 133 | + }).limit(100); |
| 134 | + } |
| 135 | + |
| 136 | + }) |
| 137 | + .subscribeOn(Schedulers.computation()) |
| 138 | + .subscribe(ts); |
| 139 | + ts.awaitTerminalEvent(); |
| 140 | + ts.assertNoErrors(); |
| 141 | + } |
| 142 | + |
98 | 143 | private static Worker doWorkOnNewTrampoline(final String key, final ArrayList<String> workDone) {
|
99 | 144 | Worker worker = Schedulers.trampoline().createWorker();
|
100 | 145 | worker.schedule(new Action0() {
|
|
0 commit comments