Skip to content

Commit 8b3f54e

Browse files
committed
Fixed an issue where the future gets stuck when upload fails
1 parent 85e532b commit 8b3f54e

File tree

4 files changed

+76
-25
lines changed

4 files changed

+76
-25
lines changed

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtResponseDataConsumerAdapter.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import java.util.concurrent.CompletableFuture;
2222
import software.amazon.awssdk.annotations.SdkInternalApi;
2323
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
24-
import software.amazon.awssdk.core.exception.SdkClientException;
2524
import software.amazon.awssdk.crt.CrtRuntimeException;
2625
import software.amazon.awssdk.crt.http.HttpHeader;
26+
import software.amazon.awssdk.http.SdkHttpResponse;
2727
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
2828
import software.amazon.awssdk.utils.BinaryUtils;
2929
import software.amazon.awssdk.utils.Logger;
@@ -65,16 +65,12 @@ public void onResponseHeaders(int statusCode, HttpHeader[] headers) {
6565

6666
@Override
6767
public void onResponse(GetObjectOutput output) {
68-
69-
if (!headerHandler.sdkHttpResponseFuture().isDone()) {
70-
// Should never happen, but just in case
71-
transformer.exceptionOccurred(SdkClientException.create("Response headers are not ready yet; onResponseHeaders has "
72-
+ "not been invoked"));
73-
return;
74-
}
68+
// Passing empty SdkHttpResponse if it's not available
69+
SdkHttpResponse sdkHttpResponse = headerHandler.sdkHttpResponseFuture()
70+
.getNow(SdkHttpResponse.builder().build());
7571

7672
GetObjectResponse response = S3CrtPojoConversion.fromCrtGetObjectOutput(output,
77-
headerHandler.sdkHttpResponseFuture().join());
73+
sdkHttpResponse);
7874
transformer.onResponse(response);
7975
transformer.onStream(publisher);
8076
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3CrtAsyncClient.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -69,25 +69,31 @@ public DefaultS3CrtAsyncClient(DefaultS3CrtClientBuilder builder) {
6969
public <ReturnT> CompletableFuture<ReturnT> getObject(
7070
GetObjectRequest getObjectRequest, AsyncResponseTransformer<GetObjectResponse, ReturnT> asyncResponseTransformer) {
7171

72-
CompletableFuture<ReturnT> future = new CompletableFuture<>();
72+
CompletableFuture<ReturnT> returnFuture = new CompletableFuture<>();
7373
com.amazonaws.s3.model.GetObjectRequest crtGetObjectRequest = S3CrtPojoConversion.toCrtGetObjectRequest(getObjectRequest);
7474
CrtResponseDataConsumerAdapter<ReturnT> adapter = new CrtResponseDataConsumerAdapter<>(asyncResponseTransformer);
7575

7676
CompletableFuture<ReturnT> adapterFuture = adapter.transformerFuture();
7777

7878
CompletableFuture<GetObjectOutput> crtFuture = s3NativeClient.getObject(crtGetObjectRequest, adapter);
79-
CompletableFutureUtils.forwardExceptionTo(future, crtFuture);
79+
80+
// Forward the cancellation to crtFuture to cancel the request
81+
CompletableFutureUtils.forwardExceptionTo(returnFuture, crtFuture);
82+
83+
// Forward th exception from the CRT returnFuture to the return future in case
84+
// the adapter callback didn't get it
85+
CompletableFutureUtils.forwardExceptionTo(crtFuture, returnFuture);
8086

8187
adapterFuture.whenComplete((r, t) -> {
8288
if (t == null) {
83-
future.complete(r);
89+
returnFuture.complete(r);
8490
} else {
85-
future.completeExceptionally(t);
91+
returnFuture.completeExceptionally(t);
8692
}
87-
// TODO: Offload to future completion thread
93+
// TODO: Offload to returnFuture completion thread
8894
});
8995

90-
return CompletableFutureUtils.forwardExceptionTo(future, adapterFuture);
96+
return CompletableFutureUtils.forwardExceptionTo(returnFuture, adapterFuture);
9197
}
9298

9399
@Override
@@ -101,16 +107,16 @@ public CompletableFuture<PutObjectResponse> putObject(PutObjectRequest putObject
101107
}
102108

103109
RequestDataSupplierAdapter requestDataSupplier = new RequestDataSupplierAdapter(requestBody);
104-
CompletableFuture<PutObjectOutput> putObjectOutputCompletableFuture = s3NativeClient.putObject(adaptedRequest,
105-
requestDataSupplier);
110+
CompletableFuture<PutObjectOutput> crtFuture = s3NativeClient.putObject(adaptedRequest,
111+
requestDataSupplier);
106112

107113
CompletableFuture<SdkHttpResponse> httpResponseFuture = requestDataSupplier.sdkHttpResponseFuture();
108114
CompletableFuture<PutObjectResponse> executeFuture =
109-
httpResponseFuture.thenCombine(putObjectOutputCompletableFuture,
110-
(header, putObjectOutput) -> S3CrtPojoConversion.fromCrtPutObjectOutput(
111-
putObjectOutput, header));
115+
// If the header is not available, passing empty SDK HTTP response
116+
crtFuture.thenApply(putObjectOutput -> S3CrtPojoConversion.fromCrtPutObjectOutput(
117+
putObjectOutput, httpResponseFuture.getNow(SdkHttpResponse.builder().build())));
112118

113-
return CompletableFutureUtils.forwardExceptionTo(executeFuture, putObjectOutputCompletableFuture);
119+
return CompletableFutureUtils.forwardExceptionTo(executeFuture, crtFuture);
114120
}
115121

116122
@Override

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/CrtResponseDataConsumerAdapterTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.mockito.Mock;
2929
import org.mockito.runners.MockitoJUnitRunner;
3030
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
31-
import software.amazon.awssdk.core.exception.SdkClientException;
3231
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
3332

3433
@RunWith(MockitoJUnitRunner.class)
@@ -48,10 +47,12 @@ public void setup() {
4847
}
4948

5049
@Test
51-
public void onResponse_noSdkHttpResponse_shouldCallExceptionOccurred() {
50+
public void onResponse_noSdkHttpResponse_shouldCreateEmptySdkHttpResponse() {
5251
adapter.onResponse(GetObjectOutput.builder().build());
53-
ArgumentCaptor<SdkClientException> captor = ArgumentCaptor.forClass(SdkClientException.class);
54-
verify(transformer).exceptionOccurred(captor.capture());
52+
ArgumentCaptor<GetObjectResponse> captor = ArgumentCaptor.forClass(GetObjectResponse.class);
53+
verify(transformer).onResponse(captor.capture());
54+
assertThat(captor.getValue().responseMetadata().requestId()).isEqualTo("UNKNOWN");
55+
assertThat(captor.getValue().sdkHttpResponse()).isNotNull();
5556
}
5657

5758
@Test

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3CrtAsyncClientTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.transfer.s3.internal;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1920
import static org.mockito.Matchers.any;
2021
import static org.mockito.Mockito.verify;
2122
import static org.mockito.Mockito.when;
@@ -85,6 +86,53 @@ public void putObject_cancels_shouldForwardCancellation() {
8586
assertThat(crtFuture).isCancelled();
8687
}
8788

89+
@Test
90+
public void putObject_crtFutureCompletedExceptionally_shouldFail() {
91+
RuntimeException runtimeException = new RuntimeException("test");
92+
CompletableFuture<PutObjectOutput> crtFuture = new CompletableFuture<>();
93+
crtFuture.completeExceptionally(runtimeException);
94+
when(mockS3NativeClient.putObject(any(PutObjectRequest.class),
95+
any(RequestDataSupplier.class)))
96+
.thenReturn(crtFuture);
97+
98+
CompletableFuture<PutObjectResponse> future =
99+
s3CrtAsyncClient.putObject(b -> b.bucket("bucket").key("key"),
100+
AsyncRequestBody.empty());
101+
102+
assertThatThrownBy(() -> future.join()).hasCause(runtimeException);
103+
}
104+
105+
@Test
106+
public void getObject_crtFutureCompletedExceptionally_shouldFail() {
107+
RuntimeException runtimeException = new RuntimeException("test");
108+
CompletableFuture<GetObjectOutput> crtFuture = new CompletableFuture<>();
109+
crtFuture.completeExceptionally(runtimeException);
110+
when(mockS3NativeClient.getObject(any(GetObjectRequest.class),
111+
any(ResponseDataConsumer.class)))
112+
.thenReturn(crtFuture);
113+
114+
CompletableFuture<ResponseBytes<GetObjectResponse>> future =
115+
s3CrtAsyncClient.getObject(b -> b.bucket("bucket").key("key"),
116+
AsyncResponseTransformer.toBytes());
117+
118+
assertThatThrownBy(() -> future.join()).hasCause(runtimeException);
119+
}
120+
121+
@Test
122+
public void putObject_crtFutureCompletedSuccessfully_shouldSucceed() {
123+
CompletableFuture<PutObjectOutput> crtFuture = new CompletableFuture<>();
124+
crtFuture.complete(PutObjectOutput.builder().build());
125+
when(mockS3NativeClient.putObject(any(PutObjectRequest.class),
126+
any(RequestDataSupplier.class)))
127+
.thenReturn(crtFuture);
128+
129+
CompletableFuture<PutObjectResponse> future =
130+
s3CrtAsyncClient.putObject(b -> b.bucket("bucket").key("key"),
131+
AsyncRequestBody.empty());
132+
133+
assertThat(future.join().sdkHttpResponse().statusText()).isEmpty();
134+
}
135+
88136
@Test
89137
public void closeS3Client_shouldCloseUnderlyingResources() {
90138
s3CrtAsyncClient.close();

0 commit comments

Comments
 (0)