Skip to content

Commit 0572b89

Browse files
bcorneakarnokd
authored andcommitted
2.x: Add support for concurrently inserting actions while advancing time (#3648)
* 2.x: Adding support for concurrently inserting actions while advancing time * Removing the unit test proving the thread safety of a BlockingPriorityQueue.
1 parent 12490fd commit 0572b89

File tree

2 files changed

+8
-2
lines changed

2 files changed

+8
-2
lines changed

src/main/java/io/reactivex/schedulers/TestScheduler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.reactivex.schedulers;
1515

1616
import java.util.*;
17+
import java.util.concurrent.PriorityBlockingQueue;
1718
import java.util.concurrent.TimeUnit;
1819

1920
import io.reactivex.Scheduler;
@@ -28,7 +29,7 @@
2829
*/
2930
public final class TestScheduler extends Scheduler {
3031
/** The ordered queue for the runnable tasks. */
31-
private final Queue<TimedRunnable> queue = new PriorityQueue<TimedRunnable>(11);
32+
private final Queue<TimedRunnable> queue = new PriorityBlockingQueue<TimedRunnable>(11);
3233
/** The per-scheduler global order counter. */
3334
long counter;
3435

@@ -61,7 +62,7 @@ public int compareTo(TimedRunnable o) {
6162
}
6263

6364
// Storing time in nanoseconds internally.
64-
private long time;
65+
private volatile long time;
6566

6667
@Override
6768
public long now(TimeUnit unit) {

src/test/java/io/reactivex/schedulers/TestSchedulerTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,13 @@
1414
package io.reactivex.schedulers;
1515

1616
import static org.junit.Assert.assertEquals;
17+
import static org.junit.Assert.fail;
1718
import static org.mockito.Matchers.anyLong;
1819
import static org.mockito.Mockito.*;
1920

21+
import java.util.concurrent.BrokenBarrierException;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.CyclicBarrier;
2024
import java.util.concurrent.TimeUnit;
2125
import java.util.concurrent.atomic.AtomicInteger;
2226

@@ -216,4 +220,5 @@ public void run() {
216220
inner.dispose();
217221
}
218222
}
223+
219224
}

0 commit comments

Comments
 (0)