Skip to content

Commit 12aaaae

Browse files
committed
Add TCK tests for reactive streams impls
Fixes #405 Fixes #406
1 parent a735ec8 commit 12aaaae

File tree

24 files changed

+775
-180
lines changed

24 files changed

+775
-180
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"category": "AWS SDK for Java v2",
3+
"type": "feature",
4+
"description": "Incorporate the [Reactive Streams Technology Compatibility Kit](https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck) and ensure current implementations are compliant. [#519](https://github.com/aws/aws-sdk-java-v2/issues/519)"
5+
}

CONTRIBUTING.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,18 @@ checklist below:
128128
Commit the new file created by the script in `.changes/next-release` with
129129
your changes.
130130

131+
__Note__: Some changes have additional requirements. Refer to the section below
132+
to see if your change will require additional work to be accepted.
133+
134+
#### Additional Pull Request Requirements
135+
##### Reactive Streams
136+
If the change includes implementations of the [Reactive Streams
137+
interfaces](https://github.com/reactive-streams/reactive-streams-jvm), the
138+
change must also contain verification tests using the [Reactive Streams
139+
Technology Compatibility
140+
Kit](https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck)
141+
to ensure specificiation compliance.
142+
131143
### Getting Your Pull Request Merged
132144
All Pull Requests must be approved by at least one member of the SDK team
133145
before it can be merged in. The members only have limited bandwitdth to review

core/pom.xml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@
118118
<dependency>
119119
<groupId>org.reactivestreams</groupId>
120120
<artifactId>reactive-streams</artifactId>
121-
<version>${reactive-streams.version}</version>
122121
</dependency>
123122

124123
<dependency>
@@ -135,13 +134,11 @@
135134
<dependency>
136135
<groupId>org.reactivestreams</groupId>
137136
<artifactId>reactive-streams-tck</artifactId>
138-
<version>${reactive-streams.version}</version>
139137
<scope>test</scope>
140138
</dependency>
141139
<dependency>
142140
<groupId>com.google.jimfs</groupId>
143141
<artifactId>jimfs</artifactId>
144-
<version>1.1</version>
145142
<scope>test</scope>
146143
</dependency>
147144
</dependencies>

core/src/main/java/software/amazon/awssdk/core/async/AsyncResponseTransformer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.reactivestreams.Publisher;
2222
import org.reactivestreams.Subscription;
2323
import software.amazon.awssdk.core.ResponseBytes;
24+
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
25+
import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer;
2426

2527
/**
2628
* Callback interface to handle a streaming asynchronous response.

core/src/main/java/software/amazon/awssdk/core/async/ByteArrayAsyncRequestBody.java

Lines changed: 42 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.reactivestreams.Subscriber;
2121
import org.reactivestreams.Subscription;
22+
2223
import software.amazon.awssdk.annotations.SdkInternalApi;
2324

2425
/**
@@ -32,51 +33,54 @@
3233
@SdkInternalApi
3334
final class ByteArrayAsyncRequestBody implements AsyncRequestBody {
3435

35-
private final byte[] bytes;
36+
private final byte[] bytes;
3637

3738
ByteArrayAsyncRequestBody(byte[] bytes) {
3839
this.bytes = bytes.clone();
3940
}
4041

41-
@Override
42-
public long contentLength() {
43-
return bytes.length;
44-
}
42+
@Override
43+
public long contentLength() {
44+
return bytes.length;
45+
}
46+
47+
@Override
48+
public void subscribe(Subscriber<? super ByteBuffer> s) {
49+
// As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null
50+
if (s == null) {
51+
throw new NullPointerException("Subscription MUST NOT be null.");
52+
}
4553

46-
@Override
47-
public void subscribe(Subscriber<? super ByteBuffer> s) {
48-
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
49-
if (s == null) throw new NullPointerException("Subscription MUST NOT be null.");
54+
// As per 2.13, this method must return normally (i.e. not throw).
55+
try {
56+
s.onSubscribe(
57+
new Subscription() {
58+
boolean done = false;
5059

51-
// As per 2.13, this method must return normally (i.e. not throw).
52-
try {
53-
s.onSubscribe(
54-
new Subscription() {
55-
boolean done = false;
56-
@Override
57-
public void request(long n) {
58-
if (n > 0) {
59-
if (!done) {
60-
s.onNext(ByteBuffer.wrap(bytes));
61-
done = true;
62-
s.onComplete();
63-
}
64-
} else {
65-
s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
66-
}
67-
}
60+
@Override
61+
public void request(long n) {
62+
if (n > 0) {
63+
if (!done) {
64+
s.onNext(ByteBuffer.wrap(bytes));
65+
done = true;
66+
s.onComplete();
67+
}
68+
} else {
69+
s.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
70+
}
71+
}
6872

69-
@Override
70-
public void cancel() {
71-
}
72-
}
73-
);
74-
} catch (Throwable ex) {
75-
new IllegalStateException(s + " violated the Reactive Streams rule 2.13 " +
76-
"by throwing an exception from onSubscribe.", ex)
77-
// When onSubscribe fails this way, we don't know what state the
78-
// s is thus calling onError may cause more crashes.
79-
.printStackTrace();
73+
@Override
74+
public void cancel() {
75+
}
76+
}
77+
);
78+
} catch (Throwable ex) {
79+
new IllegalStateException(s + " violated the Reactive Streams rule 2.13 " +
80+
"by throwing an exception from onSubscribe.", ex)
81+
// When onSubscribe fails this way, we don't know what state the
82+
// s is thus calling onError may cause more crashes.
83+
.printStackTrace();
84+
}
8085
}
81-
}
8286
}

core/src/main/java/software/amazon/awssdk/core/async/FileAsyncRequestBody.java

Lines changed: 80 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,16 @@
1616
package software.amazon.awssdk.core.async;
1717

1818
import java.io.IOException;
19-
import java.io.UncheckedIOException;
2019
import java.nio.ByteBuffer;
2120
import java.nio.channels.AsynchronousFileChannel;
2221
import java.nio.channels.CompletionHandler;
23-
import java.nio.file.Files;
2422
import java.nio.file.Path;
2523
import java.nio.file.StandardOpenOption;
2624
import java.util.concurrent.atomic.AtomicLong;
2725
import org.reactivestreams.Subscriber;
2826
import org.reactivestreams.Subscription;
2927
import software.amazon.awssdk.annotations.SdkInternalApi;
28+
import software.amazon.awssdk.core.util.async.NoopSubscription;
3029
import software.amazon.awssdk.utils.builder.SdkBuilder;
3130

3231
/**
@@ -43,7 +42,7 @@ final class FileAsyncRequestBody implements AsyncRequestBody {
4342
/**
4443
* File to read.
4544
*/
46-
private final Path file;
45+
private final Path path;
4746

4847
/**
4948
* Size (in bytes) of ByteBuffer chunks read from the file and delivered to the subscriber.
@@ -52,18 +51,33 @@ final class FileAsyncRequestBody implements AsyncRequestBody {
5251

5352

5453
private FileAsyncRequestBody(DefaultBuilder builder) {
55-
this.file = builder.path;
54+
this.path = builder.path;
5655
this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes;
5756
}
5857

5958
@Override
6059
public long contentLength() {
61-
return file.toFile().length();
60+
return path.toFile().length();
6261
}
6362

6463
@Override
6564
public void subscribe(Subscriber<? super ByteBuffer> s) {
66-
s.onSubscribe(new FileSubscription(file, s, chunkSizeInBytes));
65+
try {
66+
AsynchronousFileChannel channel = openInputChannel(this.path);
67+
68+
// We need to synchronize here because the subscriber could call
69+
// request() from within onSubscribe which would potentially
70+
// trigger onNext before onSubscribe is finished.
71+
Subscription subscription = new FileSubscription(channel, s, chunkSizeInBytes);
72+
synchronized (subscription) {
73+
s.onSubscribe(subscription);
74+
}
75+
} catch (IOException e) {
76+
// subscribe() must return normally, so we need to signal the
77+
// failure to open via onError() once onSubscribe() is signaled.
78+
s.onSubscribe(new NoopSubscription(s));
79+
s.onError(e);
80+
}
6781
}
6882

6983
/**
@@ -135,37 +149,43 @@ public FileAsyncRequestBody build() {
135149
* Reads the file for one subscriber.
136150
*/
137151
private static class FileSubscription implements Subscription {
138-
139152
private final AsynchronousFileChannel inputChannel;
140153
private final Subscriber<? super ByteBuffer> subscriber;
141154
private final int chunkSize;
142155

143156
private long position = 0;
144157
private AtomicLong outstandingDemand = new AtomicLong(0);
145158
private boolean writeInProgress = false;
159+
private volatile boolean done = false;
146160

147-
private FileSubscription(Path file, Subscriber<? super ByteBuffer> subscriber, int chunkSize) {
148-
this.inputChannel = openInputChannel(file);
161+
private FileSubscription(AsynchronousFileChannel inputChannel, Subscriber<? super ByteBuffer> subscriber, int chunkSize) {
162+
this.inputChannel = inputChannel;
149163
this.subscriber = subscriber;
150164
this.chunkSize = chunkSize;
151165
}
152166

153167
@Override
154168
public void request(long n) {
169+
if (done) {
170+
return;
171+
}
172+
155173
if (n < 1) {
156174
IllegalArgumentException ex =
157-
new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements.");
158-
subscriber.onError(ex);
159-
} else
175+
new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a "
176+
+ "non-positive number of elements.");
177+
signalOnError(ex);
178+
} else {
160179
try {
161-
long initialDemand = outstandingDemand.get();
162-
long newDemand = initialDemand + n;
163-
if (newDemand < 1) {
164-
// As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as "effectively unbounded"
165-
outstandingDemand.set(Long.MAX_VALUE);
166-
} else {
167-
outstandingDemand.set(newDemand);
168-
}
180+
// As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as
181+
// "effectively unbounded"
182+
outstandingDemand.getAndUpdate(initialDemand -> {
183+
if (Long.MAX_VALUE - initialDemand < n) {
184+
return Long.MAX_VALUE;
185+
} else {
186+
return initialDemand + n;
187+
}
188+
});
169189

170190
synchronized (this) {
171191
if (!writeInProgress) {
@@ -174,47 +194,54 @@ public void request(long n) {
174194
}
175195
}
176196
} catch (Exception e) {
177-
subscriber.onError(e);
197+
signalOnError(e);
178198
}
199+
}
179200
}
180201

181202
@Override
182203
public void cancel() {
183-
closeFile();
204+
synchronized (this) {
205+
if (!done) {
206+
done = true;
207+
closeFile();
208+
}
209+
}
184210
}
185211

186212
private void readData() {
187213
// It's possible to have another request for data come in after we've closed the file.
188214
if (!inputChannel.isOpen()) {
189215
return;
190216
}
217+
191218
final ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
192219
inputChannel.read(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {
193220
@Override
194221
public void completed(Integer result, ByteBuffer attachment) {
195222
if (result > 0) {
196223
attachment.flip();
197224
position += attachment.remaining();
198-
subscriber.onNext(attachment);
225+
signalOnNext(attachment);
199226
// If we have more permits, queue up another read.
200227
if (outstandingDemand.decrementAndGet() > 0) {
201228
readData();
202229
return;
203230
}
204231
} else {
205232
// Reached the end of the file, notify the subscriber and cleanup
206-
subscriber.onComplete();
233+
signalOnComplete();
207234
closeFile();
208235
}
209236

210-
synchronized (FileSubscription.this) {
237+
synchronized (this) {
211238
writeInProgress = false;
212239
}
213240
}
214241

215242
@Override
216243
public void failed(Throwable exc, ByteBuffer attachment) {
217-
subscriber.onError(exc);
244+
signalOnError(exc);
218245
closeFile();
219246
}
220247
});
@@ -224,20 +251,38 @@ private void closeFile() {
224251
try {
225252
inputChannel.close();
226253
} catch (IOException e) {
227-
throw new UncheckedIOException(e);
254+
signalOnError(e);
228255
}
229256
}
230257

231-
}
258+
private void signalOnNext(ByteBuffer bb) {
259+
synchronized (this) {
260+
if (!done) {
261+
subscriber.onNext(bb);
262+
}
263+
}
264+
}
232265

233-
private static AsynchronousFileChannel openInputChannel(Path path) {
234-
try {
235-
if (!Files.exists(path)) Files.createFile(path);
236-
return AsynchronousFileChannel.open(path, StandardOpenOption.READ);
237-
} catch (IOException e) {
238-
throw new UncheckedIOException(e);
266+
private void signalOnComplete() {
267+
synchronized (this) {
268+
if (!done) {
269+
subscriber.onComplete();
270+
done = true;
271+
}
272+
}
239273
}
240-
}
241274

275+
private void signalOnError(Throwable t) {
276+
synchronized (this) {
277+
if (!done) {
278+
subscriber.onError(t);
279+
done = true;
280+
}
281+
}
282+
}
283+
}
242284

285+
private static AsynchronousFileChannel openInputChannel(Path path) throws IOException {
286+
return AsynchronousFileChannel.open(path, StandardOpenOption.READ);
287+
}
243288
}

0 commit comments

Comments
 (0)