Skip to content

Commit e64f37e

Browse files
ktosodagnir
authored andcommitted
PR for discussion, showing Reactive Streams TCK, proposing some
solutions
1 parent 0ff976f commit e64f37e

File tree

11 files changed

+199
-6
lines changed

11 files changed

+199
-6
lines changed

core/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,29 @@
115115
<artifactId>utils</artifactId>
116116
<version>${awsjavasdk.version}</version>
117117
</dependency>
118+
<dependency>
119+
<groupId>org.reactivestreams</groupId>
120+
<artifactId>reactive-streams</artifactId>
121+
<version>${reactive-streams.version}</version>
122+
</dependency>
123+
118124
<dependency>
119125
<groupId>org.assertj</groupId>
120126
<artifactId>assertj-core</artifactId>
121127
<scope>test</scope>
122128
</dependency>
129+
<dependency>
130+
<groupId>software.amazon.awssdk</groupId>
131+
<artifactId>test-utils</artifactId>
132+
<version>${awsjavasdk.version}</version>
133+
<scope>test</scope>
134+
</dependency>
135+
<dependency>
136+
<groupId>org.reactivestreams</groupId>
137+
<artifactId>reactive-streams-tck</artifactId>
138+
<version>${reactive-streams.version}</version>
139+
<scope>test</scope>
140+
</dependency>
123141
</dependencies>
124142
<build>
125143
<resources>

core/src/main/java/software/amazon/awssdk/core/async/ByteArrayAsyncRequestBody.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@ public long contentLength() {
4444

4545
@Override
4646
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
47+
// FIXME missing protection abiding to rule 1.9, proposal:
48+
// As per rule 1.09, we need to throw a `java.lang.NullPointerException`
49+
// if the `Subscriber` is `null`
50+
// if (subscriber == null) throw null;
51+
52+
// FIXME: onSubscribe is user code, and could be ill behaved, as library we should protect from this,
53+
// FIXME: This is covered by spec rule 2.13; proposal:
54+
// As per 2.13, this method must return normally (i.e. not throw).
55+
// try {
4756
subscriber.onSubscribe(
4857
new Subscription() {
4958
@Override
@@ -53,10 +62,25 @@ public void request(long n) {
5362
subscriber.onComplete();
5463
}
5564
}
65+
// FIXME missing required validation code (rule 1.9):
66+
// "Non-positive requests should be honored with IllegalArgumentException"
67+
// proposal:
68+
// else {
69+
// subscriber.onError(new IllegalArgumentException("§3.9: non-positive requests are not allowed!"));
70+
// }
5671

57-
@Override
58-
public void cancel() {
59-
}
60-
});
72+
@Override
73+
public void cancel() {
74+
}
75+
}
76+
);
77+
// end of implementing 2.13 spec requirement
78+
// } catch (Throwable ex) {
79+
// new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 " +
80+
// "by throwing an exception from onSubscribe.", ex)
81+
// // When onSubscribe fails this way, we don't know what state the
82+
// // subscriber is thus calling onError may cause more crashes.
83+
// .printStackTrace();
84+
// }
6185
}
6286
}

core/src/main/java/software/amazon/awssdk/core/async/ByteArrayAsyncResponseTransformer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public ResponseBytes<ResponseT> complete() {
6969
/**
7070
* Requests chunks sequentially and dumps them into a {@link ByteArrayOutputStream}.
7171
*/
72+
// TODO cover with Reactive Streams TCK, it's mostly ok, just a few edge cases / sanity checks should be covered AFAICS
7273
private class BaosSubscriber implements Subscriber<ByteBuffer> {
7374

7475
private Subscription subscription;

core/src/main/java/software/amazon/awssdk/core/async/FileAsyncResponseTransformer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public ResponseT complete() {
8282
/**
8383
* {@link Subscriber} implementation that writes chunks to a file.
8484
*/
85+
// FIXME cover with Reactive Streams TCK, looks ok from a first brief look, but could be missing some edge cases
8586
private class FileSubscriber implements Subscriber<ByteBuffer> {
8687

8788
private volatile boolean writeInProgress = false;
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import org.reactivestreams.Publisher;
19+
import org.reactivestreams.tck.TestEnvironment;
20+
21+
import java.io.File;
22+
import java.io.IOException;
23+
import java.nio.ByteBuffer;
24+
25+
public class FileAsyncRequestPublisherTckTest extends org.reactivestreams.tck.PublisherVerification<ByteBuffer> {
26+
27+
final File example = File.createTempFile("example", ".tmp");
28+
final File fileDoesNotExist = new File(example.getPath() + "-does-not-exist");
29+
30+
public FileAsyncRequestPublisherTckTest() throws IOException {
31+
super(new TestEnvironment());
32+
}
33+
34+
@Override
35+
public Publisher<ByteBuffer> createPublisher(long l) {
36+
return AsyncRequestProvider.fromFile(example.toPath());
37+
}
38+
39+
@Override
40+
public Publisher<ByteBuffer> createFailedPublisher() {
41+
return AsyncRequestProvider.fromFile(fileDoesNotExist.toPath());
42+
}
43+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
package software.amazon.awssdk.core.async;
16+
17+
import org.reactivestreams.Subscriber;
18+
import org.reactivestreams.tck.TestEnvironment;
19+
import software.amazon.awssdk.http.async.SimpleSubscriber;
20+
21+
import java.nio.ByteBuffer;
22+
23+
public class SimpleSubscriberTckTest extends org.reactivestreams.tck.SubscriberBlackboxVerification<ByteBuffer> {
24+
25+
public SimpleSubscriberTckTest() {
26+
super(new TestEnvironment());
27+
}
28+
29+
@Override
30+
public Subscriber<ByteBuffer> createSubscriber() {
31+
return new SimpleSubscriber(buffer -> {
32+
// ignore
33+
});
34+
}
35+
36+
@Override
37+
public ByteBuffer createElement(int i) {
38+
return ByteBuffer.wrap(String.valueOf(i).getBytes());
39+
}
40+
41+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
package software.amazon.awssdk.core.async;
16+
17+
import org.reactivestreams.Publisher;
18+
import org.reactivestreams.tck.TestEnvironment;
19+
20+
import java.nio.ByteBuffer;
21+
22+
public class SingleByteArrayAsyncRequestProviderTckTest extends org.reactivestreams.tck.PublisherVerification<ByteBuffer> {
23+
24+
public SingleByteArrayAsyncRequestProviderTckTest() {
25+
super(new TestEnvironment());
26+
}
27+
28+
@Override
29+
public long maxElementsFromPublisher() {
30+
return super.maxElementsFromPublisher();
31+
}
32+
33+
@Override
34+
public Publisher<ByteBuffer> createPublisher(long n) {
35+
return AsyncRequestProvider.fromString("Hello world");
36+
}
37+
38+
@Override
39+
public Publisher<ByteBuffer> createFailedPublisher() {
40+
return null;
41+
}
42+
}

http-client-spi/pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
<dependency>
4242
<groupId>org.reactivestreams</groupId>
4343
<artifactId>reactive-streams</artifactId>
44-
<version>1.0.0.final</version>
44+
<version>${reactive-streams.version}</version>
4545
</dependency>
4646
<dependency>
4747
<groupId>junit</groupId>
@@ -53,6 +53,12 @@
5353
<artifactId>assertj-core</artifactId>
5454
<scope>test</scope>
5555
</dependency>
56+
<dependency>
57+
<groupId>org.reactivestreams</groupId>
58+
<artifactId>reactive-streams-tck</artifactId>
59+
<version>${reactive-streams.version}</version>
60+
<scope>test</scope>
61+
</dependency>
5662
</dependencies>
5763

5864
</project>

http-client-spi/src/main/java/software/amazon/awssdk/http/async/SimpleSubscriber.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
/**
2424
* Simple subscriber that does no backpressure and doesn't care about errors or completion.
2525
*/
26+
// TODO provided example how to cover it using the TCK, you may want to put those tests to a separate project,
27+
// TODO if you'd prefer to do so
2628
public class SimpleSubscriber implements Subscriber<ByteBuffer> {
2729

2830
private final Consumer<ByteBuffer> consumer;

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ private static ByteBuffer copyToByteBuffer(ByteBuf byteBuf) {
170170
return bb;
171171
}
172172

173+
// TODO cover using Reactive Streams TCK
173174
private static class PublisherAdapter implements Publisher<ByteBuffer> {
174175
private final StreamedHttpResponse response;
175176
private final ChannelHandlerContext channelContext;
@@ -220,6 +221,7 @@ public void onComplete() {
220221
}
221222
}
222223

224+
// TODO cover using Reactive Streams TCK (glad you seem to already have looked into the rules! :-))
223225
private static class FullResponseContentPublisher implements Publisher<ByteBuffer> {
224226
private final ChannelHandlerContext channelContext;
225227
private final ByteBuffer fullContent;

pom.xml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,16 @@
103103
<spring.version>3.0.7.RELEASE</spring.version>
104104
<freemarker.version>2.3.9</freemarker.version>
105105
<aspectj.version>1.8.2</aspectj.version>
106-
106+
107107
<jre.version>1.8</jre.version>
108108
<httpcomponents.httpclient.version>4.5.4</httpcomponents.httpclient.version>
109109

110110
<!-- These properties are used by cucumber tests related code -->
111111
<unitils.version>3.3</unitils.version>
112112

113+
<!-- Reactive Streams version -->
114+
<reactive-streams.version>1.0.2</reactive-streams.version>
115+
113116
<!-- Sonar -->
114117
<root.offset>..</root.offset>
115118
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
@@ -257,6 +260,16 @@
257260
<artifactId>rxjava</artifactId>
258261
<version>2.1.9</version>
259262
</dependency>
263+
<groupId>org.reactivestreams</groupId>
264+
<artifactId>reactive-streams</artifactId>
265+
<version>${reactive-streams.version}</version>
266+
</dependency>
267+
<dependency>
268+
<groupId>org.reactivestreams</groupId>
269+
<artifactId>reactive-streams-tck</artifactId>
270+
<version>${reactive-streams.version}</version>
271+
<scope>test</scope>
272+
</dependency>
260273
</dependencies>
261274
</dependencyManagement>
262275

0 commit comments

Comments
 (0)