Skip to content

1.x: Fix SyncOnSubscribeTest.testConcurrentRequests non-determinism #3536

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 25, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/test/java/rx/observables/SyncOnSubscribeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,14 @@ public void testConcurrentRequests() throws InterruptedException {
final CountDownLatch l1 = new CountDownLatch(1);
final CountDownLatch l2 = new CountDownLatch(1);

@SuppressWarnings("unchecked")
Action1<? super Integer> onUnSubscribe = mock(Action1.class);
final CountDownLatch l3 = new CountDownLatch(1);

final Action1<Object> onUnSubscribe = new Action1<Object>() {
@Override
public void call(Object t) {
l3.countDown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previous code checked that onUnsubscribe action was called only once, this one guarantees that action will be called at least once. Is this okay or you also want to check that action was called only once?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Detecting a second call is inherently racy with the checking of the first so this async test can't do both.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay.

}
};

OnSubscribe<Integer> os = SyncOnSubscribe.createStateful(
new Func0<Integer>() {
Expand Down Expand Up @@ -532,7 +538,10 @@ public Integer call(Integer state, Observer<? super Integer> observer) {
inOrder.verify(o, times(finalCount)).onNext(any());
inOrder.verify(o, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
verify(onUnSubscribe, times(1)).call(any(Integer.class));

if (!l3.await(2, TimeUnit.SECONDS)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Static import for SECONDS will make code little nicer!

fail("SyncOnSubscribe failed to countDown onUnSubscribe latch");
}
}

@Test
Expand Down