Skip to content

Commit e10a3f3

Browse files
committed
Fixed the issue where the SDK did not offload future completion to the future completion executor
1 parent e2d8c61 commit e10a3f3

File tree

4 files changed

+61
-58
lines changed

4 files changed

+61
-58
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java

Lines changed: 4 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.concurrent.Executor;
2727
import java.util.concurrent.ScheduledExecutorService;
2828
import java.util.function.Supplier;
29-
import org.reactivestreams.Publisher;
3029
import org.reactivestreams.Subscriber;
3130
import software.amazon.awssdk.annotations.SdkInternalApi;
3231
import software.amazon.awssdk.core.Response;
@@ -48,7 +47,6 @@
4847
import software.amazon.awssdk.core.metrics.CoreMetric;
4948
import software.amazon.awssdk.http.SdkHttpFullRequest;
5049
import software.amazon.awssdk.http.SdkHttpMethod;
51-
import software.amazon.awssdk.http.SdkHttpResponse;
5250
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
5351
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
5452
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
@@ -119,59 +117,12 @@ public CompletableFuture<Response<OutputT>> execute(CompletableFuture<SdkHttpFul
119117
return toReturn;
120118
}
121119

122-
private static final class WrappedErrorForwardingResponseHandler<T>
123-
implements TransformingAsyncResponseHandler<T> {
124-
125-
private final TransformingAsyncResponseHandler<T> wrappedHandler;
126-
private final CompletableFuture<T> responseFuture;
127-
128-
private WrappedErrorForwardingResponseHandler(TransformingAsyncResponseHandler<T> wrappedHandler,
129-
CompletableFuture<T> responseFuture) {
130-
this.wrappedHandler = wrappedHandler;
131-
this.responseFuture = responseFuture;
132-
133-
}
134-
135-
private static <T> WrappedErrorForwardingResponseHandler<T> of(
136-
TransformingAsyncResponseHandler<T> wrappedHandler,
137-
CompletableFuture<T> responseFuture) {
138-
139-
return new WrappedErrorForwardingResponseHandler<>(wrappedHandler, responseFuture);
140-
}
141-
142-
@Override
143-
public CompletableFuture<T> prepare() {
144-
return wrappedHandler.prepare();
145-
}
146-
147-
@Override
148-
public void onHeaders(SdkHttpResponse headers) {
149-
wrappedHandler.onHeaders(headers);
150-
}
151-
152-
@Override
153-
public void onStream(Publisher<ByteBuffer> stream) {
154-
wrappedHandler.onStream(stream);
155-
}
156-
157-
@Override
158-
public void onError(Throwable error) {
159-
responseFuture.completeExceptionally(error);
160-
wrappedHandler.onError(error);
161-
}
162-
}
163-
164120
private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullRequest request,
165121
RequestExecutionContext context) {
166122

167123
CompletableFuture<Response<OutputT>> responseFuture = new CompletableFuture<>();
168124

169-
// Wrap the response handler in a layer that will notify the newly created responseFuture when the onError event
170-
// is triggered
171-
TransformingAsyncResponseHandler<Response<OutputT>> wrappedResponseHandler =
172-
WrappedErrorForwardingResponseHandler.of(responseHandler, responseFuture);
173-
174-
CompletableFuture<Response<OutputT>> responseHandlerFuture = wrappedResponseHandler.prepare();
125+
CompletableFuture<Response<OutputT>> responseHandlerFuture = responseHandler.prepare();
175126

176127
SdkHttpContentPublisher requestProvider = context.requestProvider() == null
177128
? new SimpleHttpContentPublisher(request)
@@ -184,7 +135,7 @@ private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullReque
184135
AsyncExecuteRequest.Builder executeRequestBuilder = AsyncExecuteRequest.builder()
185136
.request(requestWithContentLength)
186137
.requestContentPublisher(requestProvider)
187-
.responseHandler(wrappedResponseHandler)
138+
.responseHandler(responseHandler)
188139
.fullDuplex(isFullDuplex(context.executionAttributes()))
189140
.metricCollector(httpMetricCollector);
190141
if (context.executionAttributes().getAttribute(SDK_HTTP_EXECUTION_ATTRIBUTES) != null) {
@@ -224,10 +175,10 @@ private CompletableFuture<Void> doExecuteHttpRequest(RequestExecutionContext con
224175
CompletableFuture<Void> httpClientFuture = sdkAsyncHttpClient.execute(executeRequest);
225176

226177
// Offload the metrics reporting from this stage onto the future completion executor
227-
CompletableFuture<Void> result = httpClientFuture.whenComplete((r, t) -> {
178+
CompletableFuture<Void> result = httpClientFuture.whenCompleteAsync((r, t) -> {
228179
long duration = System.nanoTime() - callStart;
229180
metricCollector.reportMetric(CoreMetric.SERVICE_CALL_DURATION, Duration.ofNanos(duration));
230-
});
181+
}, futureCompletionExecutor);
231182

232183
// Make sure failures on the result future are forwarded to the http client future.
233184
CompletableFutureUtils.forwardExceptionTo(result, httpClientFuture);

test/protocol-tests/src/test/java/software/amazon/awssdk/protocol/tests/AsyncResponseThreadingTest.java

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@
2121
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
2222
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
2323
import static org.mockito.ArgumentMatchers.any;
24-
import static org.mockito.Mockito.atLeast;
24+
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
25+
import static org.mockito.Matchers.any;
26+
import static org.mockito.Mockito.times;
2527
import static org.mockito.Mockito.verify;
2628
import static software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR;
2729

30+
import com.github.tomakehurst.wiremock.http.Fault;
2831
import com.github.tomakehurst.wiremock.junit.WireMockRule;
2932
import java.net.URI;
3033
import java.util.concurrent.Executor;
@@ -34,8 +37,11 @@
3437
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
3538
import software.amazon.awssdk.core.ResponseBytes;
3639
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
40+
import software.amazon.awssdk.core.exception.SdkClientException;
41+
import software.amazon.awssdk.core.retry.RetryPolicy;
3742
import software.amazon.awssdk.regions.Region;
3843
import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonAsyncClient;
44+
import software.amazon.awssdk.services.protocolrestjson.model.ProtocolRestJsonException;
3945
import software.amazon.awssdk.services.protocolrestjson.model.StreamingOutputOperationRequest;
4046
import software.amazon.awssdk.services.protocolrestjson.model.StreamingOutputOperationResponse;
4147

@@ -64,12 +70,58 @@ public void completionWithNioThreadWorksCorrectly() {
6470
AsyncResponseTransformer.toBytes()).join();
6571

6672
// #1 reporting metrics, #2 completing response
67-
verify(mockExecutor, atLeast(1)).execute(any());
73+
verify(mockExecutor, times(2)).execute(any());
6874

6975
byte[] arrayCopy = response.asByteArray();
7076
assertThat(arrayCopy).containsExactly('t', 'e', 's', 't');
7177
}
7278

79+
@Test
80+
public void connectionError_completionWithNioThreadWorksCorrectly() {
81+
stubFor(post(urlPathEqualTo(STREAMING_OUTPUT_PATH)).willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER).withBody("test")));
82+
83+
Executor mockExecutor = Mockito.spy(new SpyableExecutor());
84+
85+
ProtocolRestJsonAsyncClient client =
86+
ProtocolRestJsonAsyncClient.builder()
87+
.region(Region.US_WEST_1)
88+
.endpointOverride(URI.create("http://localhost:" + wireMock.port()))
89+
.credentialsProvider(() -> AwsBasicCredentials.create("akid", "skid"))
90+
.asyncConfiguration(c -> c.advancedOption(FUTURE_COMPLETION_EXECUTOR, mockExecutor))
91+
.overrideConfiguration(o -> o.retryPolicy(RetryPolicy.none()))
92+
.build();
93+
94+
assertThatThrownBy(() ->
95+
client.streamingOutputOperation(StreamingOutputOperationRequest.builder().build(),
96+
AsyncResponseTransformer.toBytes()).join())
97+
.hasCauseInstanceOf(SdkClientException.class);
98+
99+
// #1 reporting metrics, #2 completing response
100+
verify(mockExecutor, times(2)).execute(any());
101+
}
102+
103+
@Test
104+
public void serverError_completionWithNioThreadWorksCorrectly() {
105+
stubFor(post(urlPathEqualTo(STREAMING_OUTPUT_PATH)).willReturn(aResponse().withStatus(500).withBody("test")));
106+
107+
Executor mockExecutor = Mockito.spy(new SpyableExecutor());
108+
109+
ProtocolRestJsonAsyncClient client =
110+
ProtocolRestJsonAsyncClient.builder()
111+
.region(Region.US_WEST_1)
112+
.endpointOverride(URI.create("http://localhost:" + wireMock.port()))
113+
.credentialsProvider(() -> AwsBasicCredentials.create("akid", "skid"))
114+
.overrideConfiguration(o -> o.retryPolicy(RetryPolicy.none()))
115+
.asyncConfiguration(c -> c.advancedOption(FUTURE_COMPLETION_EXECUTOR, mockExecutor))
116+
.build();
117+
118+
assertThatThrownBy(() ->
119+
client.streamingOutputOperation(StreamingOutputOperationRequest.builder().build(),
120+
AsyncResponseTransformer.toBytes()).join()).hasCauseInstanceOf(ProtocolRestJsonException.class);
121+
// #1 reporting metrics, #2 completing response
122+
verify(mockExecutor, times(2)).execute(any());
123+
}
124+
73125
private static class SpyableExecutor implements Executor {
74126
@Override
75127
public void execute(Runnable command) {

test/protocol-tests/src/test/java/software/amazon/awssdk/protocol/tests/retry/AsyncAwsJsonRetryTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class AsyncAwsJsonRetryTest {
5151
public void setupClient() {
5252
client = ProtocolJsonRpcAsyncClient.builder()
5353
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create
54-
("akid", "skid")))
54+
("akid", "skid")))
5555
.region(Region.US_EAST_1)
5656
.endpointOverride(URI.create("http://localhost:" + wireMock.port()))
5757
.build();
@@ -155,4 +155,4 @@ public void retryPolicyNone_shouldNotRetry() {
155155
assertThatThrownBy(() -> clientWithNoRetry.allTypes(AllTypesRequest.builder().build()).join())
156156
.hasCauseInstanceOf(ProtocolJsonRpcException.class);
157157
}
158-
}
158+
}

test/protocol-tests/src/test/resources/jetty-logging.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@
1515

1616
# Set up logging implementation
1717
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
18-
org.eclipse.jetty.LEVEL=INFO
18+
org.eclipse.jetty.LEVEL=OFF

0 commit comments

Comments
 (0)