-
Notifications
You must be signed in to change notification settings - Fork 7.6k
1.x: Deprecate TestObserver, enhance TestSubscriber a bit #4011
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@Experimental | ||
public final boolean awaitValueCount(int expected, long timeout, TimeUnit unit) throws InterruptedException { | ||
while (timeout != 0 && valueCount < expected) { | ||
unit.sleep(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of sleeping for 1 unit, why not using an exponential backoff capped at 1 unit.
This will ensure, fast await when the value is quickly ready, without wasting too much CPU resources.
i.e. something like that:
long start = System.nanoTime();
long timeoutNano = unit.convert(timeout, TimeUnit.NANOSECONDS);
long sleepPeriod = 1L;
while (valueCount < expected && System.nanoTime() - start < timeoutNano) {
TimeUnit.NANOSECONDS.sleep(sleepPeriod);
sleepPeriod = Math.min(sleepPeriod * 2, TimeUnit.NANOSECONDS.convert(1L, unit));
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another thing is that sleep(1)
does not guarantee that it'll wake you up back in 1ms
, it may wake you later, so you need to explicitly check that expected timeout is not expired: System.nanoTime() - start < timeoutNano
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't agree. If you wake up with the correct number of values but
slightly late that shouldn't warrant being considered a timeout.
On Thu, Jun 16, 2016 at 1:24 PM Artem Zinnatullin [email protected]
wrote:
In src/main/java/rx/observers/TestSubscriber.java
#4011 (comment):} /**
\* Wait until the current committed value count is less than the expected amount
\* by sleeping 1 unit at most timeout times and return true if at least
\* the required amount of onNext values have been received.
\* @param expected the expected number of onNext events
\* @param timeout the time to wait for the events
\* @param unit the time unit of waiting
\* @return true if the expected number of onNext events happened
\* @throws InterruptedException if the sleep is interrupted
\* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
- @experimental
- public final boolean awaitValueCount(int expected, long timeout, TimeUnit unit) throws InterruptedException {
while (timeout != 0 && valueCount < expected) {
unit.sleep(1);
Another thing is that sleep(1) does not guarantee that it'll wake you up
back in 1ms, it may wake you later, so you need to explicitly check that
expected timeout is not expired: System.nanoTime() - start < timeoutNano—
You are receiving this because you are subscribed to this thread.
Reply to this email directly, view it on GitHub
https://github.com/ReactiveX/RxJava/pull/4011/files/e273a739037cb9c3c935b54c654f1c7321321b91#r67387550,
or mute the thread
https://github.com/notifications/unsubscribe/AAEEEURMK6RcFoKC_tRVzW-8G3SLVrSyks5qMYbXgaJpZM4I3eci
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JakeWharton well, it depends on use case, I guess somebody who wants to use await(time)
on TestSubscriber
either wants it to be:
- "max timeout to fail the test instead of looping infinitely", like @timeout in JUnit. so in that case value will be someting like
1, MINUTE
while actually data will arrive much faster. - "precise value to check some concurrency algorithm with expected timeouts" so in that case "success" long (relatively) after actual timeout can be considered as a bug.
I'd be ok with something like this:
while (true) {
if (valueCount >= expected) {
return true;
}
if (System.nanoTime() - start > timeoutNano) {
return false;
}
Thread.sleep(1);
}
So that only flakiness of last sleep()
will be amortized.
Current implementation increments timeout
after each sleep(1)
and may collect some relatively big error, like you had 150 sleep(1)
but actually spent 200ms
which is ~25% error and seems possible in real life.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The second case does not seem in scope for this class.
On Thu, Jun 16, 2016 at 1:55 PM Artem Zinnatullin [email protected]
wrote:
In src/main/java/rx/observers/TestSubscriber.java
#4011 (comment):} /**
\* Wait until the current committed value count is less than the expected amount
\* by sleeping 1 unit at most timeout times and return true if at least
\* the required amount of onNext values have been received.
\* @param expected the expected number of onNext events
\* @param timeout the time to wait for the events
\* @param unit the time unit of waiting
\* @return true if the expected number of onNext events happened
\* @throws InterruptedException if the sleep is interrupted
\* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
- @experimental
- public final boolean awaitValueCount(int expected, long timeout, TimeUnit unit) throws InterruptedException {
while (timeout != 0 && valueCount < expected) {
unit.sleep(1);
@JakeWharton https://github.com/JakeWharton well, it depends on use
case, I guess somebody who wants to use await(time) on TestSubscriber
either wants it to be:
- "max timeout to fail the test instead of looping infinitely", like
@timeout https://github.com/Timeout in JUnit. so in that case value
will be someting like 1, MINUTE while actually data will arrive much
faster.- "precise value to check some concurrency algorithm with expected
timeouts" so in that case "success" long (relatively) after actual timeout
can be considered as a bug.I'd be ok with something like this:
while (true) {
if (valueCount >= expected) {
return true;
}if (System.nanoTime() - start > timeoutNano) {
return false;
}Thread.sleep(1);
}So that only flakiness of last sleep() will be amortized.
Current implementation increments timeout after each sleep(1) and may
collect some relatively big error, like you had 150 sleep(1) but actually
spent 200ms which is ~25% error and seems possible in real life.—
You are receiving this because you were mentioned.Reply to this email directly, view it on GitHub
https://github.com/ReactiveX/RxJava/pull/4011/files/e273a739037cb9c3c935b54c654f1c7321321b91#r67393194,
or mute the thread
https://github.com/notifications/unsubscribe/AAEEEcpBg2ZJw7tAC6x4p7y-o6BjHZBcks5qMY4ugaJpZM4I3eci
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The timeout is there to not wait indefinitely. The sleep(1 unit) is there to not wait too much if the required number of values have been received. For example, testing if an 1M source observed on a scheduler and requested 500k delivers that amount within some reasonable time:
range(1, 1_000_000).observeOn(Schedulers.computation()).subscribe(ts);
ts.requestMOre(500_000);
ts.awaitValueCount(500_000, 5000, TimeUnit.MILLISECONDS);
On a good day, observeOn can stream 20M values per second, thus there would be 25 x 1ms sleep.
Other than that suggestion, LGTM 👍 |
Thanks for the feedback. I'm merging this as is and I'm open for PRs if you think |
This PR deprecates
TestObserver
in favor of the richerTestSubscriber
.In addition,
TestSubscriber
gets 3 new methods and 1 deprecation:getCompletions()
to return the onCompleted count as int instead of NotificationsgetValueCount()
returns the committed number of onNext events for thread-safe checking of values up to this countawaitValueCount
repeatedly sleeps up to a timeout and waits till the committed onNext count reaches/passes the expected amount.