Skip to content

Commit 884b543

Browse files
committed
Fix the race condition in the FileAsyncRequestBody which causes the request to hang
1 parent b5df57e commit 884b543

File tree

5 files changed

+68
-87
lines changed

5 files changed

+68
-87
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java

Lines changed: 36 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,11 @@ private static final class FileSubscription implements Subscription {
169169
private final Subscriber<? super ByteBuffer> subscriber;
170170
private final int chunkSize;
171171

172-
private long position = 0;
173-
private AtomicLong outstandingDemand = new AtomicLong(0);
174-
private boolean writeInProgress = false;
172+
private final AtomicLong position = new AtomicLong(0);
173+
private long outstandingDemand = 0;
174+
private boolean readInProgress = false;
175175
private volatile boolean done = false;
176+
private final Object lock = new Object();
176177

177178
private FileSubscription(AsynchronousFileChannel inputChannel, Subscriber<? super ByteBuffer> subscriber, int chunkSize) {
178179
this.inputChannel = inputChannel;
@@ -189,23 +190,24 @@ public void request(long n) {
189190
if (n < 1) {
190191
IllegalArgumentException ex =
191192
new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a "
192-
+ "non-positive number of elements.");
193+
+ "non-positive number of elements.");
193194
signalOnError(ex);
194195
} else {
195196
try {
196-
// As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as
197-
// "effectively unbounded"
198-
outstandingDemand.getAndUpdate(initialDemand -> {
199-
if (Long.MAX_VALUE - initialDemand < n) {
200-
return Long.MAX_VALUE;
197+
// We need to synchronize here because of the race condition
198+
// where readData finishes reading at the same time request
199+
// demand comes in
200+
synchronized (lock) {
201+
// As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as
202+
// "effectively unbounded"
203+
if (Long.MAX_VALUE - outstandingDemand < n) {
204+
outstandingDemand = Long.MAX_VALUE;
201205
} else {
202-
return initialDemand + n;
206+
outstandingDemand += n;
203207
}
204-
});
205208

206-
synchronized (this) {
207-
if (!writeInProgress) {
208-
writeInProgress = true;
209+
if (!readInProgress) {
210+
readInProgress = true;
209211
readData();
210212
}
211213
}
@@ -227,32 +229,33 @@ public void cancel() {
227229

228230
private void readData() {
229231
// It's possible to have another request for data come in after we've closed the file.
230-
if (!inputChannel.isOpen()) {
232+
if (!inputChannel.isOpen() || done) {
231233
return;
232234
}
233235

234236
ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
235-
inputChannel.read(buffer, position, buffer, new CompletionHandler<Integer, ByteBuffer>() {
237+
inputChannel.read(buffer, position.get(), buffer, new CompletionHandler<Integer, ByteBuffer>() {
236238
@Override
237239
public void completed(Integer result, ByteBuffer attachment) {
240+
238241
if (result > 0) {
239242
attachment.flip();
240-
position += attachment.remaining();
241-
signalOnNext(attachment);
242-
// If we have more permits, queue up another read.
243-
if (outstandingDemand.decrementAndGet() > 0) {
244-
readData();
245-
return;
243+
position.addAndGet(attachment.remaining());
244+
subscriber.onNext(attachment);
245+
246+
synchronized (lock) {
247+
// If we have more permits, queue up another read.
248+
if (--outstandingDemand > 0) {
249+
readData();
250+
} else {
251+
readInProgress = false;
252+
}
246253
}
247254
} else {
248255
// Reached the end of the file, notify the subscriber and cleanup
249256
signalOnComplete();
250257
closeFile();
251258
}
252-
253-
synchronized (FileSubscription.this) {
254-
writeInProgress = false;
255-
}
256259
}
257260

258261
@Override
@@ -271,34 +274,22 @@ private void closeFile() {
271274
}
272275
}
273276

274-
private void signalOnNext(ByteBuffer bb) {
275-
synchronized (this) {
276-
if (!done) {
277-
subscriber.onNext(bb);
278-
}
279-
}
280-
}
281-
282277
private void signalOnComplete() {
283-
synchronized (this) {
284-
if (!done) {
285-
subscriber.onComplete();
286-
done = true;
287-
}
278+
if (!done) {
279+
done = true;
280+
subscriber.onComplete();
288281
}
289282
}
290283

291284
private void signalOnError(Throwable t) {
292-
synchronized (this) {
293-
if (!done) {
294-
subscriber.onError(t);
295-
done = true;
296-
}
285+
if (!done) {
286+
done = true;
287+
subscriber.onError(t);
297288
}
298289
}
299290
}
300291

301292
private static AsynchronousFileChannel openInputChannel(Path path) throws IOException {
302293
return AsynchronousFileChannel.open(path, StandardOpenOption.READ);
303294
}
304-
}
295+
}

test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/s3/S3BaseStabilityTest.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package software.amazon.awssdk.stability.tests.s3;
1717

18+
import static org.assertj.core.api.Assertions.assertThat;
19+
1820
import java.io.File;
1921
import java.io.IOException;
2022
import java.nio.file.Path;
@@ -33,10 +35,13 @@
3335
import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
3436
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
3537
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
38+
import software.amazon.awssdk.stability.tests.exceptions.StabilityTestsRetryableException;
39+
import software.amazon.awssdk.stability.tests.utils.RetryableTest;
3640
import software.amazon.awssdk.stability.tests.utils.StabilityTestRunner;
3741
import software.amazon.awssdk.testutils.RandomTempFile;
3842
import software.amazon.awssdk.testutils.service.AwsTestBase;
3943
import software.amazon.awssdk.utils.Logger;
44+
import software.amazon.awssdk.utils.Md5Utils;
4045

4146
public abstract class S3BaseStabilityTest extends AwsTestBase {
4247
private static final Logger log = Logger.loggerFor(S3BaseStabilityTest.class);
@@ -60,6 +65,19 @@ public S3BaseStabilityTest(S3AsyncClient testClient) {
6065
this.testClient = testClient;
6166
}
6267

68+
@RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class)
69+
public void largeObject_put_get_usingFile() {
70+
String md5Upload = uploadLargeObjectFromFile();
71+
String md5Download = downloadLargeObjectToFile();
72+
assertThat(md5Upload).isEqualTo(md5Download);
73+
}
74+
75+
@RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class)
76+
public void putObject_getObject_highConcurrency() {
77+
putObject();
78+
getObject();
79+
}
80+
6381
protected String computeKeyName(int i) {
6482
return "key_" + i;
6583
}
@@ -79,25 +97,35 @@ protected void doGetBucketAcl_lowTpsLongInterval() {
7997
}
8098

8199

82-
protected void downloadLargeObjectToFile() {
100+
protected String downloadLargeObjectToFile() {
83101
File randomTempFile = RandomTempFile.randomUncreatedFile();
84102
StabilityTestRunner.newRunner()
85103
.testName("S3AsyncStabilityTest.downloadLargeObjectToFile")
86104
.futures(testClient.getObject(b -> b.bucket(getTestBucketName()).key(LARGE_KEY_NAME),
87105
AsyncResponseTransformer.toFile(randomTempFile)))
88106
.run();
89-
randomTempFile.delete();
107+
108+
109+
try {
110+
return Md5Utils.md5AsBase64(randomTempFile);
111+
} catch (IOException e) {
112+
throw new RuntimeException(e);
113+
} finally {
114+
randomTempFile.delete();
115+
}
90116
}
91117

92-
protected void uploadLargeObjectFromFile() {
118+
protected String uploadLargeObjectFromFile() {
93119
RandomTempFile file = null;
94120
try {
95121
file = new RandomTempFile((long) 2e+9);
122+
String md5 = Md5Utils.md5AsBase64(file);
96123
StabilityTestRunner.newRunner()
97124
.testName("S3AsyncStabilityTest.uploadLargeObjectFromFile")
98125
.futures(testClient.putObject(b -> b.bucket(getTestBucketName()).key(LARGE_KEY_NAME),
99126
AsyncRequestBody.fromFile(file)))
100127
.run();
128+
return md5;
101129
} catch (IOException e) {
102130
throw new RuntimeException("fail to create test file", e);
103131
} finally {

test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/s3/S3CrtAsyncClientStabilityTest.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,10 @@
1717

1818
import org.junit.jupiter.api.AfterAll;
1919
import org.junit.jupiter.api.BeforeAll;
20-
import software.amazon.awssdk.transfer.s3.internal.S3CrtAsyncClient;
2120
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
2221
import software.amazon.awssdk.regions.Region;
2322
import software.amazon.awssdk.services.s3.S3AsyncClient;
24-
import software.amazon.awssdk.stability.tests.exceptions.StabilityTestsRetryableException;
25-
import software.amazon.awssdk.stability.tests.utils.RetryableTest;
23+
import software.amazon.awssdk.transfer.s3.internal.S3CrtAsyncClient;
2624

2725
/**
2826
* Stability tests for {@link S3CrtAsyncClient}
@@ -64,16 +62,4 @@ public static void cleanup() {
6462
protected String getTestBucketName() {
6563
return BUCKET_NAME;
6664
}
67-
68-
@RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class)
69-
public void largeObject_put_get_usingFile() {
70-
uploadLargeObjectFromFile();
71-
downloadLargeObjectToFile();
72-
}
73-
74-
@RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class)
75-
public void putObject_getObject_highConcurrency() {
76-
putObject();
77-
getObject();
78-
}
7965
}

test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/s3/S3NettyAsyncStabilityTest.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,18 +45,6 @@ public static void cleanup() {
4545
@Override
4646
protected String getTestBucketName() { return bucketName; }
4747

48-
@RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class)
49-
public void putObject_getObject_highConcurrency() {
50-
putObject();
51-
getObject();
52-
}
53-
54-
@RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class)
55-
public void largeObject_put_get_usingFile() {
56-
uploadLargeObjectFromFile();
57-
downloadLargeObjectToFile();
58-
}
59-
6048
@RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class)
6149
public void getBucketAcl_lowTpsLongInterval_Netty() {
6250
doGetBucketAcl_lowTpsLongInterval();

test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/s3/S3WithCrtAsyncHttpClientStabilityTest.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,6 @@ public static void cleanup() {
5151
@Override
5252
protected String getTestBucketName() { return bucketName; }
5353

54-
@RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class)
55-
public void putObject_getObject_highConcurrency() {
56-
putObject();
57-
getObject();
58-
}
59-
60-
@RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class)
61-
public void largeObject_put_get_usingFile() {
62-
uploadLargeObjectFromFile();
63-
downloadLargeObjectToFile();
64-
}
65-
6654
@RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class)
6755
public void getBucketAcl_lowTpsLongInterval_Crt() {
6856
doGetBucketAcl_lowTpsLongInterval();

0 commit comments

Comments
 (0)