Skip to content

Commit 498c152

Browse files
committed
1.x: fix SerializedObserverTest.testNotificationDelay
1 parent 3217017 commit 498c152

File tree

1 file changed

+44
-54
lines changed

1 file changed

+44
-54
lines changed

src/test/java/rx/observers/SerializedObserverTest.java

Lines changed: 44 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,17 @@
1919
import static org.mockito.Matchers.any;
2020
import static org.mockito.Mockito.*;
2121

22-
import java.util.Arrays;
22+
import java.util.*;
2323
import java.util.concurrent.*;
2424
import java.util.concurrent.atomic.*;
2525

2626
import org.junit.*;
2727
import org.mockito.*;
2828

2929
import rx.*;
30+
import rx.Observable;
3031
import rx.Observable.OnSubscribe;
32+
import rx.Observer;
3133
import rx.exceptions.TestException;
3234
import rx.schedulers.Schedulers;
3335

@@ -255,74 +257,62 @@ public void runConcurrencyTest() {
255257
*
256258
* @throws InterruptedException
257259
*/
258-
@Ignore
259-
// this is non-deterministic ... haven't figured out what's wrong with the test yet (benjchristensen: July 2014)
260260
@Test
261261
public void testNotificationDelay() throws InterruptedException {
262-
ExecutorService tp1 = Executors.newFixedThreadPool(1);
263-
ExecutorService tp2 = Executors.newFixedThreadPool(1);
262+
final ExecutorService tp1 = Executors.newFixedThreadPool(1);
264263
try {
265-
int n = 10;
264+
int n = 10000;
266265
for (int i = 0; i < n; i++) {
267-
final CountDownLatch firstOnNext = new CountDownLatch(1);
268-
final CountDownLatch onNextCount = new CountDownLatch(2);
269-
final CountDownLatch latch = new CountDownLatch(1);
270-
final CountDownLatch running = new CountDownLatch(2);
271-
272-
TestSubscriber<String> to = new TestSubscriber<String>(new Observer<String>() {
273-
266+
267+
@SuppressWarnings("unchecked")
268+
final Observer<Integer>[] os = new Observer[1];
269+
270+
final List<Thread> threads = new ArrayList<Thread>();
271+
272+
final Observer<Integer> o = new SerializedObserver<Integer>(new Observer<Integer>() {
273+
boolean first;
274274
@Override
275-
public void onCompleted() {
276-
275+
public void onNext(Integer t) {
276+
threads.add(Thread.currentThread());
277+
if (!first) {
278+
first = true;
279+
try {
280+
tp1.submit(new Runnable() {
281+
@Override
282+
public void run() {
283+
os[0].onNext(2);
284+
}
285+
}).get();
286+
} catch (InterruptedException e) {
287+
e.printStackTrace();
288+
} catch (ExecutionException e) {
289+
e.printStackTrace();
290+
}
291+
}
277292
}
278-
293+
279294
@Override
280-
public void onError(Throwable e) {
281-
295+
public void onError(Throwable e) {
296+
e.printStackTrace();
282297
}
283-
298+
284299
@Override
285-
public void onNext(String t) {
286-
firstOnNext.countDown();
287-
// force it to take time when delivering so the second one is enqueued
288-
try {
289-
latch.await();
290-
} catch (InterruptedException e) {
291-
}
300+
public void onCompleted() {
301+
292302
}
293-
294303
});
295-
Observer<String> o = serializedObserver(to);
296-
297-
Future<?> f1 = tp1.submit(new OnNextThread(o, 1, onNextCount, running));
298-
Future<?> f2 = tp2.submit(new OnNextThread(o, 1, onNextCount, running));
299-
300-
running.await(); // let one of the OnNextThread actually run before proceeding
301304

302-
firstOnNext.await();
303-
304-
Thread t1 = to.getLastSeenThread();
305-
System.out.println("first onNext on thread: " + t1);
306-
307-
latch.countDown();
308-
309-
waitOnThreads(f1, f2);
310-
// not completed yet
311-
312-
assertEquals(2, to.getOnNextEvents().size());
313-
314-
Thread t2 = to.getLastSeenThread();
315-
System.out.println("second onNext on thread: " + t2);
316-
317-
assertSame(t1, t2);
318-
319-
System.out.println(to.getOnNextEvents());
320-
o.onCompleted();
321-
System.out.println(to.getOnNextEvents());
305+
os[0] = o;
306+
307+
o.onNext(1);
308+
309+
System.out.println(threads);
310+
assertEquals(2, threads.size());
311+
312+
assertSame(threads.get(0), threads.get(1));
322313
}
323314
} finally {
324315
tp1.shutdown();
325-
tp2.shutdown();
326316
}
327317
}
328318

0 commit comments

Comments
 (0)