Skip to content

Fixed a non-deterministic test and a few scheduler leaks. #2837

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 18, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/test/java/rx/BackpressureTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ public void testOnBackpressureDropWithAction() {
final AtomicInteger emitCount = new AtomicInteger();
final AtomicInteger dropCount = new AtomicInteger();
final AtomicInteger passCount = new AtomicInteger();
final int NUM = RxRingBuffer.SIZE * 3 / 2; // > 1 so that take doesn't prevent buffer overflow
final int NUM = RxRingBuffer.SIZE * 3; // > 1 so that take doesn't prevent buffer overflow
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
firehose(emitCount).onBackpressureDrop(new Action1<Integer>() {
@Override
Expand Down
72 changes: 37 additions & 35 deletions src/test/java/rx/schedulers/AbstractSchedulerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,43 +154,45 @@ public String call(String s) {
public final void testSequenceOfActions() throws InterruptedException {
final Scheduler scheduler = getScheduler();
final Scheduler.Worker inner = scheduler.createWorker();

final CountDownLatch latch = new CountDownLatch(2);
final Action0 first = mock(Action0.class);
final Action0 second = mock(Action0.class);

// make it wait until both the first and second are called
doAnswer(new Answer() {

@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
try {
return invocation.getMock();
} finally {
latch.countDown();
try {
final CountDownLatch latch = new CountDownLatch(2);
final Action0 first = mock(Action0.class);
final Action0 second = mock(Action0.class);

// make it wait until both the first and second are called
doAnswer(new Answer() {

@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
try {
return invocation.getMock();
} finally {
latch.countDown();
}
}
}
}).when(first).call();
doAnswer(new Answer() {

@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
try {
return invocation.getMock();
} finally {
latch.countDown();
}).when(first).call();
doAnswer(new Answer() {

@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
try {
return invocation.getMock();
} finally {
latch.countDown();
}
}
}
}).when(second).call();

inner.schedule(first);
inner.schedule(second);

latch.await();

verify(first, times(1)).call();
verify(second, times(1)).call();

}).when(second).call();

inner.schedule(first);
inner.schedule(second);

latch.await();

verify(first, times(1)).call();
verify(second, times(1)).call();
} finally {
inner.unsubscribe();
}
}

@Test
Expand Down
117 changes: 63 additions & 54 deletions src/test/java/rx/subjects/BehaviorSubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -426,63 +426,72 @@ public void testOnErrorThrowsDoesntPreventDelivery2() {
public void testEmissionSubscriptionRace() throws Exception {
Scheduler s = Schedulers.io();
Scheduler.Worker worker = Schedulers.io().createWorker();
for (int i = 0; i < 50000; i++) {
if (i % 1000 == 0) {
System.out.println(i);
}
final BehaviorSubject<Object> rs = BehaviorSubject.create();

final CountDownLatch finish = new CountDownLatch(1);
final CountDownLatch start = new CountDownLatch(1);

worker.schedule(new Action0() {
@Override
public void call() {
try {
start.await();
} catch (Exception e1) {
e1.printStackTrace();
}
rs.onNext(1);
}
});

final AtomicReference<Object> o = new AtomicReference<Object>();

rs.subscribeOn(s).observeOn(Schedulers.io())
.subscribe(new Observer<Object>() {

@Override
public void onCompleted() {
o.set(-1);
finish.countDown();
}

@Override
public void onError(Throwable e) {
o.set(e);
finish.countDown();
}

@Override
public void onNext(Object t) {
o.set(t);
finish.countDown();
try {
for (int i = 0; i < 50000; i++) {
if (i % 1000 == 0) {
System.out.println(i);
}
final BehaviorSubject<Object> rs = BehaviorSubject.create();

final CountDownLatch finish = new CountDownLatch(1);
final CountDownLatch start = new CountDownLatch(1);

worker.schedule(new Action0() {
@Override
public void call() {
try {
start.await();
} catch (Exception e1) {
e1.printStackTrace();
}
rs.onNext(1);
}
});

final AtomicReference<Object> o = new AtomicReference<Object>();

});
start.countDown();

if (!finish.await(5, TimeUnit.SECONDS)) {
System.out.println(o.get());
System.out.println(rs.hasObservers());
rs.onCompleted();
Assert.fail("Timeout @ " + i);
break;
} else {
Assert.assertEquals(1, o.get());
rs.onCompleted();
rs.subscribeOn(s).observeOn(Schedulers.io())
.subscribe(new Observer<Object>() {

@Override
public void onCompleted() {
o.set(-1);
finish.countDown();
}

@Override
public void onError(Throwable e) {
o.set(e);
finish.countDown();
}

@Override
public void onNext(Object t) {
o.set(t);
finish.countDown();
}

});
start.countDown();

if (!finish.await(5, TimeUnit.SECONDS)) {
System.out.println(o.get());
System.out.println(rs.hasObservers());
rs.onCompleted();
Assert.fail("Timeout @ " + i);
break;
} else {
Assert.assertEquals(1, o.get());
worker.schedule(new Action0() {
@Override
public void call() {
rs.onCompleted();
}
});
}
}
} finally {
worker.unsubscribe();
}
}

Expand Down
112 changes: 58 additions & 54 deletions src/test/java/rx/subjects/ReplaySubjectBoundedConcurrencyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,68 +342,72 @@ public void run() {
public void testReplaySubjectEmissionSubscriptionRace() throws Exception {
Scheduler s = Schedulers.io();
Scheduler.Worker worker = Schedulers.io().createWorker();
for (int i = 0; i < 50000; i++) {
if (i % 1000 == 0) {
System.out.println(i);
}
final ReplaySubject<Object> rs = ReplaySubject.createWithSize(2);

final CountDownLatch finish = new CountDownLatch(1);
final CountDownLatch start = new CountDownLatch(1);

worker.schedule(new Action0() {
@Override
public void call() {
try {
start.await();
} catch (Exception e1) {
e1.printStackTrace();
}
rs.onNext(1);
}
});

final AtomicReference<Object> o = new AtomicReference<Object>();

rs.subscribeOn(s).observeOn(Schedulers.io())
.subscribe(new Observer<Object>() {

@Override
public void onCompleted() {
o.set(-1);
finish.countDown();
}

@Override
public void onError(Throwable e) {
o.set(e);
finish.countDown();
}

@Override
public void onNext(Object t) {
o.set(t);
finish.countDown();
try {
for (int i = 0; i < 50000; i++) {
if (i % 1000 == 0) {
System.out.println(i);
}
final ReplaySubject<Object> rs = ReplaySubject.createWithSize(2);

final CountDownLatch finish = new CountDownLatch(1);
final CountDownLatch start = new CountDownLatch(1);

});
start.countDown();

if (!finish.await(5, TimeUnit.SECONDS)) {
System.out.println(o.get());
System.out.println(rs.hasObservers());
rs.onCompleted();
Assert.fail("Timeout @ " + i);
break;
} else {
Assert.assertEquals(1, o.get());
worker.schedule(new Action0() {
@Override
public void call() {
rs.onCompleted();
try {
start.await();
} catch (Exception e1) {
e1.printStackTrace();
}
rs.onNext(1);
}
});

final AtomicReference<Object> o = new AtomicReference<Object>();

rs.subscribeOn(s).observeOn(Schedulers.io())
.subscribe(new Observer<Object>() {

@Override
public void onCompleted() {
o.set(-1);
finish.countDown();
}

@Override
public void onError(Throwable e) {
o.set(e);
finish.countDown();
}

@Override
public void onNext(Object t) {
o.set(t);
finish.countDown();
}

});
start.countDown();

if (!finish.await(5, TimeUnit.SECONDS)) {
System.out.println(o.get());
System.out.println(rs.hasObservers());
rs.onCompleted();
Assert.fail("Timeout @ " + i);
break;
} else {
Assert.assertEquals(1, o.get());
worker.schedule(new Action0() {
@Override
public void call() {
rs.onCompleted();
}
});
}
}
} finally {
worker.unsubscribe();
}
}
@Test(timeout = 5000)
Expand Down