Skip to content

Commit 3f973b0

Browse files
committed
Merge pull request #3331 from akarnokd/SchedulersTests2x
2.x: schedulers test of classic schedulers
2 parents 6b82350 + eddf715 commit 3f973b0

15 files changed

+2233
-14
lines changed

src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo
9797
}
9898
return super.schedulePeriodicallyDirect(run, initialDelay, period, unit);
9999
}
100-
101-
static final class ExecutorWorker extends Scheduler.Worker implements Runnable {
100+
/* public: test support. */
101+
public static final class ExecutorWorker extends Scheduler.Worker implements Runnable {
102102
final Executor executor;
103103

104104
final MpscLinkedQueue<Runnable> queue;
@@ -109,6 +109,8 @@ static final class ExecutorWorker extends Scheduler.Worker implements Runnable {
109109
static final AtomicIntegerFieldUpdater<ExecutorWorker> WIP =
110110
AtomicIntegerFieldUpdater.newUpdater(ExecutorWorker.class, "wip");
111111

112+
final SetCompositeResource<Disposable> tasks = new SetCompositeResource<>(Disposable::dispose);
113+
112114
public ExecutorWorker(Executor executor) {
113115
this.executor = executor;
114116
this.queue = new MpscLinkedQueue<>();
@@ -153,26 +155,56 @@ public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
153155

154156
MultipleAssignmentResource<Disposable> mar = new MultipleAssignmentResource<>(Disposable::dispose, first);
155157

156-
Disposable delayed;
157-
158158
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
159159

160-
Runnable r = () -> mar.setResource(schedule(decoratedRun));
160+
ScheduledRunnable sr = new ScheduledRunnable(() -> {
161+
mar.setResource(schedule(decoratedRun));
162+
}, tasks);
163+
tasks.add(sr);
161164

162165
if (executor instanceof ScheduledExecutorService) {
163166
try {
164-
Future<?> f = ((ScheduledExecutorService)executor).schedule(r, delay, unit);
165-
delayed = () -> f.cancel(true);
167+
Future<?> f = ((ScheduledExecutorService)executor).schedule(sr, delay, unit);
168+
sr.setFuture(f);
166169
} catch (RejectedExecutionException ex) {
167170
disposed = true;
168171
RxJavaPlugins.onError(ex);
169172
return EmptyDisposable.INSTANCE;
170173
}
171174
} else {
172-
delayed = HELPER.scheduleDirect(r, delay, unit);
175+
Disposable d = HELPER.scheduleDirect(sr, delay, unit);
176+
sr.setFuture(new Future<Object>() {
177+
@Override
178+
public boolean cancel(boolean mayInterruptIfRunning) {
179+
d.dispose();
180+
return false;
181+
}
182+
183+
@Override
184+
public boolean isCancelled() {
185+
return false;
186+
}
187+
188+
@Override
189+
public boolean isDone() {
190+
return false;
191+
}
192+
193+
@Override
194+
public Object get() throws InterruptedException, ExecutionException {
195+
return null;
196+
}
197+
198+
@Override
199+
public Object get(long timeout, TimeUnit unit)
200+
throws InterruptedException, ExecutionException, TimeoutException {
201+
return null;
202+
}
203+
204+
});
173205
}
174206

175-
first.setResource(delayed);
207+
first.setResource(sr);
176208

177209
return mar;
178210
}
@@ -181,6 +213,7 @@ public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
181213
public void dispose() {
182214
if (!disposed) {
183215
disposed = true;
216+
tasks.dispose();
184217
if (WIP.getAndIncrement(this) == 0) {
185218
queue.clear();
186219
}

src/main/java/io/reactivex/schedulers/Schedulers.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,20 @@ public static Scheduler immediate() {
9393
public static Scheduler from(Executor executor) {
9494
return new ExecutorScheduler(executor);
9595
}
96+
97+
public static void shutdown() {
98+
computation().shutdown();
99+
io().shutdown();
100+
newThread().shutdown();
101+
single().shutdown();
102+
trampoline().shutdown();
103+
}
104+
105+
public static void start() {
106+
computation().start();
107+
io().start();
108+
newThread().start();
109+
single().start();
110+
trampoline().start();
111+
}
96112
}

src/test/java/io/reactivex/GroupByTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ public void testTakeUnsubscribesOnGroupBy() {
2727
.groupBy(event -> event.type)
2828
.take(1)
2929
.toBlocking()
30-
.forEach(System.out::println);
30+
.forEach(v -> {
31+
System.out.println(v);
32+
v.take(1).subscribe(); // FIXME groups need consumption to a certain degree to cancel upstream
33+
});
3134

3235
System.out.println("**** finished");
3336
}

src/test/java/io/reactivex/internal/operators/OperatorConcatTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,14 @@ public void testSimpleAsyncConcat() {
129129
inOrder.verify(observer, times(1)).onNext("six");
130130
}
131131

132+
@Test
133+
public void testNestedAsyncConcatLoop() throws Throwable {
134+
for (int i = 0; i < 100; i++) {
135+
System.out.println("testNestedAsyncConcat >> " + i);
136+
testNestedAsyncConcat();
137+
}
138+
}
139+
132140
/**
133141
* Test an async Observable that emits more async Observables
134142
*/
@@ -242,8 +250,8 @@ public void run() {
242250
inOrder.verify(observer, times(1)).onNext("eight");
243251
inOrder.verify(observer, times(1)).onNext("nine");
244252

245-
inOrder.verify(observer, times(1)).onComplete();
246253
verify(observer, never()).onError(any(Throwable.class));
254+
inOrder.verify(observer, times(1)).onComplete();
247255
}
248256

249257
@Test
@@ -469,7 +477,7 @@ public void cancel() {
469477
private final List<T> values;
470478
private Thread t = null;
471479
private int count = 0;
472-
private boolean subscribed = true;
480+
private volatile boolean subscribed = true;
473481
private final CountDownLatch once;
474482
private final CountDownLatch okToContinue;
475483
private final CountDownLatch threadHasStarted = new CountDownLatch(1);

src/test/java/io/reactivex/internal/operators/OperatorOnErrorResumeNextViaObservableTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public String apply(String s) {
100100

101101
@Test
102102
@Ignore("Publishers should not throw")
103-
public void testResumeNextWithFailedOnSubscribe() {
103+
public void testResumeNextWithFailureOnSubscribe() {
104104
Observable<String> testObservable = Observable.create(new Publisher<String>() {
105105

106106
@Override
@@ -122,7 +122,7 @@ public void subscribe(Subscriber<? super String> t1) {
122122

123123
@Test
124124
@Ignore("Publishers should not throw")
125-
public void testResumeNextWithFailedOnSubscribeAsync() {
125+
public void testResumeNextWithFailureOnSubscribeAsync() {
126126
Observable<String> testObservable = Observable.create(new Publisher<String>() {
127127

128128
@Override

0 commit comments

Comments
 (0)