Skip to content

Add TCK tests for reactive streams implementations #519

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 5, 2018

Conversation

dagnir
Copy link
Contributor

@dagnir dagnir commented Jun 4, 2018

Add TCK tests for reactive streams implementations.

Builds off #407 from @ktoso.

Description

Add TCK tests for reactive streams implementations. This PR also adds a snippet in CONTRIBUTING.md to make it a requirement for all future PR's to include necessary TCK tests if it includes reactive streams implementations.

Motivation and Context

In order to ensure proper interoperability with other reactive streams libraries, we should ensure that our implementations of the interfaces follow the spec.

Testing

Add TCK tests for all current implementations.

Screenshots (if appropriate)

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)

Checklist

  • I have read the CONTRIBUTING document
  • Local run of mvn install succeeds
  • My code follows the code style of this project
  • My change requires a change to the Javadoc documentation
  • I have updated the Javadoc documentation accordingly
  • I have read the README document
  • I have added tests to cover my changes
  • All new and existing tests passed
  • A short description of the change has been added to the CHANGELOG

License

  • I confirm that this pull request can be released under the Apache 2 license

@dagnir dagnir requested a review from zoewangg June 4, 2018 18:53
subscriber.onComplete();
public void subscribe(Subscriber<? super ByteBuffer> s) {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
if (s == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here 's' is Subscriber?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the comment is wrong, this is the wording for 2.13:

Calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way for a Subscriber to signal failure is by cancelling its Subscription. In the case that this rule is violated, any associated Subscription to the Subscriber MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment.

I'll update it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And 1.9:

Publisher.subscribe MUST call onSubscribe on the provided Subscriber prior to any other signals to that Subscriber and MUST return normally, except when the provided Subscriber is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way to signal failure (or reject the Subscriber) is by calling onError (after calling onSubscribe).

core/pom.xml Outdated
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>${reactive-streams.version}</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just have the version in our dependency management like we do for other deps?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

@@ -115,11 +115,35 @@
<artifactId>utils</artifactId>
<version>${awsjavasdk.version}</version>
</dependency>
<dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh how did we not have this before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it was being pulled in transitively via the SPI:

mvn dependency:tree:

...
[INFO] +- software.amazon.awssdk:http-client-spi:jar:2.0.0-preview-11-SNAPSHOT:compile
[INFO] |  +- software.amazon.awssdk:annotations:jar:2.0.0-preview-11-SNAPSHOT:compile
[INFO] |  \- org.reactivestreams:reactive-streams:jar:1.0.2:compile
...


@Override
public void request(long n) {
if (n > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there opportunity to extract some of these checks into a decorator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that might be a little hard to do in a general way. For example like with the async file publisher, we need to synchronize the interactions with the subscriber but for this one we don't need to do that.

"by throwing an exception from onSubscribe.", ex)
// When onSubscribe fails this way, we don't know what state the
// s is thus calling onError may cause more crashes.
.printStackTrace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems weird. So we just log the error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to log it but since onSubscribe must always return normally unless s is null, then we have to swallow it here.

Since 2.13 requires the Subscriber implementer to make onSubscribe return normally to be in spec, I don't think this will be a big issue.

readData();
return;
}
} else {
// Reached the end of the file, notify the subscriber and cleanup
subscriber.onComplete();
synchronized (subscriber) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need to be synchronized?

Copy link
Contributor Author

@dagnir dagnir Jun 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interactions with the subscriber need external synchrnozation. Without it, it's possible for a race condition between request(-1) triggering onError and this call to onComplete from the handler thread for example

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any logic that would prevent both onError and onComplete from being called on the same subscriber? Even with synchronization that can still happen right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think that can with this implementation but I think that's a separate issue.

The purpose of this synchronization is 1.3:

onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled in a thread-safe manner—and if performed by multiple threads—use external synchronization.

I think the rule you're referring to is 1.7:

Once a terminal state has been signaled (onError, onComplete) it is REQUIRED that no further signals occur.

We'd need additional state tracking to comply with 1.7

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. So does the TCK not test the 1.7 requirement?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess not. I'll go ahead and add some state tracking here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found these: https://github.com/reactive-streams/reactive-streams-jvm/blob/43fd6d18d45484575facf42b82f5e6bffe35a486/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java#L428-L447

But it looks like one is not implemented at the moment, and there's no test that checks signaling onError after onComplete or vice versa

}

private static class NoopSubscription implements Subscription {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this be more useful in a util package?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to util package

core/pom.xml Outdated
<dependency>
<groupId>com.google.jimfs</groupId>
<artifactId>jimfs</artifactId>
<version>1.1</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor, can we have all versions defined in parent pom properties? it's easier to manage versions that way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

});
);
} catch (Throwable ex) {
new IllegalStateException(s + " violated the Reactive Streams rule 2.13 " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to log Subscriber here? the implementations might not override toString

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's okay, it's just a diagnostic. Correct implementations should return normally from onSubscribe so it should be it should be rare that we hit this

@@ -45,26 +41,35 @@
/**
* File to read.
*/
private final File file;
private final Path file;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor, can we rename it to path?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


public class SingleByteArrayAsyncRequestProviderTckTest extends org.reactivestreams.tck.PublisherVerification<ByteBuffer> {

public SingleByteArrayAsyncRequestProviderTckTest() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: formatting seems a bit off in this file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@@ -257,6 +260,17 @@
<artifactId>rxjava</artifactId>
<version>2.1.9</version>
</dependency>
<dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need the reactive-streams dependencies in here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean, this is a entry in our <dependencyManagement> block

}
}
}

private void signalOnNext(ByteBuffer bb) {
if (!done.get()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there still a race condition here if onComplete signals after we check done?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah, It could happen for onError though if we signal for the non positive demand case. onComplete and onNext are only signaled within the same thread, so I don't think that could happen for that case.

Update so all signaling to synchronizes on this

Copy link
Contributor

@shorea shorea left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for making new commits! Makes it so much easier to review. Feel free to rebase away.

@ktoso
Copy link
Contributor

ktoso commented Jun 5, 2018

Cool, thanks for picking this up. I'm on vacation now-ish but will try to have a look soon :-)
I see you continued the work a bit on #519 :)

@dagnir
Copy link
Contributor Author

dagnir commented Jun 5, 2018

@ktoso Yup, I continued the work from #407 here in #519. Feel free to update #407 if you see anything that I might have missed!

@dagnir dagnir merged commit 12aaaae into aws:master Jun 5, 2018
dagnir added a commit to dagnir/aws-sdk-java-v2 that referenced this pull request Jun 7, 2018
@dagnir dagnir deleted the pr-407-dongie branch June 7, 2018 04:54
dagnir added a commit that referenced this pull request Jun 7, 2018
shorea pushed a commit that referenced this pull request Aug 3, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants