-
Notifications
You must be signed in to change notification settings - Fork 916
Add TCK tests for reactive streams implementations #519
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
subscriber.onComplete(); | ||
public void subscribe(Subscriber<? super ByteBuffer> s) { | ||
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null` | ||
if (s == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here 's' is Subscriber?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the comment is wrong, this is the wording for 2.13:
Calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way for a Subscriber to signal failure is by cancelling its Subscription. In the case that this rule is violated, any associated Subscription to the Subscriber MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment.
I'll update it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And 1.9:
Publisher.subscribe MUST call onSubscribe on the provided Subscriber prior to any other signals to that Subscriber and MUST return normally, except when the provided Subscriber is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way to signal failure (or reject the Subscriber) is by calling onError (after calling onSubscribe).
core/pom.xml
Outdated
<dependency> | ||
<groupId>org.reactivestreams</groupId> | ||
<artifactId>reactive-streams-tck</artifactId> | ||
<version>${reactive-streams.version}</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just have the version in our dependency management like we do for other deps?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
@@ -115,11 +115,35 @@ | |||
<artifactId>utils</artifactId> | |||
<version>${awsjavasdk.version}</version> | |||
</dependency> | |||
<dependency> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Huh how did we not have this before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like it was being pulled in transitively via the SPI:
mvn dependency:tree
:
...
[INFO] +- software.amazon.awssdk:http-client-spi:jar:2.0.0-preview-11-SNAPSHOT:compile
[INFO] | +- software.amazon.awssdk:annotations:jar:2.0.0-preview-11-SNAPSHOT:compile
[INFO] | \- org.reactivestreams:reactive-streams:jar:1.0.2:compile
...
|
||
@Override | ||
public void request(long n) { | ||
if (n > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there opportunity to extract some of these checks into a decorator?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that might be a little hard to do in a general way. For example like with the async file publisher, we need to synchronize the interactions with the subscriber
but for this one we don't need to do that.
"by throwing an exception from onSubscribe.", ex) | ||
// When onSubscribe fails this way, we don't know what state the | ||
// s is thus calling onError may cause more crashes. | ||
.printStackTrace(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems weird. So we just log the error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need to log it but since onSubscribe
must always return normally unless s
is null, then we have to swallow it here.
Since 2.13 requires the Subscriber
implementer to make onSubscribe
return normally to be in spec, I don't think this will be a big issue.
readData(); | ||
return; | ||
} | ||
} else { | ||
// Reached the end of the file, notify the subscriber and cleanup | ||
subscriber.onComplete(); | ||
synchronized (subscriber) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this need to be synchronized?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interactions with the subscriber need external synchrnozation. Without it, it's possible for a race condition between request(-1)
triggering onError
and this call to onComplete
from the handler thread for example
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have any logic that would prevent both onError and onComplete from being called on the same subscriber? Even with synchronization that can still happen right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think that can with this implementation but I think that's a separate issue.
The purpose of this synchronization is 1.3:
onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled in a thread-safe manner—and if performed by multiple threads—use external synchronization.
I think the rule you're referring to is 1.7:
Once a terminal state has been signaled (onError, onComplete) it is REQUIRED that no further signals occur.
We'd need additional state tracking to comply with 1.7
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. So does the TCK not test the 1.7 requirement?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess not. I'll go ahead and add some state tracking here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it looks like one is not implemented at the moment, and there's no test that checks signaling onError
after onComplete
or vice versa
} | ||
|
||
private static class NoopSubscription implements Subscription { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would this be more useful in a util package?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to util package
core/pom.xml
Outdated
<dependency> | ||
<groupId>com.google.jimfs</groupId> | ||
<artifactId>jimfs</artifactId> | ||
<version>1.1</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor, can we have all versions defined in parent pom properties
? it's easier to manage versions that way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
}); | ||
); | ||
} catch (Throwable ex) { | ||
new IllegalStateException(s + " violated the Reactive Streams rule 2.13 " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we want to log Subscriber
here? the implementations might not override toString
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's okay, it's just a diagnostic. Correct implementations should return normally from onSubscribe
so it should be it should be rare that we hit this
@@ -45,26 +41,35 @@ | |||
/** | |||
* File to read. | |||
*/ | |||
private final File file; | |||
private final Path file; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor, can we rename it to path
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
||
public class SingleByteArrayAsyncRequestProviderTckTest extends org.reactivestreams.tck.PublisherVerification<ByteBuffer> { | ||
|
||
public SingleByteArrayAsyncRequestProviderTckTest() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: formatting seems a bit off in this file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
@@ -257,6 +260,17 @@ | |||
<artifactId>rxjava</artifactId> | |||
<version>2.1.9</version> | |||
</dependency> | |||
<dependency> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need the reactive-streams dependencies in here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what you mean, this is a entry in our <dependencyManagement>
block
} | ||
} | ||
} | ||
|
||
private void signalOnNext(ByteBuffer bb) { | ||
if (!done.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there still a race condition here if onComplete signals after we check done?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yeah, It could happen for onError
though if we signal for the non positive demand case. onComplete
and onNext
are only signaled within the same thread, so I don't think that could happen for that case.
Update so all signaling to synchronizes on this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for making new commits! Makes it so much easier to review. Feel free to rebase away.
Cool, thanks for picking this up. I'm on vacation now-ish but will try to have a look soon :-) |
Regression from aws#519
Add TCK tests for reactive streams implementations.
Builds off #407 from @ktoso.
Description
Add TCK tests for reactive streams implementations. This PR also adds a snippet in
CONTRIBUTING.md
to make it a requirement for all future PR's to include necessary TCK tests if it includes reactive streams implementations.Motivation and Context
In order to ensure proper interoperability with other reactive streams libraries, we should ensure that our implementations of the interfaces follow the spec.
Testing
Add TCK tests for all current implementations.
Screenshots (if appropriate)
Types of changes
Checklist
mvn install
succeedsLicense