Skip to content

1.x: Deprecate TestObserver, enhance TestSubscriber a bit #4011

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 22, 2016

Conversation

akarnokd
Copy link
Member

This PR deprecates TestObserver in favor of the richer TestSubscriber.

In addition, TestSubscriber gets 3 new methods and 1 deprecation:

  • getCompletions() to return the onCompleted count as int instead of Notifications
  • getValueCount() returns the committed number of onNext events for thread-safe checking of values up to this count
  • awaitValueCount repeatedly sleeps up to a timeout and waits till the committed onNext count reaches/passes the expected amount.

@Experimental
public final boolean awaitValueCount(int expected, long timeout, TimeUnit unit) throws InterruptedException {
while (timeout != 0 && valueCount < expected) {
unit.sleep(1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of sleeping for 1 unit, why not using an exponential backoff capped at 1 unit.
This will ensure, fast await when the value is quickly ready, without wasting too much CPU resources.
i.e. something like that:

long start = System.nanoTime();
long timeoutNano = unit.convert(timeout, TimeUnit.NANOSECONDS);
long sleepPeriod = 1L;
while (valueCount < expected && System.nanoTime() - start < timeoutNano) {
    TimeUnit.NANOSECONDS.sleep(sleepPeriod);
    sleepPeriod = Math.min(sleepPeriod * 2, TimeUnit.NANOSECONDS.convert(1L, unit));
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another thing is that sleep(1) does not guarantee that it'll wake you up back in 1ms, it may wake you later, so you need to explicitly check that expected timeout is not expired: System.nanoTime() - start < timeoutNano

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't agree. If you wake up with the correct number of values but
slightly late that shouldn't warrant being considered a timeout.

On Thu, Jun 16, 2016 at 1:24 PM Artem Zinnatullin [email protected]
wrote:

In src/main/java/rx/observers/TestSubscriber.java
#4011 (comment):

 }

 /**
  • \* Wait until the current committed value count is less than the expected amount
    
  • \* by sleeping 1 unit at most timeout times and return true if at least
    
  • \* the required amount of onNext values have been received.
    
  • \* @param expected the expected number of onNext events
    
  • \* @param timeout the time to wait for the events
    
  • \* @param unit the time unit of waiting
    
  • \* @return true if the expected number of onNext events happened
    
  • \* @throws InterruptedException if the sleep is interrupted
    
  • \* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
    
  • */
    
  • @experimental
  • public final boolean awaitValueCount(int expected, long timeout, TimeUnit unit) throws InterruptedException {
  •    while (timeout != 0 && valueCount < expected) {
    
  •        unit.sleep(1);
    

Another thing is that sleep(1) does not guarantee that it'll wake you up
back in 1ms, it may wake you later, so you need to explicitly check that
expected timeout is not expired: System.nanoTime() - start < timeoutNano


You are receiving this because you are subscribed to this thread.
Reply to this email directly, view it on GitHub
https://github.com/ReactiveX/RxJava/pull/4011/files/e273a739037cb9c3c935b54c654f1c7321321b91#r67387550,
or mute the thread
https://github.com/notifications/unsubscribe/AAEEEURMK6RcFoKC_tRVzW-8G3SLVrSyks5qMYbXgaJpZM4I3eci
.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JakeWharton well, it depends on use case, I guess somebody who wants to use await(time) on TestSubscriber either wants it to be:

  • "max timeout to fail the test instead of looping infinitely", like @timeout in JUnit. so in that case value will be someting like 1, MINUTE while actually data will arrive much faster.
  • "precise value to check some concurrency algorithm with expected timeouts" so in that case "success" long (relatively) after actual timeout can be considered as a bug.

I'd be ok with something like this:

while (true) {
  if (valueCount >= expected) {
    return true;
  }

  if (System.nanoTime() - start > timeoutNano) {
    return false;
  }

  Thread.sleep(1);
}

So that only flakiness of last sleep() will be amortized.

Current implementation increments timeout after each sleep(1) and may collect some relatively big error, like you had 150 sleep(1) but actually spent 200ms which is ~25% error and seems possible in real life.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second case does not seem in scope for this class.

On Thu, Jun 16, 2016 at 1:55 PM Artem Zinnatullin [email protected]
wrote:

In src/main/java/rx/observers/TestSubscriber.java
#4011 (comment):

 }

 /**
  • \* Wait until the current committed value count is less than the expected amount
    
  • \* by sleeping 1 unit at most timeout times and return true if at least
    
  • \* the required amount of onNext values have been received.
    
  • \* @param expected the expected number of onNext events
    
  • \* @param timeout the time to wait for the events
    
  • \* @param unit the time unit of waiting
    
  • \* @return true if the expected number of onNext events happened
    
  • \* @throws InterruptedException if the sleep is interrupted
    
  • \* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
    
  • */
    
  • @experimental
  • public final boolean awaitValueCount(int expected, long timeout, TimeUnit unit) throws InterruptedException {
  •    while (timeout != 0 && valueCount < expected) {
    
  •        unit.sleep(1);
    

@JakeWharton https://github.com/JakeWharton well, it depends on use
case, I guess somebody who wants to use await(time) on TestSubscriber
either wants it to be:

  • "max timeout to fail the test instead of looping infinitely", like
    @timeout https://github.com/Timeout in JUnit. so in that case value
    will be someting like 1, MINUTE while actually data will arrive much
    faster.
  • "precise value to check some concurrency algorithm with expected
    timeouts" so in that case "success" long (relatively) after actual timeout
    can be considered as a bug.

I'd be ok with something like this:

while (true) {
if (valueCount >= expected) {
return true;
}

if (System.nanoTime() - start > timeoutNano) {
return false;
}

Thread.sleep(1);
}

So that only flakiness of last sleep() will be amortized.

Current implementation increments timeout after each sleep(1) and may
collect some relatively big error, like you had 150 sleep(1) but actually
spent 200ms which is ~25% error and seems possible in real life.


You are receiving this because you were mentioned.

Reply to this email directly, view it on GitHub
https://github.com/ReactiveX/RxJava/pull/4011/files/e273a739037cb9c3c935b54c654f1c7321321b91#r67393194,
or mute the thread
https://github.com/notifications/unsubscribe/AAEEEcpBg2ZJw7tAC6x4p7y-o6BjHZBcks5qMY4ugaJpZM4I3eci
.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout is there to not wait indefinitely. The sleep(1 unit) is there to not wait too much if the required number of values have been received. For example, testing if an 1M source observed on a scheduler and requested 500k delivers that amount within some reasonable time:

range(1, 1_000_000).observeOn(Schedulers.computation()).subscribe(ts);

ts.requestMOre(500_000);

ts.awaitValueCount(500_000, 5000, TimeUnit.MILLISECONDS);

On a good day, observeOn can stream 20M values per second, thus there would be 25 x 1ms sleep.

@stevegury
Copy link
Member

Other than that suggestion, LGTM 👍

@akarnokd
Copy link
Member Author

Thanks for the feedback. I'm merging this as is and I'm open for PRs if you think awaitValueCount should use a different wait strategy.

@akarnokd akarnokd merged commit 54eeaca into ReactiveX:1.x Jun 22, 2016
@akarnokd akarnokd deleted the TestObserverDeprecate branch June 22, 2016 09:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants