Skip to content

Commit 4fbf11a

Browse files
committed
Merge pull request #2471 from jnlopar/fix1702
Fixes NPEs reported in #1702 by synchronizing queue.
2 parents cf5ae70 + 63da8b1 commit 4fbf11a

File tree

2 files changed

+57
-17
lines changed

2 files changed

+57
-17
lines changed

src/main/java/rx/schedulers/TrampolineScheduler.java

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2014 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,7 +15,7 @@
1515
*/
1616
package rx.schedulers;
1717

18-
import java.util.PriorityQueue;
18+
import java.util.concurrent.PriorityBlockingQueue;
1919
import java.util.concurrent.TimeUnit;
2020
import java.util.concurrent.atomic.AtomicInteger;
2121
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -45,12 +45,11 @@ public Worker createWorker() {
4545
/* package accessible for unit tests */TrampolineScheduler() {
4646
}
4747

48-
volatile int counter;
49-
static final AtomicIntegerFieldUpdater<TrampolineScheduler> COUNTER_UPDATER = AtomicIntegerFieldUpdater.newUpdater(TrampolineScheduler.class, "counter");
48+
private static class InnerCurrentThreadScheduler extends Scheduler.Worker implements Subscription {
5049

51-
private class InnerCurrentThreadScheduler extends Scheduler.Worker implements Subscription {
52-
53-
final PriorityQueue<TimedAction> queue = new PriorityQueue<TimedAction>();
50+
private static final AtomicIntegerFieldUpdater COUNTER_UPDATER = AtomicIntegerFieldUpdater.newUpdater(InnerCurrentThreadScheduler.class, "counter");
51+
volatile int counter;
52+
private final PriorityBlockingQueue<TimedAction> queue = new PriorityBlockingQueue<TimedAction>();
5453
private final BooleanSubscription innerSubscription = new BooleanSubscription();
5554
private final AtomicInteger wip = new AtomicInteger();
5655

@@ -70,13 +69,12 @@ private Subscription enqueue(Action0 action, long execTime) {
7069
if (innerSubscription.isUnsubscribed()) {
7170
return Subscriptions.unsubscribed();
7271
}
73-
final TimedAction timedAction = new TimedAction(action, execTime, COUNTER_UPDATER.incrementAndGet(TrampolineScheduler.this));
72+
final TimedAction timedAction = new TimedAction(action, execTime, COUNTER_UPDATER.incrementAndGet(this));
7473
queue.add(timedAction);
7574

7675
if (wip.getAndIncrement() == 0) {
7776
do {
78-
TimedAction polled = queue.poll();
79-
// check for null as it could have been unsubscribed and removed
77+
final TimedAction polled = queue.poll();
8078
if (polled != null) {
8179
polled.action.call();
8280
}
@@ -88,10 +86,7 @@ private Subscription enqueue(Action0 action, long execTime) {
8886

8987
@Override
9088
public void call() {
91-
PriorityQueue<TimedAction> _q = queue;
92-
if (_q != null) {
93-
_q.remove(timedAction);
94-
}
89+
queue.remove(timedAction);
9590
}
9691

9792
});
@@ -130,7 +125,7 @@ public int compareTo(TimedAction that) {
130125
return result;
131126
}
132127
}
133-
128+
134129
// because I can't use Integer.compare from Java 7
135130
private static int compare(int x, int y) {
136131
return (x < y) ? -1 : ((x == y) ? 0 : 1);

src/test/java/rx/schedulers/TrampolineSchedulerTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,17 @@
1818
import static org.junit.Assert.*;
1919

2020
import java.util.*;
21+
import java.util.concurrent.TimeUnit;
2122

2223
import org.junit.Test;
2324

2425
import rx.*;
26+
import rx.Observer;
2527
import rx.Scheduler.Worker;
2628
import rx.Observable;
2729
import rx.functions.*;
30+
import rx.observers.Observers;
31+
import rx.observers.TestSubscriber;
2832
import rx.subscriptions.CompositeSubscription;
2933

3034
public class TrampolineSchedulerTest extends AbstractSchedulerTests {
@@ -95,6 +99,47 @@ public void call() {
9599
}
96100
}
97101

102+
/**
103+
* This is a regression test for #1702. Concurrent work scheduling that is improperly synchronized can cause an
104+
* action to be added or removed onto the priority queue during a poll, which can result in NPEs during queue
105+
* sifting. While it is difficult to isolate the issue directly, we can easily trigger the behavior by spamming the
106+
* trampoline with enqueue requests from multiple threads concurrently.
107+
*/
108+
@Test
109+
public void testTrampolineWorkerHandlesConcurrentScheduling() {
110+
final Worker trampolineWorker = Schedulers.trampoline().createWorker();
111+
final Observer<Subscription> observer = Observers.empty();
112+
final TestSubscriber<Subscription> ts = new TestSubscriber<Subscription>(observer);
113+
114+
// Spam the trampoline with actions.
115+
Observable.range(0, 50)
116+
.flatMap(new Func1<Integer, Observable<Subscription>>() {
117+
118+
@Override
119+
public Observable<Subscription> call(Integer count) {
120+
return Observable.interval(1, TimeUnit.MICROSECONDS).map(
121+
new Func1<Long, Subscription>() {
122+
123+
@Override
124+
public Subscription call(Long count) {
125+
return trampolineWorker.schedule(new Action0() {
126+
127+
@Override
128+
public void call() {}
129+
130+
});
131+
}
132+
133+
}).limit(100);
134+
}
135+
136+
})
137+
.subscribeOn(Schedulers.computation())
138+
.subscribe(ts);
139+
ts.awaitTerminalEvent();
140+
ts.assertNoErrors();
141+
}
142+
98143
private static Worker doWorkOnNewTrampoline(final String key, final ArrayList<String> workDone) {
99144
Worker worker = Schedulers.trampoline().createWorker();
100145
worker.schedule(new Action0() {

0 commit comments

Comments
 (0)