Skip to content

Add TCK tests for reactive streams implementations #519

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-61af032.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"category": "AWS SDK for Java v2",
"type": "feature",
"description": "Incorporate the [Reactive Streams Technology Compatibility Kit](https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck) and ensure current implementations are compliant. [#519](https://github.com/aws/aws-sdk-java-v2/issues/519)"
}
12 changes: 12 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,18 @@ checklist below:
Commit the new file created by the script in `.changes/next-release` with
your changes.

__Note__: Some changes have additional requirements. Refer to the section below
to see if your change will require additional work to be accepted.

#### Additional Pull Request Requirements
##### Reactive Streams
If the change includes implementations of the [Reactive Streams
interfaces](https://github.com/reactive-streams/reactive-streams-jvm), the
change must also contain verification tests using the [Reactive Streams
Technology Compatibility
Kit](https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck)
to ensure specificiation compliance.

### Getting Your Pull Request Merged
All Pull Requests must be approved by at least one member of the SDK team
before it can be merged in. The members only have limited bandwitdth to review
Expand Down
21 changes: 21 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,32 @@
<artifactId>utils</artifactId>
<version>${awsjavasdk.version}</version>
</dependency>
<dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh how did we not have this before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it was being pulled in transitively via the SPI:

mvn dependency:tree:

...
[INFO] +- software.amazon.awssdk:http-client-spi:jar:2.0.0-preview-11-SNAPSHOT:compile
[INFO] |  +- software.amazon.awssdk:annotations:jar:2.0.0-preview-11-SNAPSHOT:compile
[INFO] |  \- org.reactivestreams:reactive-streams:jar:1.0.2:compile
...

<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>test-utils</artifactId>
<version>${awsjavasdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.jimfs</groupId>
<artifactId>jimfs</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer;

/**
* Callback interface to handle a streaming asynchronous response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
package software.amazon.awssdk.core.async;

import java.nio.ByteBuffer;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import software.amazon.awssdk.annotations.SdkInternalApi;

/**
Expand All @@ -43,20 +45,42 @@ public long contentLength() {
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
subscriber.onSubscribe(
new Subscription() {
@Override
public void request(long n) {
if (n > 0) {
subscriber.onNext(ByteBuffer.wrap(bytes));
subscriber.onComplete();
public void subscribe(Subscriber<? super ByteBuffer> s) {
// As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null
if (s == null) {
throw new NullPointerException("Subscription MUST NOT be null.");
}

// As per 2.13, this method must return normally (i.e. not throw).
try {
s.onSubscribe(
new Subscription() {
boolean done = false;

@Override
public void request(long n) {
if (n > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there opportunity to extract some of these checks into a decorator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that might be a little hard to do in a general way. For example like with the async file publisher, we need to synchronize the interactions with the subscriber but for this one we don't need to do that.

if (!done) {
s.onNext(ByteBuffer.wrap(bytes));
done = true;
s.onComplete();
}
} else {
s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
}
}
}

@Override
public void cancel() {
@Override
public void cancel() {
}
}
});
);
} catch (Throwable ex) {
new IllegalStateException(s + " violated the Reactive Streams rule 2.13 " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to log Subscriber here? the implementations might not override toString

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's okay, it's just a diagnostic. Correct implementations should return normally from onSubscribe so it should be it should be rare that we hit this

"by throwing an exception from onSubscribe.", ex)
// When onSubscribe fails this way, we don't know what state the
// s is thus calling onError may cause more crashes.
.printStackTrace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems weird. So we just log the error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to log it but since onSubscribe must always return normally unless s is null, then we have to swallow it here.

Since 2.13 requires the Subscriber implementer to make onSubscribe return normally to be in spec, I don't think this will be a big issue.

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,17 @@

package software.amazon.awssdk.core.async;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.util.async.NoopSubscription;
import software.amazon.awssdk.utils.builder.SdkBuilder;

/**
Expand All @@ -45,26 +42,42 @@ final class FileAsyncRequestBody implements AsyncRequestBody {
/**
* File to read.
*/
private final File file;
private final Path path;

/**
* Size (in bytes) of ByteBuffer chunks read from the file and delivered to the subscriber.
*/
private final int chunkSizeInBytes;


private FileAsyncRequestBody(DefaultBuilder builder) {
this.file = builder.path.toFile();
this.path = builder.path;
this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes;
}

@Override
public long contentLength() {
return file.length();
return path.toFile().length();
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
s.onSubscribe(new FileSubscription(file, s, chunkSizeInBytes));
try {
AsynchronousFileChannel channel = openInputChannel(this.path);

// We need to synchronize here because the subscriber could call
// request() from within onSubscribe which would potentially
// trigger onNext before onSubscribe is finished.
Subscription subscription = new FileSubscription(channel, s, chunkSizeInBytes);
synchronized (subscription) {
s.onSubscribe(subscription);
}
} catch (IOException e) {
// subscribe() must return normally, so we need to signal the
// failure to open via onError() once onSubscribe() is signaled.
s.onSubscribe(new NoopSubscription(s));
s.onError(e);
}
}

/**
Expand Down Expand Up @@ -136,74 +149,99 @@ public FileAsyncRequestBody build() {
* Reads the file for one subscriber.
*/
private static class FileSubscription implements Subscription {

private final AsynchronousFileChannel inputChannel;
private final Subscriber<? super ByteBuffer> subscriber;
private final int chunkSize;

private long position = 0;
private AtomicLong outstandingRequests = new AtomicLong(0);
private AtomicLong outstandingDemand = new AtomicLong(0);
private boolean writeInProgress = false;
private volatile boolean done = false;

private FileSubscription(File file, Subscriber<? super ByteBuffer> subscriber, int chunkSize) {
this.inputChannel = openInputChannel(file);
private FileSubscription(AsynchronousFileChannel inputChannel, Subscriber<? super ByteBuffer> subscriber, int chunkSize) {
this.inputChannel = inputChannel;
this.subscriber = subscriber;
this.chunkSize = chunkSize;
}

@Override
public void request(long n) {
try {
outstandingRequests.addAndGet(n);
if (done) {
return;
}

if (n < 1) {
IllegalArgumentException ex =
new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a "
+ "non-positive number of elements.");
signalOnError(ex);
} else {
try {
// As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as
// "effectively unbounded"
outstandingDemand.getAndUpdate(initialDemand -> {
if (Long.MAX_VALUE - initialDemand < n) {
return Long.MAX_VALUE;
} else {
return initialDemand + n;
}
});

synchronized (this) {
if (!writeInProgress) {
writeInProgress = true;
readData();
synchronized (this) {
if (!writeInProgress) {
writeInProgress = true;
readData();
}
}
} catch (Exception e) {
signalOnError(e);
}
} catch (Exception e) {
subscriber.onError(e);
}
}

@Override
public void cancel() {
closeFile();
synchronized (this) {
if (!done) {
done = true;
closeFile();
}
}
}

private void readData() {
// It's possible to have another request for data come in after we've closed the file.
if (!inputChannel.isOpen()) {
return;
}

final ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
inputChannel.read(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
if (result > 0) {
attachment.flip();
position += attachment.remaining();
subscriber.onNext(attachment);
signalOnNext(attachment);
// If we have more permits, queue up another read.
if (outstandingRequests.decrementAndGet() > 0) {
if (outstandingDemand.decrementAndGet() > 0) {
readData();
return;
}
} else {
// Reached the end of the file, notify the subscriber and cleanup
subscriber.onComplete();
signalOnComplete();
closeFile();
}

synchronized (FileSubscription.this) {
synchronized (this) {
writeInProgress = false;
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
subscriber.onError(exc);
signalOnError(exc);
closeFile();
}
});
Expand All @@ -213,23 +251,38 @@ private void closeFile() {
try {
inputChannel.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
signalOnError(e);
}
}

}
private void signalOnNext(ByteBuffer bb) {
synchronized (this) {
if (!done) {
subscriber.onNext(bb);
}
}
}

private static AsynchronousFileChannel openInputChannel(File file) {
try {
final Path path = Paths.get(file.getAbsolutePath());
if (!Files.exists(path)) {
Files.createFile(path);
private void signalOnComplete() {
synchronized (this) {
if (!done) {
subscriber.onComplete();
done = true;
}
}
return AsynchronousFileChannel.open(path, StandardOpenOption.READ);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private void signalOnError(Throwable t) {
synchronized (this) {
if (!done) {
subscriber.onError(t);
done = true;
}
}
}
}

private static AsynchronousFileChannel openInputChannel(Path path) throws IOException {
return AsynchronousFileChannel.open(path, StandardOpenOption.READ);
}
}
Loading