Skip to content

Implement ADAPTIVE Retry Mode #2658

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-35b961d.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "AWS SDK for Java v2",
"contributor": "",
"type": "feature",
"description": "Implement ADAPTIVE retry mode. ADAPTIVE mode builds on STANDARD retry mode and adds rate limiting of requests when the client is throttled.\n\nAdaptive retry mode dynamically limits the rate of AWS requests to maximize success rate. This may be at the expense of request latency. Adaptive retry mode is not recommended when predictable latency is important.\n\nWarning: Adaptive retry mode assumes that the client is working against a single resource (e.g. one DynamoDB Table or one S3 Bucket). If you use a single client for multiple resources, throttling or outages associated with one resource will result in increased latency and failures when accessing all other resources via the same client. When using adaptive retry mode, we recommend using a single client per resource."
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@

import java.io.IOException;
import java.time.Duration;
import java.util.OptionalDouble;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.core.Response;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.client.config.SdkClientOption;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
import software.amazon.awssdk.core.internal.http.TransformingAsyncResponseHandler;
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
import software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper;
import software.amazon.awssdk.core.internal.retry.RateLimitingTokenBucket;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.utils.CompletableFutureUtils;

Expand All @@ -45,16 +49,30 @@ public final class AsyncRetryableStage<OutputT> implements RequestPipeline<SdkHt
private final RequestPipeline<SdkHttpFullRequest, CompletableFuture<Response<OutputT>>> requestPipeline;
private final ScheduledExecutorService scheduledExecutor;
private final HttpClientDependencies dependencies;
private final RateLimitingTokenBucket rateLimitingTokenBucket;

public AsyncRetryableStage(TransformingAsyncResponseHandler<Response<OutputT>> responseHandler,
HttpClientDependencies dependencies,
RequestPipeline<SdkHttpFullRequest, CompletableFuture<Response<OutputT>>> requestPipeline) {
this.responseHandler = responseHandler;
this.dependencies = dependencies;
this.scheduledExecutor = dependencies.clientConfiguration().option(SdkClientOption.SCHEDULED_EXECUTOR_SERVICE);
this.rateLimitingTokenBucket = new RateLimitingTokenBucket();
this.requestPipeline = requestPipeline;
}

/**
 * Test-only variant that lets the caller supply the {@link RateLimitingTokenBucket} instead of having the stage create one.
 *
 * @param responseHandler response handler kept by this stage
 * @param dependencies client dependencies; the scheduled executor service is read from their client configuration
 * @param requestPipeline request pipeline kept by this stage
 * @param rateLimitingTokenBucket token bucket to use for rate limiting
 */
@SdkTestInternalApi
public AsyncRetryableStage(TransformingAsyncResponseHandler<Response<OutputT>> responseHandler,
                           HttpClientDependencies dependencies,
                           RequestPipeline<SdkHttpFullRequest, CompletableFuture<Response<OutputT>>> requestPipeline,
                           RateLimitingTokenBucket rateLimitingTokenBucket) {
    ScheduledExecutorService executor =
        dependencies.clientConfiguration().option(SdkClientOption.SCHEDULED_EXECUTOR_SERVICE);

    this.rateLimitingTokenBucket = rateLimitingTokenBucket;
    this.requestPipeline = requestPipeline;
    this.scheduledExecutor = executor;
    this.dependencies = dependencies;
    this.responseHandler = responseHandler;
}

@Override
public CompletableFuture<Response<OutputT>> execute(SdkHttpFullRequest request,
RequestExecutionContext context) throws Exception {
Expand All @@ -69,7 +87,7 @@ private class RetryingExecutor {
/**
 * Captures the per-request state that is reused across attempts.
 *
 * @param request the request handed to the {@link RetryableStageHelper}
 * @param context the execution context; its request provider is remembered as the original request body
 */
private RetryingExecutor(SdkHttpFullRequest request, RequestExecutionContext context) {
    this.originalRequestBody = context.requestProvider();
    this.context = context;
    // Assign the helper exactly once, using the constructor that takes the enclosing stage's token bucket.
    // The earlier three-argument construction was stale and was immediately overwritten.
    this.retryableStageHelper = new RetryableStageHelper(request, context, rateLimitingTokenBucket, dependencies);
}

public CompletableFuture<Response<OutputT>> execute() throws Exception {
Expand All @@ -95,9 +113,25 @@ public void maybeAttemptExecute(CompletableFuture<Response<OutputT>> future) {
}

Duration backoffDelay = retryableStageHelper.getBackoffDelay();

OptionalDouble tokenAcquireTimeSeconds = retryableStageHelper.getSendTokenNonBlocking();
if (!tokenAcquireTimeSeconds.isPresent()) {
String errorMessage = "Unable to acquire a send token immediately without waiting. This indicates that ADAPTIVE "
+ "retry mode is enabled, fast fail rate limiting is enabled, and that rate limiting is "
+ "engaged because of prior throttled requests. The request will not be executed.";
future.completeExceptionally(SdkClientException.create(errorMessage));
return;
}
long tokenAcquireTimeMillis = (long) (tokenAcquireTimeSeconds.getAsDouble() * 1000);

if (!backoffDelay.isZero()) {
retryableStageHelper.logBackingOff(backoffDelay);
scheduledExecutor.schedule(() -> attemptExecute(future), backoffDelay.toMillis(), MILLISECONDS);
}

long totalDelayMillis = backoffDelay.toMillis() + tokenAcquireTimeMillis;

if (totalDelayMillis > 0) {
scheduledExecutor.schedule(() -> attemptExecute(future), totalDelayMillis, MILLISECONDS);
} else {
attemptExecute(future);
}
Expand Down Expand Up @@ -134,13 +168,16 @@ private void attemptExecute(CompletableFuture<Response<OutputT>> future) {
return;
}

retryableStageHelper.updateClientSendingRateForSuccessResponse();

retryableStageHelper.attemptSucceeded();
future.complete(response);
});
}

/**
 * Handles a failed attempt: records the failure on the helper, lets the rate limiter react to it, and then delegates to
 * {@code maybeAttemptExecute} for the next step.
 *
 * @param future the future that represents the overall request
 * @param exception the failure produced by the latest attempt
 */
private void maybeRetryExecute(CompletableFuture<Response<OutputT>> future, Throwable exception) {
// Record the exception first: the sending-rate update below inspects the helper's last exception.
retryableStageHelper.setLastException(exception);
retryableStageHelper.updateClientSendingRateForErrorResponse();
maybeAttemptExecute(future);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.core.Response;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
import software.amazon.awssdk.core.internal.http.pipeline.RequestToResponsePipeline;
import software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper;
import software.amazon.awssdk.core.internal.retry.RateLimitingTokenBucket;
import software.amazon.awssdk.http.SdkHttpFullRequest;

/**
Expand All @@ -35,15 +37,27 @@
public final class RetryableStage<OutputT> implements RequestToResponsePipeline<OutputT> {
private final RequestPipeline<SdkHttpFullRequest, Response<OutputT>> requestPipeline;
private final HttpClientDependencies dependencies;
private final RateLimitingTokenBucket rateLimitingTokenBucket;

/**
 * Creates the stage with its own newly constructed {@link RateLimitingTokenBucket}.
 *
 * @param dependencies client dependencies kept by this stage
 * @param requestPipeline pipeline that this stage executes for each attempt
 */
public RetryableStage(HttpClientDependencies dependencies,
                      RequestPipeline<SdkHttpFullRequest, Response<OutputT>> requestPipeline) {
    this.rateLimitingTokenBucket = new RateLimitingTokenBucket();
    this.requestPipeline = requestPipeline;
    this.dependencies = dependencies;
}

/**
 * Test-only variant that lets the caller supply the {@link RateLimitingTokenBucket} instead of having the stage create one.
 *
 * @param dependencies client dependencies kept by this stage
 * @param requestPipeline pipeline that this stage executes for each attempt
 * @param rateLimitingTokenBucket token bucket to use for rate limiting
 */
@SdkTestInternalApi
public RetryableStage(HttpClientDependencies dependencies,
                      RequestPipeline<SdkHttpFullRequest, Response<OutputT>> requestPipeline,
                      RateLimitingTokenBucket rateLimitingTokenBucket) {
    this.rateLimitingTokenBucket = rateLimitingTokenBucket;
    this.requestPipeline = requestPipeline;
    this.dependencies = dependencies;
}

public Response<OutputT> execute(SdkHttpFullRequest request, RequestExecutionContext context) throws Exception {
RetryableStageHelper retryableStageHelper = new RetryableStageHelper(request, context, dependencies);
RetryableStageHelper retryableStageHelper = new RetryableStageHelper(request, context, rateLimitingTokenBucket,
dependencies);

while (true) {
retryableStageHelper.startingAttempt();
Expand All @@ -52,6 +66,8 @@ public Response<OutputT> execute(SdkHttpFullRequest request, RequestExecutionCon
throw retryableStageHelper.retryPolicyDisallowedRetryException();
}

retryableStageHelper.getSendToken();

Duration backoffDelay = retryableStageHelper.getBackoffDelay();
if (!backoffDelay.isZero()) {
retryableStageHelper.logBackingOff(backoffDelay);
Expand All @@ -64,6 +80,7 @@ public Response<OutputT> execute(SdkHttpFullRequest request, RequestExecutionCon
response = requestPipeline.execute(retryableStageHelper.requestToSend(), context);
} catch (SdkException | IOException e) {
retryableStageHelper.setLastException(e);
retryableStageHelper.updateClientSendingRateForErrorResponse();
continue;
}

Expand All @@ -72,9 +89,12 @@ public Response<OutputT> execute(SdkHttpFullRequest request, RequestExecutionCon
if (!response.isSuccess()) {
retryableStageHelper.adjustClockIfClockSkew(response);
retryableStageHelper.setLastException(response.exception());
retryableStageHelper.updateClientSendingRateForErrorResponse();
continue;
}

retryableStageHelper.updateClientSendingRateForSuccessResponse();

retryableStageHelper.attemptSucceeded();
return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.core.internal.http.pipeline.stages.utils;

import java.time.Duration;
import java.util.OptionalDouble;
import java.util.concurrent.CompletionException;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.Response;
Expand All @@ -31,7 +32,9 @@
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage;
import software.amazon.awssdk.core.internal.retry.ClockSkewAdjuster;
import software.amazon.awssdk.core.internal.retry.RateLimitingTokenBucket;
import software.amazon.awssdk.core.metrics.CoreMetric;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.core.retry.RetryPolicyContext;
import software.amazon.awssdk.core.retry.RetryUtils;
Expand All @@ -52,6 +55,7 @@ public class RetryableStageHelper {
private final SdkHttpFullRequest request;
private final RequestExecutionContext context;
private final RetryPolicy retryPolicy;
private final RateLimitingTokenBucket rateLimitingTokenBucket;
private final HttpClientDependencies dependencies;

private int attemptNumber = 0;
Expand All @@ -60,10 +64,12 @@ public class RetryableStageHelper {

/**
 * Creates a helper for a single request.
 *
 * @param request the request being executed
 * @param context the execution context of the request
 * @param rateLimitingTokenBucket token bucket consulted by the rate limiting methods of this helper
 * @param dependencies client dependencies; the retry policy is read from their client configuration
 */
public RetryableStageHelper(SdkHttpFullRequest request,
                            RequestExecutionContext context,
                            RateLimitingTokenBucket rateLimitingTokenBucket,
                            HttpClientDependencies dependencies) {
    RetryPolicy configuredPolicy = dependencies.clientConfiguration().option(SdkClientOption.RETRY_POLICY);

    this.dependencies = dependencies;
    this.rateLimitingTokenBucket = rateLimitingTokenBucket;
    this.retryPolicy = configuredPolicy;
    this.context = context;
    this.request = request;
}

Expand Down Expand Up @@ -207,6 +213,86 @@ public void setLastResponse(SdkHttpResponse lastResponse) {
this.lastResponse = lastResponse;
}

/**
 * Reports whether requests are subject to rate limiting, which is the case only when the retry policy's mode is
 * {@link RetryMode#ADAPTIVE}.
 *
 * @return {@code true} if the retry mode is {@code ADAPTIVE}, {@code false} otherwise
 */
public boolean isRateLimitingEnabled() {
    RetryMode configuredMode = retryPolicy.retryMode();
    return RetryMode.ADAPTIVE == configuredMode;
}

/**
 * Reports whether the retry policy asks for fast-fail rate limiting. An unset (null) policy value counts as
 * {@code false}.
 *
 * @return {@code true} only if the policy value is {@code true}
 */
public boolean isFastFailRateLimiting() {
    Boolean fastFail = retryPolicy.isFastFailRateLimiting();
    return fastFail != null && fastFail;
}

/**
 * Reports whether the most recently recorded exception is classified as a throttling exception by
 * {@link RetryUtils#isThrottlingException}.
 *
 * @return {@code false} when no exception has been recorded, otherwise the {@code RetryUtils} classification
 */
public boolean isLastExceptionThrottlingException() {
    return lastException != null && RetryUtils.isThrottlingException(lastException);
}

/**
 * Acquires one send token from the rate limiter before a request is sent. Does nothing when rate limiting is not
 * enabled.
 *
 * @throws SdkClientException if the token bucket reports that the token could not be acquired
 */
public void getSendToken() {
    if (isRateLimitingEnabled() && !rateLimitingTokenBucket.acquire(1.0, isFastFailRateLimiting())) {
        throw SdkClientException.create(
            "Unable to acquire a send token immediately without waiting. This indicates that ADAPTIVE "
            + "retry mode is enabled, fast fail rate limiting is enabled, and that rate limiting is "
            + "engaged because of prior throttled requests. The request will not be executed.");
    }
}

/**
 * Requests one send token from the rate limiter without blocking. When rate limiting is not enabled the result is
 * always a present value of {@code 0.0}; otherwise the result comes straight from
 * {@link RateLimitingTokenBucket#acquireNonBlocking(double, boolean)}, whose documentation explains how to interpret it.
 *
 * @return the token bucket's answer, or {@code OptionalDouble.of(0.0)} when rate limiting is disabled
 */
public OptionalDouble getSendTokenNonBlocking() {
    return isRateLimitingEnabled()
           ? rateLimitingTokenBucket.acquireNonBlocking(1.0, isFastFailRateLimiting())
           : OptionalDouble.of(0.0);
}

/**
 * Informs the rate limiter of an error response. The sending rate is only updated when rate limiting is enabled and
 * the last recorded exception is a throttling exception; in every other case this method does nothing.
 */
public void updateClientSendingRateForErrorResponse() {
    // Non-throttling errors leave the sending rate untouched; successful responses are reported through
    // updateClientSendingRateForSuccessResponse().
    if (isRateLimitingEnabled() && isLastExceptionThrottlingException()) {
        rateLimitingTokenBucket.updateClientSendingRate(true);
    }
}

/**
 * Informs the rate limiter of a success response so that it can update the sending rate. This operation is a noop if
 * rate limiting is not enabled.
 */
public void updateClientSendingRateForSuccessResponse() {
    if (isRateLimitingEnabled()) {
        rateLimitingTokenBucket.updateClientSendingRate(false);
    }
}

/**
 * Whether the current attempt is the first one, i.e. the attempt counter equals 1. The counter starts at 0;
 * presumably it is incremented when an attempt starts (confirm against the code that updates it).
 */
private boolean isInitialAttempt() {
return attemptNumber == 1;
}
Expand Down
Loading