Skip to content

Commit b31112a

Browse files
committed
Merge pull request #2909 from akarnokd/RxRingBufferTestFix
Fix the drainer to check if the queue is empty before quitting.
2 parents a677856 + 53de3f2 commit b31112a

File tree

1 file changed

+28
-12
lines changed

1 file changed

+28
-12
lines changed

src/test/java/rx/internal/util/RxRingBufferWithoutUnsafeTest.java

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717

1818
import static org.junit.Assert.assertEquals;
1919

20+
import java.util.*;
2021
import java.util.concurrent.CountDownLatch;
2122
import java.util.concurrent.atomic.AtomicInteger;
2223

2324
import org.junit.Test;
2425

25-
import rx.Producer;
26-
import rx.Scheduler;
26+
import rx.*;
2727
import rx.exceptions.MissingBackpressureException;
2828
import rx.functions.Action0;
2929
import rx.observers.TestSubscriber;
@@ -36,18 +36,25 @@ protected RxRingBuffer createRingBuffer() {
3636
return new RxRingBuffer();
3737
}
3838

39+
@Test(timeout = 20000)
40+
public void testConcurrencyLoop() throws InterruptedException {
41+
for (int i = 0; i < 50; i++) {
42+
testConcurrency();
43+
}
44+
}
45+
3946
/**
4047
* Single producer, 2 consumers. The request() ensures it gets scheduled back on the same Producer thread.
4148
*/
42-
@Test
49+
@Test(timeout = 10000)
4350
public void testConcurrency() throws InterruptedException {
4451
final RxRingBuffer b = createRingBuffer();
45-
final CountDownLatch emitLatch = new CountDownLatch(255);
46-
final CountDownLatch drainLatch = new CountDownLatch(2);
52+
final CountDownLatch emitLatch = new CountDownLatch(127);
53+
int drainers = 3;
54+
final CountDownLatch drainLatch = new CountDownLatch(drainers);
4755

4856
final Scheduler.Worker w1 = Schedulers.newThread().createWorker();
49-
Scheduler.Worker w2 = Schedulers.newThread().createWorker();
50-
Scheduler.Worker w3 = Schedulers.newThread().createWorker();
57+
List<Scheduler.Worker> drainerWorkers = new ArrayList<Scheduler.Worker>();
5158

5259
final AtomicInteger emit = new AtomicInteger();
5360
final AtomicInteger poll = new AtomicInteger();
@@ -110,7 +117,12 @@ public void call() {
110117
ts.requestMore(emitted);
111118
emitted = 0;
112119
} else {
113-
if (emitLatch.getCount() == 0) {
120+
try {
121+
Thread.sleep(1);
122+
} catch (InterruptedException ex) {
123+
// ignored
124+
}
125+
if (emitLatch.getCount() == 0 && b.isEmpty()) {
114126
// this works with SynchronizedQueue, if changing to a non-blocking Queue
115127
// then this will likely need to change like the SpmcTest version
116128
drainLatch.countDown();
@@ -124,14 +136,18 @@ public void call() {
124136

125137
};
126138

127-
w2.schedule(drainer);
128-
w3.schedule(drainer);
139+
for (int i = 0; i < drainers; i++) {
140+
Scheduler.Worker w = Schedulers.newThread().createWorker();
141+
w.schedule(drainer);
142+
drainerWorkers.add(w);
143+
}
129144

130145
emitLatch.await();
131146
drainLatch.await();
132147

133-
w2.unsubscribe();
134-
w3.unsubscribe();
148+
for (Scheduler.Worker w : drainerWorkers) {
149+
w.unsubscribe();
150+
}
135151
w1.unsubscribe(); // put this one last as unsubscribing from it can cause Exceptions to be throw in w2/w3
136152

137153
System.out.println("emit: " + emit.get() + " poll: " + poll.get());

0 commit comments

Comments
 (0)