Skip to content

Fix leaked futures when executor rejects Runnable #4009

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AWSSDKforJavav2-abc1eac.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS SDK for Java v2",
"contributor": "",
"description": "This update fixes an issue where CompletableFutures are leaked/never completed when the submission to the FUTURE_COMPLETE_EXECUTOR is rejected.\n\nBy default, the SDK uses `2 * number of cores` (with a maximum of 64), and uses bounded queue of size 1000. In cases where the throughput to the client exceeds the executor's ability to keep up, it would reject executions. Before this change this would lead to leaked futures."
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,26 @@ private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullReque
}
});

// Offload the completion of the future returned from this stage onto
// the future completion executor
responseHandlerFuture.whenCompleteAsync((r, t) -> {
if (t == null) {
responseFuture.complete(r);
} else {
responseFuture.completeExceptionally(t);
// Attempt to offload the completion of the future returned from this
// stage onto the future completion executor
CompletableFuture<Response<OutputT>> asyncComplete =
responseHandlerFuture.whenCompleteAsync((r, t) -> completeResponseFuture(responseFuture, r, t),
futureCompletionExecutor);

// It's possible the async execution above fails. If so, log a warning,
// and just complete it synchronously.
asyncComplete.whenComplete((ignored, asyncCompleteError) -> {
if (asyncCompleteError != null) {
log.debug(() -> String.format("Could not complete the service call future on the provided "
+ "FUTURE_COMPLETION_EXECUTOR. The future will be completed synchronously by thread"
+ " %s. This may be an indication that the executor is being overwhelmed by too"
+ " many requests, and it may negatively affect performance. Consider changing "
+ "the configuration of the executor to accommodate the load through the client.",
Thread.currentThread().getName()),
asyncCompleteError);
responseHandlerFuture.whenComplete((r, t) -> completeResponseFuture(responseFuture, r, t));
}
}, futureCompletionExecutor);
});

return responseFuture;
}
Expand Down Expand Up @@ -219,6 +230,14 @@ private TimeoutTracker setupAttemptTimer(CompletableFuture<Response<OutputT>> ex
timeoutMillis);
}

private void completeResponseFuture(CompletableFuture<Response<OutputT>> responseFuture, Response<OutputT> r, Throwable t) {
if (t == null) {
responseFuture.complete(r);
} else {
responseFuture.completeExceptionally(t);
}
}

/**
* When an operation has a streaming input, the customer must supply an {@link AsyncRequestBody} to
* provide the request content in a non-blocking manner. This adapts that interface to the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand All @@ -31,6 +32,8 @@

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -47,6 +50,7 @@
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
import software.amazon.awssdk.core.internal.http.TransformingAsyncResponseHandler;
import software.amazon.awssdk.core.internal.http.timers.ClientExecutionAndRequestTimerTestUtils;
import software.amazon.awssdk.core.internal.util.AsyncResponseHandlerTestUtils;
import software.amazon.awssdk.http.SdkHttpFullRequest;
Expand Down Expand Up @@ -152,6 +156,33 @@ public void testExecute_contextContainsMetricCollector_addsChildToExecuteRequest
}
}

@Test
public void execute_futureCompletionExecutorRejectsWhenCompleteAsync_futureCompletedSynchronously() {
ExecutorService mockExecutor = mock(ExecutorService.class);
doThrow(new RejectedExecutionException("Busy")).when(mockExecutor).execute(any(Runnable.class));

SdkClientConfiguration config =
SdkClientConfiguration.builder()
.option(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, mockExecutor)
.option(ASYNC_HTTP_CLIENT, sdkAsyncHttpClient)
.build();
HttpClientDependencies dependencies = HttpClientDependencies.builder().clientConfiguration(config).build();

TransformingAsyncResponseHandler mockHandler = mock(TransformingAsyncResponseHandler.class);
when(mockHandler.prepare()).thenReturn(CompletableFuture.completedFuture(null));

stage = new MakeAsyncHttpRequestStage<>(mockHandler, dependencies);

CompletableFuture<SdkHttpFullRequest> requestFuture = CompletableFuture.completedFuture(
ValidSdkObjects.sdkHttpFullRequest().build());

CompletableFuture executeFuture = stage.execute(requestFuture, requestContext());

long testThreadId = Thread.currentThread().getId();
executeFuture.whenComplete((r, t) -> assertThat(Thread.currentThread().getId()).isEqualTo(testThreadId)).join();
verify(mockExecutor).execute(any(Runnable.class));
}

private HttpClientDependencies clientDependencies(Duration timeout) {
SdkClientConfiguration configuration = SdkClientConfiguration.builder()
.option(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, Runnable::run)
Expand Down