Skip to content

Commit 46b6460

Browse files
authored
Avoid throwing NPE in the CombinedResponseAsyncHttpResponseHandler#onStream (#3045)
* Add null check in the CombinedResponseAsyncHttpResponseHandler * Add tests
1 parent 5e5a854 commit 46b6460

File tree

2 files changed

+125
-10
lines changed

2 files changed

+125
-10
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/async/CombinedResponseAsyncHttpResponseHandler.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@
1919

2020
import java.nio.ByteBuffer;
2121
import java.util.concurrent.CompletableFuture;
22-
import java.util.concurrent.atomic.AtomicReference;
2322
import org.reactivestreams.Publisher;
2423
import software.amazon.awssdk.annotations.SdkInternalApi;
2524
import software.amazon.awssdk.core.Response;
2625
import software.amazon.awssdk.core.exception.SdkException;
2726
import software.amazon.awssdk.core.internal.http.TransformingAsyncResponseHandler;
2827
import software.amazon.awssdk.http.SdkHttpFullResponse;
2928
import software.amazon.awssdk.http.SdkHttpResponse;
29+
import software.amazon.awssdk.utils.Validate;
3030

3131
/**
3232
* Detects whether the response succeeded or failed by just checking the HTTP status and delegates to appropriate
@@ -39,7 +39,6 @@ public final class CombinedResponseAsyncHttpResponseHandler<OutputT>
3939
private final TransformingAsyncResponseHandler<OutputT> successResponseHandler;
4040
private final TransformingAsyncResponseHandler<? extends SdkException> errorResponseHandler;
4141
private CompletableFuture<SdkHttpResponse> headersFuture;
42-
private final AtomicReference<SdkHttpFullResponse> response = new AtomicReference<>();
4342

4443
public CombinedResponseAsyncHttpResponseHandler(
4544
TransformingAsyncResponseHandler<OutputT> successResponseHandler,
@@ -51,6 +50,7 @@ public CombinedResponseAsyncHttpResponseHandler(
5150

5251
@Override
5352
public void onHeaders(SdkHttpResponse response) {
53+
Validate.isTrue(headersFuture != null, "onHeaders() invoked without prepare().");
5454
headersFuture.complete(response);
5555
logRequestId(response);
5656

@@ -59,7 +59,6 @@ public void onHeaders(SdkHttpResponse response) {
5959
} else {
6060
errorResponseHandler.onHeaders(response);
6161
}
62-
this.response.set(toFullResponse(response));
6362
}
6463

6564
@Override
@@ -73,7 +72,11 @@ public void onError(Throwable error) {
7372

7473
@Override
7574
public void onStream(Publisher<ByteBuffer> publisher) {
76-
if (this.response.get().isSuccessful()) {
75+
Validate.isTrue(headersFuture != null, "onStream() invoked without prepare().");
76+
Validate.isTrue(headersFuture.isDone(), "headersFuture is still not completed when onStream() is "
77+
+ "invoked.");
78+
SdkHttpResponse sdkHttpResponse = headersFuture.join();
79+
if (sdkHttpResponse.isSuccessful()) {
7780
successResponseHandler.onStream(publisher);
7881
} else {
7982
errorResponseHandler.onStream(publisher);
@@ -82,32 +85,31 @@ public void onStream(Publisher<ByteBuffer> publisher) {
8285

8386
@Override
8487
public CompletableFuture<Response<OutputT>> prepare() {
85-
this.response.set(null);
88+
headersFuture = new CompletableFuture<>();
8689
CompletableFuture<OutputT> preparedTransformFuture = successResponseHandler.prepare();
8790

8891
CompletableFuture<? extends SdkException> preparedErrorTransformFuture = errorResponseHandler == null ? null :
8992
errorResponseHandler.prepare();
9093

91-
headersFuture = new CompletableFuture<>();
92-
9394
return headersFuture.thenCompose(headers -> {
95+
SdkHttpFullResponse sdkHttpFullResponse = toFullResponse(headers);
9496
if (headers.isSuccessful()) {
9597
return preparedTransformFuture.thenApply(
9698
r -> Response.<OutputT>builder().response(r)
97-
.httpResponse(response.get())
99+
.httpResponse(sdkHttpFullResponse)
98100
.isSuccess(true)
99101
.build());
100102
}
101103

102104
if (preparedErrorTransformFuture != null) {
103105
return preparedErrorTransformFuture.thenApply(
104106
e -> Response.<OutputT>builder().exception(e)
105-
.httpResponse(response.get())
107+
.httpResponse(sdkHttpFullResponse)
106108
.isSuccess(false)
107109
.build());
108110
}
109111
return CompletableFuture.completedFuture(
110-
Response.<OutputT>builder().httpResponse(response.get())
112+
Response.<OutputT>builder().httpResponse(sdkHttpFullResponse)
111113
.isSuccess(false)
112114
.build());
113115
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.http.async;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
import static org.mockito.Mockito.verify;
21+
import static org.mockito.Mockito.when;
22+
23+
import io.reactivex.Flowable;
24+
import java.nio.ByteBuffer;
25+
import java.nio.charset.StandardCharsets;
26+
import java.util.concurrent.CompletableFuture;
27+
import org.junit.jupiter.api.BeforeEach;
28+
import org.junit.jupiter.api.Test;
29+
import org.mockito.Mockito;
30+
import software.amazon.awssdk.core.Response;
31+
import software.amazon.awssdk.core.exception.SdkClientException;
32+
import software.amazon.awssdk.core.internal.http.TransformingAsyncResponseHandler;
33+
import software.amazon.awssdk.http.SdkHttpFullResponse;
34+
35+
class CombinedResponseAsyncHttpResponseHandlerTest {
36+
37+
private CombinedResponseAsyncHttpResponseHandler<Void> responseHandler;
38+
private TransformingAsyncResponseHandler<Void> successResponseHandler;
39+
private TransformingAsyncResponseHandler<SdkClientException> errorResponseHandler;
40+
41+
@BeforeEach
42+
public void setup() {
43+
successResponseHandler = Mockito.mock(TransformingAsyncResponseHandler.class);
44+
errorResponseHandler = Mockito.mock(TransformingAsyncResponseHandler.class);
45+
responseHandler = new CombinedResponseAsyncHttpResponseHandler<>(successResponseHandler, errorResponseHandler);
46+
}
47+
48+
@Test
49+
void onStream_invokedWithoutPrepare_shouldThrowException() {
50+
assertThatThrownBy(() ->
51+
responseHandler.onStream(publisher()))
52+
.isInstanceOf(IllegalArgumentException.class).hasMessageContaining("onStream() invoked");
53+
54+
}
55+
56+
@Test
57+
void onHeaders_invokedWithoutPrepare_shouldThrowException() {
58+
assertThatThrownBy(() ->
59+
responseHandler.onHeaders(SdkHttpFullResponse.builder().build()))
60+
.isInstanceOf(IllegalArgumentException.class).hasMessageContaining("onHeaders() invoked");
61+
62+
}
63+
64+
@Test
65+
void onStream_invokedWithoutOnHeaders_shouldThrowException() {
66+
when(successResponseHandler.prepare()).thenReturn(CompletableFuture.completedFuture(null));
67+
68+
responseHandler.prepare();
69+
assertThatThrownBy(() ->
70+
responseHandler.onStream(publisher()))
71+
.isInstanceOf(IllegalArgumentException.class).hasMessageContaining("headersFuture is still not completed when onStream()");
72+
73+
}
74+
75+
@Test
76+
void successResponse_shouldCompleteHeaderFuture() {
77+
when(successResponseHandler.prepare()).thenReturn(CompletableFuture.completedFuture(null));
78+
79+
CompletableFuture<Response<Void>> future = responseHandler.prepare();
80+
SdkHttpFullResponse sdkHttpFullResponse = SdkHttpFullResponse.builder()
81+
.statusCode(200)
82+
.build();
83+
Flowable<ByteBuffer> publisher = publisher();
84+
responseHandler.onHeaders(sdkHttpFullResponse);
85+
responseHandler.onStream(publisher);
86+
verify(successResponseHandler).prepare();
87+
verify(successResponseHandler).onStream(publisher);
88+
assertThat(future).isDone();
89+
assertThat(future.join().httpResponse()).isEqualTo(sdkHttpFullResponse);
90+
}
91+
92+
@Test
93+
void errorResponse_shouldCompleteHeaderFuture() {
94+
when(errorResponseHandler.prepare()).thenReturn(CompletableFuture.completedFuture(null));
95+
96+
CompletableFuture<Response<Void>> future = responseHandler.prepare();
97+
SdkHttpFullResponse sdkHttpFullResponse = SdkHttpFullResponse.builder()
98+
.statusCode(400)
99+
.build();
100+
Flowable<ByteBuffer> publisher = publisher();
101+
responseHandler.onHeaders(sdkHttpFullResponse);
102+
responseHandler.onStream(publisher);
103+
verify(errorResponseHandler).prepare();
104+
verify(errorResponseHandler).onStream(publisher);
105+
assertThat(future).isDone();
106+
assertThat(future.join().httpResponse()).isEqualTo(sdkHttpFullResponse);
107+
}
108+
109+
private static Flowable<ByteBuffer> publisher() {
110+
return Flowable.just(ByteBuffer.wrap("string".getBytes(StandardCharsets.UTF_8)));
111+
}
112+
113+
}

0 commit comments

Comments
 (0)