Skip to content

Commit ac5d23f

Browse files
committed
Switch to handleAsync
whenCompleteAsync is incorrect because if the response handler future completes exceptionally, then the returned future will also be completed with that exception which will incorrectly trigger a synchronous completion for the response future.
1 parent 754d525 commit ac5d23f

File tree

2 files changed

+62
-5
lines changed

2 files changed

+62
-5
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,11 @@ private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullReque
159159
// Attempt to offload the completion of the future returned from this
160160
// stage onto the future completion executor
161161
CompletableFuture<Response<OutputT>> asyncComplete =
162-
responseHandlerFuture.whenCompleteAsync((r, t) -> completeResponseFuture(responseFuture, r, t),
163-
futureCompletionExecutor);
162+
responseHandlerFuture.handleAsync((r, t) -> {
163+
completeResponseFuture(responseFuture, r, t);
164+
return null;
165+
},
166+
futureCompletionExecutor);
164167

165168
// It's possible the async execution above fails. If so, log a warning,
166169
// and just complete it synchronously.

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStageTest.java

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
package software.amazon.awssdk.core.internal.http.pipeline.stages;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1920
import static org.mockito.ArgumentMatchers.any;
2021
import static org.mockito.ArgumentMatchers.anyLong;
2122
import static org.mockito.ArgumentMatchers.eq;
2223
import static org.mockito.Mockito.doThrow;
2324
import static org.mockito.Mockito.mock;
2425
import static org.mockito.Mockito.never;
26+
import static org.mockito.Mockito.spy;
2527
import static org.mockito.Mockito.times;
2628
import static org.mockito.Mockito.verify;
2729
import static org.mockito.Mockito.when;
@@ -33,6 +35,7 @@
3335
import java.time.Duration;
3436
import java.util.concurrent.CompletableFuture;
3537
import java.util.concurrent.ExecutorService;
38+
import java.util.concurrent.Executors;
3639
import java.util.concurrent.RejectedExecutionException;
3740
import java.util.concurrent.ScheduledExecutorService;
3841
import java.util.concurrent.ScheduledFuture;
@@ -58,6 +61,8 @@
5861
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
5962
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
6063
import software.amazon.awssdk.metrics.MetricCollector;
64+
import software.amazon.awssdk.utils.CompletableFutureUtils;
65+
import software.amazon.awssdk.utils.ThreadFactoryBuilder;
6166
import utils.ValidSdkObjects;
6267

6368
@RunWith(MockitoJUnitRunner.class)
@@ -157,7 +162,7 @@ public void testExecute_contextContainsMetricCollector_addsChildToExecuteRequest
157162
}
158163

159164
@Test
160-
public void execute_futureCompletionExecutorRejectsWhenCompleteAsync_futureCompletedSynchronously() {
165+
public void execute_handlerFutureCompletedNormally_futureCompletionExecutorRejectsWhenCompleteAsync_futureCompletedSynchronously() {
161166
ExecutorService mockExecutor = mock(ExecutorService.class);
162167
doThrow(new RejectedExecutionException("Busy")).when(mockExecutor).execute(any(Runnable.class));
163168

@@ -169,7 +174,8 @@ public void execute_futureCompletionExecutorRejectsWhenCompleteAsync_futureCompl
169174
HttpClientDependencies dependencies = HttpClientDependencies.builder().clientConfiguration(config).build();
170175

171176
TransformingAsyncResponseHandler mockHandler = mock(TransformingAsyncResponseHandler.class);
172-
when(mockHandler.prepare()).thenReturn(CompletableFuture.completedFuture(null));
177+
CompletableFuture prepareFuture = new CompletableFuture();
178+
when(mockHandler.prepare()).thenReturn(prepareFuture);
173179

174180
stage = new MakeAsyncHttpRequestStage<>(mockHandler, dependencies);
175181

@@ -179,10 +185,58 @@ public void execute_futureCompletionExecutorRejectsWhenCompleteAsync_futureCompl
179185
CompletableFuture executeFuture = stage.execute(requestFuture, requestContext());
180186

181187
long testThreadId = Thread.currentThread().getId();
182-
executeFuture.whenComplete((r, t) -> assertThat(Thread.currentThread().getId()).isEqualTo(testThreadId)).join();
188+
CompletableFuture afterWhenComplete =
189+
executeFuture.whenComplete((r, t) -> assertThat(Thread.currentThread().getId()).isEqualTo(testThreadId));
190+
191+
prepareFuture.complete(null);
192+
193+
afterWhenComplete.join();
194+
183195
verify(mockExecutor).execute(any(Runnable.class));
184196
}
185197

198+
@Test
199+
public void execute_handlerFutureCompletedExceptionally_doesNotAttemptSynchronousComplete() {
200+
String threadNamePrefix = "async-handle-test";
201+
ExecutorService mockExecutor = Executors.newSingleThreadExecutor(
202+
new ThreadFactoryBuilder().threadNamePrefix(threadNamePrefix).build());
203+
204+
SdkClientConfiguration config =
205+
SdkClientConfiguration.builder()
206+
.option(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, mockExecutor)
207+
.option(ASYNC_HTTP_CLIENT, sdkAsyncHttpClient)
208+
.build();
209+
HttpClientDependencies dependencies = HttpClientDependencies.builder().clientConfiguration(config).build();
210+
211+
TransformingAsyncResponseHandler mockHandler = mock(TransformingAsyncResponseHandler.class);
212+
CompletableFuture prepareFuture = spy(new CompletableFuture());
213+
when(mockHandler.prepare()).thenReturn(prepareFuture);
214+
215+
stage = new MakeAsyncHttpRequestStage<>(mockHandler, dependencies);
216+
217+
CompletableFuture<SdkHttpFullRequest> requestFuture = CompletableFuture.completedFuture(
218+
ValidSdkObjects.sdkHttpFullRequest().build());
219+
220+
CompletableFuture executeFuture = stage.execute(requestFuture, requestContext());
221+
222+
try {
223+
CompletableFuture afterHandle =
224+
executeFuture.handle((r, t) -> assertThat(Thread.currentThread().getName()).startsWith(threadNamePrefix));
225+
226+
prepareFuture.completeExceptionally(new RuntimeException("parse error"));
227+
228+
afterHandle.join();
229+
230+
assertThatThrownBy(executeFuture::join)
231+
.hasCauseInstanceOf(RuntimeException.class)
232+
.hasMessageContaining("parse error");
233+
234+
verify(prepareFuture, times(0)).whenComplete(any());
235+
} finally {
236+
mockExecutor.shutdown();
237+
}
238+
}
239+
186240
private HttpClientDependencies clientDependencies(Duration timeout) {
187241
SdkClientConfiguration configuration = SdkClientConfiguration.builder()
188242
.option(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, Runnable::run)

0 commit comments

Comments
 (0)