Skip to content

Commit 0f2e883

Browse files
Merge pull request #1973 from akarnokd/ReplaySubjectFix1215
Fixed test issuing non-serialized messages to the subject.
2 parents da987ed + c148ea5 commit 0f2e883

File tree

1 file changed

+64
-54
lines changed

1 file changed

+64
-54
lines changed

src/test/java/rx/subjects/ReplaySubjectConcurrencyTest.java

Lines changed: 64 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -338,63 +338,73 @@ public void run() {
338338
public void testReplaySubjectEmissionSubscriptionRace() throws Exception {
339339
Scheduler s = Schedulers.io();
340340
Scheduler.Worker worker = Schedulers.io().createWorker();
341-
for (int i = 0; i < 50000; i++) {
342-
if (i % 1000 == 0) {
343-
System.out.println(i);
344-
}
345-
final ReplaySubject<Object> rs = ReplaySubject.create();
346-
347-
final CountDownLatch finish = new CountDownLatch(1);
348-
final CountDownLatch start = new CountDownLatch(1);
349-
350-
worker.schedule(new Action0() {
351-
@Override
352-
public void call() {
353-
try {
354-
start.await();
355-
} catch (Exception e1) {
356-
e1.printStackTrace();
357-
}
358-
rs.onNext(1);
359-
}
360-
});
361-
362-
final AtomicReference<Object> o = new AtomicReference<Object>();
363-
364-
rs.subscribeOn(s).observeOn(Schedulers.io())
365-
.subscribe(new Observer<Object>() {
366-
367-
@Override
368-
public void onCompleted() {
369-
o.set(-1);
370-
finish.countDown();
371-
}
372-
373-
@Override
374-
public void onError(Throwable e) {
375-
o.set(e);
376-
finish.countDown();
377-
}
378-
379-
@Override
380-
public void onNext(Object t) {
381-
o.set(t);
382-
finish.countDown();
341+
try {
342+
for (int i = 0; i < 50000; i++) {
343+
if (i % 1000 == 0) {
344+
System.out.println(i);
383345
}
346+
final ReplaySubject<Object> rs = ReplaySubject.create();
384347

385-
});
386-
start.countDown();
387-
388-
if (!finish.await(5, TimeUnit.SECONDS)) {
389-
System.out.println(o.get());
390-
System.out.println(rs.hasObservers());
391-
rs.onCompleted();
392-
Assert.fail("Timeout @ " + i);
393-
break;
394-
} else {
395-
Assert.assertEquals(1, o.get());
396-
rs.onCompleted();
348+
final CountDownLatch finish = new CountDownLatch(1);
349+
final CountDownLatch start = new CountDownLatch(1);
350+
351+
worker.schedule(new Action0() {
352+
@Override
353+
public void call() {
354+
try {
355+
start.await();
356+
} catch (Exception e1) {
357+
e1.printStackTrace();
358+
}
359+
rs.onNext(1);
360+
}
361+
});
362+
363+
final AtomicReference<Object> o = new AtomicReference<Object>();
364+
365+
rs.subscribeOn(s).observeOn(Schedulers.io())
366+
.subscribe(new Observer<Object>() {
367+
368+
@Override
369+
public void onCompleted() {
370+
o.set(-1);
371+
finish.countDown();
372+
}
373+
374+
@Override
375+
public void onError(Throwable e) {
376+
o.set(e);
377+
finish.countDown();
378+
}
379+
380+
@Override
381+
public void onNext(Object t) {
382+
o.set(t);
383+
finish.countDown();
384+
}
385+
386+
});
387+
start.countDown();
388+
389+
if (!finish.await(5, TimeUnit.SECONDS)) {
390+
System.out.println(o.get());
391+
System.out.println(rs.hasObservers());
392+
rs.onCompleted();
393+
Assert.fail("Timeout @ " + i);
394+
break;
395+
} else {
396+
Assert.assertEquals(1, o.get());
397+
worker.schedule(new Action0() {
398+
@Override
399+
public void call() {
400+
rs.onCompleted();
401+
}
402+
});
403+
404+
}
397405
}
406+
} finally {
407+
worker.unsubscribe();
398408
}
399409
}
400410
}

0 commit comments

Comments
 (0)