Skip to content

[PR for discussion] Showing Reactive Streams TCK, proposing some #407

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@
<artifactId>utils</artifactId>
<version>${awsjavasdk.version}</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>${reactive-streams.version}</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's interesting that you had to explicitly pull this in - was this required or was it just a best practise thing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly just since I wanted to be on 1.0.2 and I think the Netty adapter library is on previous version. I will double check. It is backwards compatible, so we can use a newer version here (both semantically and binary)

</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand All @@ -129,6 +135,18 @@
<version>${awsjavasdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>${reactive-streams.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.jimfs</groupId>
<artifactId>jimfs</artifactId>
<version>1.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public byte[] complete() {
/**
* Requests chunks sequentially and dumps them into a {@link ByteArrayOutputStream}.
*/
// TODO cover with Reactive Streams TCK, it's mostly ok, just a few edge cases / sanity checks should be covered AFAICS
private class BaosSubscriber implements Subscriber<ByteBuffer> {

private Subscription subscription;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@

package software.amazon.awssdk.core.async;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
Expand All @@ -43,21 +41,21 @@ public final class FileAsyncRequestProvider implements AsyncRequestProvider {
/**
* File to read.
*/
private final File file;
private final Path file;

/**
* Size (in bytes) of ByteBuffer chunks read from the file and delivered to the subscriber.
*/
private final int chunkSizeInBytes;

private FileAsyncRequestProvider(DefaultBuilder builder) {
this.file = builder.path.toFile();
this.file = builder.path;
this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes;
}

@Override
public long contentLength() {
return file.length();
return file.toFile().length();
}

@Override
Expand Down Expand Up @@ -140,29 +138,41 @@ private static class FileSubscription implements Subscription {
private final int chunkSize;

private long position = 0;
private AtomicLong outstandingRequests = new AtomicLong(0);
private AtomicLong outstandingDemand = new AtomicLong(0);
private boolean writeInProgress = false;

private FileSubscription(File file, Subscriber<? super ByteBuffer> subscriber, int chunkSize) {
private FileSubscription(Path file, Subscriber<? super ByteBuffer> subscriber, int chunkSize) {
this.inputChannel = openInputChannel(file);
this.subscriber = subscriber;
this.chunkSize = chunkSize;
}

@Override
public void request(long n) {
try {
outstandingRequests.addAndGet(n);
if (n < 1) {
IllegalArgumentException ex =
new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements.");
subscriber.onError(ex);
} else
try {
long initialDemand = outstandingDemand.get();
long newDemand = initialDemand + n;
if (newDemand < 1) {
// As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as "effectively unbounded"
outstandingDemand.set(Long.MAX_VALUE);
} else {
outstandingDemand.set(newDemand);
}

synchronized (this) {
if (!writeInProgress) {
writeInProgress = true;
readData();
synchronized (this) {
if (!writeInProgress) {
writeInProgress = true;
readData();
}
}
} catch (Exception e) {
subscriber.onError(e);
}
} catch (Exception e) {
subscriber.onError(e);
}
}

@Override
Expand All @@ -184,7 +194,7 @@ public void completed(Integer result, ByteBuffer attachment) {
position += attachment.remaining();
subscriber.onNext(attachment);
// If we have more permits, queue up another read.
if (outstandingRequests.decrementAndGet() > 0) {
if (outstandingDemand.decrementAndGet() > 0) {
readData();
return;
}
Expand Down Expand Up @@ -217,12 +227,9 @@ private void closeFile() {

}

private static AsynchronousFileChannel openInputChannel(File file) {
private static AsynchronousFileChannel openInputChannel(Path path) {
try {
final Path path = Paths.get(file.getAbsolutePath());
if (!Files.exists(path)) {
Files.createFile(path);
}
if (!Files.exists(path)) Files.createFile(path);
return AsynchronousFileChannel.open(path, StandardOpenOption.READ);
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public ResponseT complete() {
/**
* {@link Subscriber} implementation that writes chunks to a file.
*/
// FIXME cover with Reactive Streams TCK, looks ok from a first brief look, but could be missing some edge cases
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is not (at least yet), intended to be merged, please assume those are just markers for myself to know where/how many implementations are there and what we should cover with tests.

private class FileSubscriber implements Subscriber<ByteBuffer> {

private volatile boolean writeInProgress = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,61 @@
package software.amazon.awssdk.core.async;

import java.nio.ByteBuffer;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
* Signals a single byte buffer element;
* Can be subscribed to multiple times;
*/
class SingleByteArrayAsyncRequestProvider implements AsyncRequestProvider {

private final byte[] bytes;
private final byte[] bytes;

SingleByteArrayAsyncRequestProvider(byte[] bytes) {
this.bytes = bytes.clone();
}
SingleByteArrayAsyncRequestProvider(byte[] bytes) {
this.bytes = bytes.clone();
}

@Override
public long contentLength() {
return bytes.length;
}
@Override
public long contentLength() {
return bytes.length;
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
subscriber.onSubscribe(
new Subscription() {
@Override
public void request(long n) {
if (n > 0) {
subscriber.onNext(ByteBuffer.wrap(bytes));
subscriber.onComplete();
}
}
@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
if (s == null) throw new NullPointerException("Subscription MUST NOT be null.");

@Override
public void cancel() {
// As per 2.13, this method must return normally (i.e. not throw).
try {
s.onSubscribe(
new Subscription() {
boolean done = false;
@Override
public void request(long n) {
if (n > 0) {
if (!done) {
s.onNext(ByteBuffer.wrap(bytes));
done = true;
s.onComplete();
}
} else {
s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
}
}

@Override
public void cancel() {
}
);
}
);
} catch (Throwable ex) {
new IllegalStateException(s + " violated the Reactive Streams rule 2.13 " +
"by throwing an exception from onSubscribe.", ex)
// When onSubscribe fails this way, we don't know what state the
// s is thus calling onError may cause more crashes.
.printStackTrace();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ static String userAgent() {

ua = ua
.replace("{platform}", "java")
.replace("{version}", VersionInfo.SDK_VERSION)
.replace("{version}", "MOCK")//VersionInfo.SDK_VERSION)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FIXME: had a compile error here, remove in case we'd intend to merge this PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you run a mvn generate-sources -pl :core it should generate the VersionInfo class for you and add it to the generated-sources directory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

.replace("{os.name}", replaceSpaces(JavaSystemSetting.OS_NAME.getStringValue().orElse(null)))
.replace("{os.version}", replaceSpaces(JavaSystemSetting.OS_VERSION.getStringValue().orElse(null)))
.replace("{java.vm.name}", replaceSpaces(JavaSystemSetting.JAVA_VM_NAME.getStringValue().orElse(null)))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.async;

import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.tck.TestEnvironment;
import software.amazon.awssdk.http.async.SimpleSubscriber;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;

public class FileAsyncRequestPublisherTckTest extends org.reactivestreams.tck.PublisherVerification<ByteBuffer> {

// same as `FileAsyncRequestProvider.DEFAULT_CHUNK_SIZE`:
final int DEFAULT_CHUNK_SIZE = 16 * 1024;
final int ELEMENTS = 1000;

// mock file system:
final FileSystem fs = Jimfs.newFileSystem(Configuration.unix());

final Path testFile;
final Path doestNotExist;

public FileAsyncRequestPublisherTckTest() throws IOException {
super(new TestEnvironment());
testFile = Files.createFile(fs.getPath("/test-file.tmp"));

doestNotExist = new File("does-not-exist").toPath();

final BufferedWriter writer = Files.newBufferedWriter(testFile);

final char[] chars = new char[DEFAULT_CHUNK_SIZE];
Arrays.fill(chars, 'A');

for (int i = 0; i < ELEMENTS; i++) {
writer.write(chars); // write one chunk
}
}

@Override
public Publisher<ByteBuffer> createPublisher(long elements) {
if (elements < ELEMENTS) return AsyncRequestProvider.fromFile(testFile);
else return null; // we don't support more elements
}

@Override
public Publisher<ByteBuffer> createFailedPublisher() {
// tests properly failing on non existing files:
return AsyncRequestProvider.fromFile(doestNotExist);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.awssdk.core.async;

import org.reactivestreams.Subscriber;
import org.reactivestreams.tck.TestEnvironment;
import software.amazon.awssdk.http.async.SimpleSubscriber;

import java.nio.ByteBuffer;

public class SimpleSubscriberTckTest extends org.reactivestreams.tck.SubscriberBlackboxVerification<ByteBuffer> {

public SimpleSubscriberTckTest() {
super(new TestEnvironment());
}

@Override
public Subscriber<ByteBuffer> createSubscriber() {
return new SimpleSubscriber(buffer -> {
// ignore
});
}

@Override
public ByteBuffer createElement(int i) {
return ByteBuffer.wrap(String.valueOf(i).getBytes());
}

}
Loading