Skip to content

Commit 25d650a

Browse files
ktosodagnir
authored andcommitted
SimpleSubscriber is now spec compliant
1 parent 12a7820 commit 25d650a

File tree

4 files changed

+90
-41
lines changed

4 files changed

+90
-41
lines changed

core/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,12 @@
138138
<version>${reactive-streams.version}</version>
139139
<scope>test</scope>
140140
</dependency>
141+
<dependency>
142+
<groupId>com.google.jimfs</groupId>
143+
<artifactId>jimfs</artifactId>
144+
<version>1.1</version>
145+
<scope>test</scope>
146+
</dependency>
141147
</dependencies>
142148
<build>
143149
<resources>

core/src/main/java/software/amazon/awssdk/core/async/FileAsyncRequestBody.java

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,13 @@
1515

1616
package software.amazon.awssdk.core.async;
1717

18-
import java.io.File;
1918
import java.io.IOException;
2019
import java.io.UncheckedIOException;
2120
import java.nio.ByteBuffer;
2221
import java.nio.channels.AsynchronousFileChannel;
2322
import java.nio.channels.CompletionHandler;
2423
import java.nio.file.Files;
2524
import java.nio.file.Path;
26-
import java.nio.file.Paths;
2725
import java.nio.file.StandardOpenOption;
2826
import java.util.concurrent.atomic.AtomicLong;
2927
import org.reactivestreams.Subscriber;
@@ -45,21 +43,22 @@ final class FileAsyncRequestBody implements AsyncRequestBody {
4543
/**
4644
* File to read.
4745
*/
48-
private final File file;
46+
private final Path file;
4947

5048
/**
5149
* Size (in bytes) of ByteBuffer chunks read from the file and delivered to the subscriber.
5250
*/
5351
private final int chunkSizeInBytes;
5452

53+
5554
private FileAsyncRequestBody(DefaultBuilder builder) {
56-
this.file = builder.path.toFile();
55+
this.file = builder.path;
5756
this.chunkSizeInBytes = builder.chunkSizeInBytes == null ? DEFAULT_CHUNK_SIZE : builder.chunkSizeInBytes;
5857
}
5958

6059
@Override
6160
public long contentLength() {
62-
return file.length();
61+
return file.toFile().length();
6362
}
6463

6564
@Override
@@ -142,29 +141,41 @@ private static class FileSubscription implements Subscription {
142141
private final int chunkSize;
143142

144143
private long position = 0;
145-
private AtomicLong outstandingRequests = new AtomicLong(0);
144+
private AtomicLong outstandingDemand = new AtomicLong(0);
146145
private boolean writeInProgress = false;
147146

148-
private FileSubscription(File file, Subscriber<? super ByteBuffer> subscriber, int chunkSize) {
147+
private FileSubscription(Path file, Subscriber<? super ByteBuffer> subscriber, int chunkSize) {
149148
this.inputChannel = openInputChannel(file);
150149
this.subscriber = subscriber;
151150
this.chunkSize = chunkSize;
152151
}
153152

154153
@Override
155154
public void request(long n) {
156-
try {
157-
outstandingRequests.addAndGet(n);
155+
if (n < 1) {
156+
IllegalArgumentException ex =
157+
new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements.");
158+
subscriber.onError(ex);
159+
} else
160+
try {
161+
long initialDemand = outstandingDemand.get();
162+
long newDemand = initialDemand + n;
163+
if (newDemand < 1) {
164+
// As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as "effectively unbounded"
165+
outstandingDemand.set(Long.MAX_VALUE);
166+
} else {
167+
outstandingDemand.set(newDemand);
168+
}
158169

159-
synchronized (this) {
160-
if (!writeInProgress) {
161-
writeInProgress = true;
162-
readData();
170+
synchronized (this) {
171+
if (!writeInProgress) {
172+
writeInProgress = true;
173+
readData();
174+
}
163175
}
176+
} catch (Exception e) {
177+
subscriber.onError(e);
164178
}
165-
} catch (Exception e) {
166-
subscriber.onError(e);
167-
}
168179
}
169180

170181
@Override
@@ -186,7 +197,7 @@ public void completed(Integer result, ByteBuffer attachment) {
186197
position += attachment.remaining();
187198
subscriber.onNext(attachment);
188199
// If we have more permits, queue up another read.
189-
if (outstandingRequests.decrementAndGet() > 0) {
200+
if (outstandingDemand.decrementAndGet() > 0) {
190201
readData();
191202
return;
192203
}
@@ -219,12 +230,9 @@ private void closeFile() {
219230

220231
}
221232

222-
private static AsynchronousFileChannel openInputChannel(File file) {
233+
private static AsynchronousFileChannel openInputChannel(Path path) {
223234
try {
224-
final Path path = Paths.get(file.getAbsolutePath());
225-
if (!Files.exists(path)) {
226-
Files.createFile(path);
227-
}
235+
if (!Files.exists(path)) Files.createFile(path);
228236
return AsynchronousFileChannel.open(path, StandardOpenOption.READ);
229237
} catch (IOException e) {
230238
throw new UncheckedIOException(e);

core/src/test/java/software/amazon/awssdk/core/async/FileAsyncRequestPublisherTckTest.java

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,37 +29,51 @@
2929
import java.nio.charset.StandardCharsets;
3030
import java.nio.file.FileSystem;
3131
import java.nio.file.Files;
32+
import java.nio.file.Path;
3233
import java.nio.file.Paths;
34+
import java.util.Arrays;
35+
import java.util.List;
3336

34-
import static org.assertj.core.api.Assertions.assertThat;
35-
import static org.mockito.Matchers.any;
36-
import static org.mockito.Mockito.*;
37+
import com.google.common.jimfs.Configuration;
38+
import com.google.common.jimfs.Jimfs;
3739

3840
public class FileAsyncRequestPublisherTckTest extends org.reactivestreams.tck.PublisherVerification<ByteBuffer> {
3941

40-
final File example = File.createTempFile("example", ".tmp");
41-
final File fileDoesNotExist = new File(example.getPath() + "-does-not-exist");
42+
// same as `FileAsyncRequestProvider.DEFAULT_CHUNK_SIZE`:
43+
final int DEFAULT_CHUNK_SIZE = 16 * 1024;
44+
final int ELEMENTS = 1000;
45+
46+
// mock file system:
47+
final FileSystem fs = Jimfs.newFileSystem(Configuration.unix());
48+
49+
final Path testFile;
50+
final Path doestNotExist;
4251

4352
public FileAsyncRequestPublisherTckTest() throws IOException {
4453
super(new TestEnvironment());
54+
testFile = Files.createFile(fs.getPath("/test-file.tmp"));
55+
56+
doestNotExist = new File("does-not-exist").toPath();
57+
58+
final BufferedWriter writer = Files.newBufferedWriter(testFile);
59+
60+
final char[] chars = new char[DEFAULT_CHUNK_SIZE];
61+
Arrays.fill(chars, 'A');
4562

46-
BufferedWriter writer = new BufferedWriter(new FileWriter(example));
47-
writer.write("Hello world\n");
48-
writer.write("Hello world\n");
49-
writer.write("Hello world\n");
50-
writer.write("Hello world\n");
51-
writer.write("Hello world\n");
52-
writer.flush();
53-
writer.close();
63+
for (int i = 0; i < ELEMENTS; i++) {
64+
writer.write(chars); // write one chunk
65+
}
5466
}
5567

5668
@Override
57-
public Publisher<ByteBuffer> createPublisher(long l) {
58-
return AsyncRequestProvider.fromFile(example.toPath());
69+
public Publisher<ByteBuffer> createPublisher(long elements) {
70+
if (elements < ELEMENTS) return AsyncRequestProvider.fromFile(testFile);
71+
else return null; // we don't support more elements
5972
}
6073

6174
@Override
6275
public Publisher<ByteBuffer> createFailedPublisher() {
63-
return AsyncRequestProvider.fromFile(fileDoesNotExist.toPath());
76+
// tests properly failing on non existing files:
77+
return AsyncRequestProvider.fromFile(doestNotExist);
6478
}
6579
}

http-client-spi/src/main/java/software/amazon/awssdk/http/async/SimpleSubscriber.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,38 +16,59 @@
1616
package software.amazon.awssdk.http.async;
1717

1818
import java.nio.ByteBuffer;
19+
import java.util.concurrent.atomic.AtomicReference;
1920
import java.util.function.Consumer;
2021
import org.reactivestreams.Subscriber;
2122
import org.reactivestreams.Subscription;
2223

2324
/**
2425
* Simple subscriber that does no backpressure and doesn't care about errors or completion.
2526
*/
26-
// TODO provided example how to cover it using the TCK, you may want to put those tests to a separate project,
27-
// TODO if you'd prefer to do so
2827
public class SimpleSubscriber implements Subscriber<ByteBuffer> {
2928

3029
private final Consumer<ByteBuffer> consumer;
30+
private final AtomicReference<Subscription> subscription = new AtomicReference<>();
3131

3232
public SimpleSubscriber(Consumer<ByteBuffer> consumer) {
3333
this.consumer = consumer;
3434
}
3535

3636
@Override
3737
public void onSubscribe(Subscription s) {
38-
s.request(Long.MAX_VALUE);
38+
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
39+
if (s == null) throw new NullPointerException("Subscription MUST NOT be null.");
40+
41+
if (subscription.get() == null) {
42+
if (subscription.compareAndSet(null, s)) {
43+
s.request(Long.MAX_VALUE);
44+
} else onSubscribe(s); // lost race, retry (will cancel in the else branch below)
45+
} else {
46+
try {
47+
s.cancel(); // Cancel the additional subscription
48+
} catch (final Throwable t) {
49+
// Subscription.cancel is not allowed to throw an exception, according to rule 3.15
50+
(new IllegalStateException(s + " violated the Reactive Streams rule 3.15 by throwing an exception from cancel.", t))
51+
.printStackTrace(System.err);
52+
}
53+
}
3954
}
4055

4156
@Override
4257
public void onNext(ByteBuffer byteBuffer) {
58+
// Rule 2.13, null arguments must be failed on eagerly
59+
if (byteBuffer == null) throw new NullPointerException("Element passed to onNext MUST NOT be null.");
60+
4361
consumer.accept(byteBuffer);
4462
}
4563

4664
@Override
4765
public void onError(Throwable t) {
66+
if (t == null) throw new NullPointerException("Throwable passed to onError MUST NOT be null.");
67+
// else, ignore
4868
}
4969

5070
@Override
5171
public void onComplete() {
72+
// ignore
5273
}
5374
}

0 commit comments

Comments
 (0)