Skip to content

Commit 03677b2

Browse files
authored
Fixed the issue where the SDK did not offload future completion (#3034)
* Fixed the issue where the SDK did not offload future completion to the future completion executor * Revert the change to offload metrics reporting and remove false comment * Fix merge error
1 parent e2d8c61 commit 03677b2

File tree

4 files changed

+55
-58
lines changed

4 files changed

+55
-58
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java

Lines changed: 2 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.concurrent.Executor;
2727
import java.util.concurrent.ScheduledExecutorService;
2828
import java.util.function.Supplier;
29-
import org.reactivestreams.Publisher;
3029
import org.reactivestreams.Subscriber;
3130
import software.amazon.awssdk.annotations.SdkInternalApi;
3231
import software.amazon.awssdk.core.Response;
@@ -48,7 +47,6 @@
4847
import software.amazon.awssdk.core.metrics.CoreMetric;
4948
import software.amazon.awssdk.http.SdkHttpFullRequest;
5049
import software.amazon.awssdk.http.SdkHttpMethod;
51-
import software.amazon.awssdk.http.SdkHttpResponse;
5250
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
5351
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
5452
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
@@ -119,59 +117,12 @@ public CompletableFuture<Response<OutputT>> execute(CompletableFuture<SdkHttpFul
119117
return toReturn;
120118
}
121119

122-
private static final class WrappedErrorForwardingResponseHandler<T>
123-
implements TransformingAsyncResponseHandler<T> {
124-
125-
private final TransformingAsyncResponseHandler<T> wrappedHandler;
126-
private final CompletableFuture<T> responseFuture;
127-
128-
private WrappedErrorForwardingResponseHandler(TransformingAsyncResponseHandler<T> wrappedHandler,
129-
CompletableFuture<T> responseFuture) {
130-
this.wrappedHandler = wrappedHandler;
131-
this.responseFuture = responseFuture;
132-
133-
}
134-
135-
private static <T> WrappedErrorForwardingResponseHandler<T> of(
136-
TransformingAsyncResponseHandler<T> wrappedHandler,
137-
CompletableFuture<T> responseFuture) {
138-
139-
return new WrappedErrorForwardingResponseHandler<>(wrappedHandler, responseFuture);
140-
}
141-
142-
@Override
143-
public CompletableFuture<T> prepare() {
144-
return wrappedHandler.prepare();
145-
}
146-
147-
@Override
148-
public void onHeaders(SdkHttpResponse headers) {
149-
wrappedHandler.onHeaders(headers);
150-
}
151-
152-
@Override
153-
public void onStream(Publisher<ByteBuffer> stream) {
154-
wrappedHandler.onStream(stream);
155-
}
156-
157-
@Override
158-
public void onError(Throwable error) {
159-
responseFuture.completeExceptionally(error);
160-
wrappedHandler.onError(error);
161-
}
162-
}
163-
164120
private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullRequest request,
165121
RequestExecutionContext context) {
166122

167123
CompletableFuture<Response<OutputT>> responseFuture = new CompletableFuture<>();
168124

169-
// Wrap the response handler in a layer that will notify the newly created responseFuture when the onError event
170-
// is triggered
171-
TransformingAsyncResponseHandler<Response<OutputT>> wrappedResponseHandler =
172-
WrappedErrorForwardingResponseHandler.of(responseHandler, responseFuture);
173-
174-
CompletableFuture<Response<OutputT>> responseHandlerFuture = wrappedResponseHandler.prepare();
125+
CompletableFuture<Response<OutputT>> responseHandlerFuture = responseHandler.prepare();
175126

176127
SdkHttpContentPublisher requestProvider = context.requestProvider() == null
177128
? new SimpleHttpContentPublisher(request)
@@ -184,7 +135,7 @@ private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullReque
184135
AsyncExecuteRequest.Builder executeRequestBuilder = AsyncExecuteRequest.builder()
185136
.request(requestWithContentLength)
186137
.requestContentPublisher(requestProvider)
187-
.responseHandler(wrappedResponseHandler)
138+
.responseHandler(responseHandler)
188139
.fullDuplex(isFullDuplex(context.executionAttributes()))
189140
.metricCollector(httpMetricCollector);
190141
if (context.executionAttributes().getAttribute(SDK_HTTP_EXECUTION_ATTRIBUTES) != null) {
@@ -223,7 +174,6 @@ private CompletableFuture<Void> doExecuteHttpRequest(RequestExecutionContext con
223174
long callStart = System.nanoTime();
224175
CompletableFuture<Void> httpClientFuture = sdkAsyncHttpClient.execute(executeRequest);
225176

226-
// Offload the metrics reporting from this stage onto the future completion executor
227177
CompletableFuture<Void> result = httpClientFuture.whenComplete((r, t) -> {
228178
long duration = System.nanoTime() - callStart;
229179
metricCollector.reportMetric(CoreMetric.SERVICE_CALL_DURATION, Duration.ofNanos(duration));

test/protocol-tests/src/test/java/software/amazon/awssdk/protocol/tests/AsyncResponseThreadingTest.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@
2020
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
2121
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
2222
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
23+
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
2324
import static org.mockito.ArgumentMatchers.any;
24-
import static org.mockito.Mockito.atLeast;
2525
import static org.mockito.Mockito.verify;
2626
import static software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR;
2727

28+
import com.github.tomakehurst.wiremock.http.Fault;
2829
import com.github.tomakehurst.wiremock.junit.WireMockRule;
2930
import java.net.URI;
3031
import java.util.concurrent.Executor;
@@ -34,8 +35,11 @@
3435
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
3536
import software.amazon.awssdk.core.ResponseBytes;
3637
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
38+
import software.amazon.awssdk.core.exception.SdkClientException;
39+
import software.amazon.awssdk.core.retry.RetryPolicy;
3740
import software.amazon.awssdk.regions.Region;
3841
import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonAsyncClient;
42+
import software.amazon.awssdk.services.protocolrestjson.model.ProtocolRestJsonException;
3943
import software.amazon.awssdk.services.protocolrestjson.model.StreamingOutputOperationRequest;
4044
import software.amazon.awssdk.services.protocolrestjson.model.StreamingOutputOperationResponse;
4145

@@ -63,13 +67,56 @@ public void completionWithNioThreadWorksCorrectly() {
6367
client.streamingOutputOperation(StreamingOutputOperationRequest.builder().build(),
6468
AsyncResponseTransformer.toBytes()).join();
6569

66-
// #1 reporting metrics, #2 completing response
67-
verify(mockExecutor, atLeast(1)).execute(any());
70+
verify(mockExecutor).execute(any());
6871

6972
byte[] arrayCopy = response.asByteArray();
7073
assertThat(arrayCopy).containsExactly('t', 'e', 's', 't');
7174
}
7275

76+
@Test
77+
public void connectionError_completionWithNioThreadWorksCorrectly() {
78+
stubFor(post(urlPathEqualTo(STREAMING_OUTPUT_PATH)).willReturn(aResponse().withFault(Fault.CONNECTION_RESET_BY_PEER).withBody("test")));
79+
80+
Executor mockExecutor = Mockito.spy(new SpyableExecutor());
81+
82+
ProtocolRestJsonAsyncClient client =
83+
ProtocolRestJsonAsyncClient.builder()
84+
.region(Region.US_WEST_1)
85+
.endpointOverride(URI.create("http://localhost:" + wireMock.port()))
86+
.credentialsProvider(() -> AwsBasicCredentials.create("akid", "skid"))
87+
.asyncConfiguration(c -> c.advancedOption(FUTURE_COMPLETION_EXECUTOR, mockExecutor))
88+
.overrideConfiguration(o -> o.retryPolicy(RetryPolicy.none()))
89+
.build();
90+
91+
assertThatThrownBy(() ->
92+
client.streamingOutputOperation(StreamingOutputOperationRequest.builder().build(),
93+
AsyncResponseTransformer.toBytes()).join())
94+
.hasCauseInstanceOf(SdkClientException.class);
95+
96+
verify(mockExecutor).execute(any());
97+
}
98+
99+
@Test
100+
public void serverError_completionWithNioThreadWorksCorrectly() {
101+
stubFor(post(urlPathEqualTo(STREAMING_OUTPUT_PATH)).willReturn(aResponse().withStatus(500).withBody("test")));
102+
103+
Executor mockExecutor = Mockito.spy(new SpyableExecutor());
104+
105+
ProtocolRestJsonAsyncClient client =
106+
ProtocolRestJsonAsyncClient.builder()
107+
.region(Region.US_WEST_1)
108+
.endpointOverride(URI.create("http://localhost:" + wireMock.port()))
109+
.credentialsProvider(() -> AwsBasicCredentials.create("akid", "skid"))
110+
.overrideConfiguration(o -> o.retryPolicy(RetryPolicy.none()))
111+
.asyncConfiguration(c -> c.advancedOption(FUTURE_COMPLETION_EXECUTOR, mockExecutor))
112+
.build();
113+
114+
assertThatThrownBy(() ->
115+
client.streamingOutputOperation(StreamingOutputOperationRequest.builder().build(),
116+
AsyncResponseTransformer.toBytes()).join()).hasCauseInstanceOf(ProtocolRestJsonException.class);
117+
verify(mockExecutor).execute(any());
118+
}
119+
73120
private static class SpyableExecutor implements Executor {
74121
@Override
75122
public void execute(Runnable command) {

test/protocol-tests/src/test/java/software/amazon/awssdk/protocol/tests/retry/AsyncAwsJsonRetryTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class AsyncAwsJsonRetryTest {
5151
public void setupClient() {
5252
client = ProtocolJsonRpcAsyncClient.builder()
5353
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create
54-
("akid", "skid")))
54+
("akid", "skid")))
5555
.region(Region.US_EAST_1)
5656
.endpointOverride(URI.create("http://localhost:" + wireMock.port()))
5757
.build();
@@ -155,4 +155,4 @@ public void retryPolicyNone_shouldNotRetry() {
155155
assertThatThrownBy(() -> clientWithNoRetry.allTypes(AllTypesRequest.builder().build()).join())
156156
.hasCauseInstanceOf(ProtocolJsonRpcException.class);
157157
}
158-
}
158+
}

test/protocol-tests/src/test/resources/jetty-logging.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@
1515

1616
# Set up logging implementation
1717
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
18-
org.eclipse.jetty.LEVEL=INFO
18+
org.eclipse.jetty.LEVEL=OFF

0 commit comments

Comments
 (0)