Skip to content

Commit 57de751

Browse files
authored
Fixed a bug in file-based AsyncResponseTransformer that could cause a… (#5220)
* Fixed a bug in file-based AsyncResponseTransformer that could cause a streaming request to hang if an exception was thrown from onStream * Fix build * Update changelog entry
1 parent 9313708 commit 57de751

File tree

3 files changed

+37
-8
lines changed

3 files changed

+37
-8
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Fixed a bug in file-based AsyncResponseTransformer that could cause a streaming request to hang if an exception was thrown from `onStream`"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformer.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static software.amazon.awssdk.core.FileTransformerConfiguration.FileWriteOption.CREATE_OR_APPEND_TO_EXISTING;
1919
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
20+
import static software.amazon.awssdk.utils.FunctionalUtils.runAndLogError;
2021

2122
import java.io.IOException;
2223
import java.nio.ByteBuffer;
@@ -42,6 +43,7 @@
4243
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
4344
import software.amazon.awssdk.core.async.SdkPublisher;
4445
import software.amazon.awssdk.core.exception.SdkClientException;
46+
import software.amazon.awssdk.utils.Logger;
4547

4648
/**
4749
* {@link AsyncResponseTransformer} that writes the data to the specified file.
@@ -50,6 +52,7 @@
5052
*/
5153
@SdkInternalApi
5254
public final class FileAsyncResponseTransformer<ResponseT> implements AsyncResponseTransformer<ResponseT, ResponseT> {
55+
private static final Logger log = Logger.loggerFor(FileAsyncResponseTransformer.class);
5356
private final Path path;
5457
private volatile AsynchronousFileChannel fileChannel;
5558
private volatile CompletableFuture<Void> cf;
@@ -108,7 +111,9 @@ public CompletableFuture<ResponseT> prepare() {
108111
cf = new CompletableFuture<>();
109112
cf.whenComplete((r, t) -> {
110113
if (t != null && fileChannel != null) {
111-
invokeSafely(fileChannel::close);
114+
runAndLogError(log.logger(),
115+
String.format("Failed to close the file %s, resource may be leaked", path),
116+
() -> fileChannel.close());
112117
}
113118
});
114119
return cf.thenApply(ignored -> response);
@@ -121,21 +126,29 @@ public void onResponse(ResponseT response) {
121126

122127
@Override
123128
public void onStream(SdkPublisher<ByteBuffer> publisher) {
124-
// onStream may be called multiple times so reset the file channel every time
125-
this.fileChannel = invokeSafely(() -> createChannel(path));
126-
publisher.subscribe(new FileSubscriber(this.fileChannel, path, cf, this::exceptionOccurred,
127-
position));
129+
try {
130+
// onStream may be called multiple times so reset the file channel every time
131+
this.fileChannel = createChannel(path);
132+
publisher.subscribe(new FileSubscriber(this.fileChannel, path, cf, this::exceptionOccurred,
133+
position));
134+
} catch (Throwable e) {
135+
exceptionOccurred(e);
136+
}
128137
}
129138

130139
@Override
131140
public void exceptionOccurred(Throwable throwable) {
132141
try {
133142
if (fileChannel != null) {
134-
invokeSafely(fileChannel::close);
143+
runAndLogError(log.logger(),
144+
String.format("Failed to close the file %s, resource may be leaked", path),
145+
() -> fileChannel.close());
135146
}
136147
} finally {
137148
if (configuration.failureBehavior() == FailureBehavior.DELETE) {
138-
invokeSafely(() -> Files.deleteIfExists(path));
149+
runAndLogError(log.logger(),
150+
String.format("Failed to delete the file %s", path),
151+
() -> Files.deleteIfExists(path));
139152
}
140153
}
141154
cf.completeExceptionally(throwable);

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerTest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,8 @@ void noConfiguration_fileAlreadyExists_shouldThrowException() throws Exception {
125125

126126
CompletableFuture<String> future = transformer.prepare();
127127
transformer.onResponse("foobar");
128-
assertThatThrownBy(() -> transformer.onStream(testPublisher(content))).hasRootCauseInstanceOf(FileAlreadyExistsException.class);
128+
transformer.onStream(testPublisher(content));
129+
assertThatThrownBy(() -> future.join()).hasRootCauseInstanceOf(FileAlreadyExistsException.class);
129130
}
130131

131132
@Test
@@ -246,6 +247,15 @@ void explicitExecutor_shouldUseExecutor() throws Exception {
246247
}
247248
}
248249

250+
@Test
251+
void onStreamFailed_shouldCompleteFutureExceptionally() {
252+
Path testPath = testFs.getPath("test_file.txt");
253+
FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath);
254+
CompletableFuture<String> future = transformer.prepare();
255+
transformer.onStream(null);
256+
assertThat(future).isCompletedExceptionally();
257+
}
258+
249259
private static void stubSuccessfulStreaming(String newContent, FileAsyncResponseTransformer<String> transformer) throws Exception {
250260
CompletableFuture<String> future = transformer.prepare();
251261
transformer.onResponse("foobar");

0 commit comments

Comments
 (0)