Skip to content

Commit 855bba6

Browse files
authored
Implement ADAPTIVE Retry Mode (#2658)
ADAPTIVE retry mode builds on top of STANDARD retry mode and includes rate limiting of requests when the client is throttled.
1 parent ce4d9c0 commit 855bba6

File tree

16 files changed

+1659
-11
lines changed

16 files changed

+1659
-11
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "AWS SDK for Java v2",
3+
"contributor": "",
4+
"type": "feature",
5+
"description": "Implement ADAPTIVE retry mode. ADAPTIVE mode builds on STANDARD retry mode and adds rate limiting of requests when the client is throttled.\n\nAdaptive retry mode dynamically limits the rate of AWS requests to maximize success rate. This may be at the expense of request latency. Adaptive retry mode is not recommended when predictable latency is important.\n\nWarning: Adaptive retry mode assumes that the client is working against a single resource (e.g. one DynamoDB Table or one S3 Bucket). If you use a single client for multiple resources, throttling or outages associated with one resource will result in increased latency and failures when accessing all other resources via the same client. When using adaptive retry mode, we recommend using a single client per resource."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/AsyncRetryableStage.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,22 @@
1919

2020
import java.io.IOException;
2121
import java.time.Duration;
22+
import java.util.OptionalDouble;
2223
import java.util.concurrent.CompletableFuture;
2324
import java.util.concurrent.ScheduledExecutorService;
2425
import software.amazon.awssdk.annotations.SdkInternalApi;
26+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2527
import software.amazon.awssdk.core.Response;
2628
import software.amazon.awssdk.core.async.AsyncRequestBody;
2729
import software.amazon.awssdk.core.client.config.SdkClientOption;
30+
import software.amazon.awssdk.core.exception.SdkClientException;
2831
import software.amazon.awssdk.core.exception.SdkException;
2932
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
3033
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
3134
import software.amazon.awssdk.core.internal.http.TransformingAsyncResponseHandler;
3235
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
3336
import software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper;
37+
import software.amazon.awssdk.core.internal.retry.RateLimitingTokenBucket;
3438
import software.amazon.awssdk.http.SdkHttpFullRequest;
3539
import software.amazon.awssdk.utils.CompletableFutureUtils;
3640

@@ -45,16 +49,30 @@ public final class AsyncRetryableStage<OutputT> implements RequestPipeline<SdkHt
4549
private final RequestPipeline<SdkHttpFullRequest, CompletableFuture<Response<OutputT>>> requestPipeline;
4650
private final ScheduledExecutorService scheduledExecutor;
4751
private final HttpClientDependencies dependencies;
52+
private final RateLimitingTokenBucket rateLimitingTokenBucket;
4853

4954
public AsyncRetryableStage(TransformingAsyncResponseHandler<Response<OutputT>> responseHandler,
5055
HttpClientDependencies dependencies,
5156
RequestPipeline<SdkHttpFullRequest, CompletableFuture<Response<OutputT>>> requestPipeline) {
5257
this.responseHandler = responseHandler;
5358
this.dependencies = dependencies;
5459
this.scheduledExecutor = dependencies.clientConfiguration().option(SdkClientOption.SCHEDULED_EXECUTOR_SERVICE);
60+
this.rateLimitingTokenBucket = new RateLimitingTokenBucket();
5561
this.requestPipeline = requestPipeline;
5662
}
5763

64+
@SdkTestInternalApi
65+
public AsyncRetryableStage(TransformingAsyncResponseHandler<Response<OutputT>> responseHandler,
66+
HttpClientDependencies dependencies,
67+
RequestPipeline<SdkHttpFullRequest, CompletableFuture<Response<OutputT>>> requestPipeline,
68+
RateLimitingTokenBucket rateLimitingTokenBucket) {
69+
this.responseHandler = responseHandler;
70+
this.dependencies = dependencies;
71+
this.scheduledExecutor = dependencies.clientConfiguration().option(SdkClientOption.SCHEDULED_EXECUTOR_SERVICE);
72+
this.requestPipeline = requestPipeline;
73+
this.rateLimitingTokenBucket = rateLimitingTokenBucket;
74+
}
75+
5876
@Override
5977
public CompletableFuture<Response<OutputT>> execute(SdkHttpFullRequest request,
6078
RequestExecutionContext context) throws Exception {
@@ -69,7 +87,7 @@ private class RetryingExecutor {
6987
private RetryingExecutor(SdkHttpFullRequest request, RequestExecutionContext context) {
7088
this.originalRequestBody = context.requestProvider();
7189
this.context = context;
72-
this.retryableStageHelper = new RetryableStageHelper(request, context, dependencies);
90+
this.retryableStageHelper = new RetryableStageHelper(request, context, rateLimitingTokenBucket, dependencies);
7391
}
7492

7593
public CompletableFuture<Response<OutputT>> execute() throws Exception {
@@ -95,9 +113,25 @@ public void maybeAttemptExecute(CompletableFuture<Response<OutputT>> future) {
95113
}
96114

97115
Duration backoffDelay = retryableStageHelper.getBackoffDelay();
116+
117+
OptionalDouble tokenAcquireTimeSeconds = retryableStageHelper.getSendTokenNonBlocking();
118+
if (!tokenAcquireTimeSeconds.isPresent()) {
119+
String errorMessage = "Unable to acquire a send token immediately without waiting. This indicates that ADAPTIVE "
120+
+ "retry mode is enabled, fast fail rate limiting is enabled, and that rate limiting is "
121+
+ "engaged because of prior throttled requests. The request will not be executed.";
122+
future.completeExceptionally(SdkClientException.create(errorMessage));
123+
return;
124+
}
125+
long tokenAcquireTimeMillis = (long) (tokenAcquireTimeSeconds.getAsDouble() * 1000);
126+
98127
if (!backoffDelay.isZero()) {
99128
retryableStageHelper.logBackingOff(backoffDelay);
100-
scheduledExecutor.schedule(() -> attemptExecute(future), backoffDelay.toMillis(), MILLISECONDS);
129+
}
130+
131+
long totalDelayMillis = backoffDelay.toMillis() + tokenAcquireTimeMillis;
132+
133+
if (totalDelayMillis > 0) {
134+
scheduledExecutor.schedule(() -> attemptExecute(future), totalDelayMillis, MILLISECONDS);
101135
} else {
102136
attemptExecute(future);
103137
}
@@ -134,13 +168,16 @@ private void attemptExecute(CompletableFuture<Response<OutputT>> future) {
134168
return;
135169
}
136170

171+
retryableStageHelper.updateClientSendingRateForSuccessResponse();
172+
137173
retryableStageHelper.attemptSucceeded();
138174
future.complete(response);
139175
});
140176
}
141177

142178
private void maybeRetryExecute(CompletableFuture<Response<OutputT>> future, Throwable exception) {
143179
retryableStageHelper.setLastException(exception);
180+
retryableStageHelper.updateClientSendingRateForErrorResponse();
144181
maybeAttemptExecute(future);
145182
}
146183
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/RetryableStage.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
import java.time.Duration;
2020
import java.util.concurrent.TimeUnit;
2121
import software.amazon.awssdk.annotations.SdkInternalApi;
22+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2223
import software.amazon.awssdk.core.Response;
2324
import software.amazon.awssdk.core.exception.SdkException;
2425
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
2526
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
2627
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
2728
import software.amazon.awssdk.core.internal.http.pipeline.RequestToResponsePipeline;
2829
import software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper;
30+
import software.amazon.awssdk.core.internal.retry.RateLimitingTokenBucket;
2931
import software.amazon.awssdk.http.SdkHttpFullRequest;
3032

3133
/**
@@ -35,15 +37,27 @@
3537
public final class RetryableStage<OutputT> implements RequestToResponsePipeline<OutputT> {
3638
private final RequestPipeline<SdkHttpFullRequest, Response<OutputT>> requestPipeline;
3739
private final HttpClientDependencies dependencies;
40+
private final RateLimitingTokenBucket rateLimitingTokenBucket;
3841

3942
public RetryableStage(HttpClientDependencies dependencies,
4043
RequestPipeline<SdkHttpFullRequest, Response<OutputT>> requestPipeline) {
4144
this.dependencies = dependencies;
4245
this.requestPipeline = requestPipeline;
46+
this.rateLimitingTokenBucket = new RateLimitingTokenBucket();
47+
}
48+
49+
@SdkTestInternalApi
50+
public RetryableStage(HttpClientDependencies dependencies,
51+
RequestPipeline<SdkHttpFullRequest, Response<OutputT>> requestPipeline,
52+
RateLimitingTokenBucket rateLimitingTokenBucket) {
53+
this.dependencies = dependencies;
54+
this.requestPipeline = requestPipeline;
55+
this.rateLimitingTokenBucket = rateLimitingTokenBucket;
4356
}
4457

4558
public Response<OutputT> execute(SdkHttpFullRequest request, RequestExecutionContext context) throws Exception {
46-
RetryableStageHelper retryableStageHelper = new RetryableStageHelper(request, context, dependencies);
59+
RetryableStageHelper retryableStageHelper = new RetryableStageHelper(request, context, rateLimitingTokenBucket,
60+
dependencies);
4761

4862
while (true) {
4963
retryableStageHelper.startingAttempt();
@@ -52,6 +66,8 @@ public Response<OutputT> execute(SdkHttpFullRequest request, RequestExecutionCon
5266
throw retryableStageHelper.retryPolicyDisallowedRetryException();
5367
}
5468

69+
retryableStageHelper.getSendToken();
70+
5571
Duration backoffDelay = retryableStageHelper.getBackoffDelay();
5672
if (!backoffDelay.isZero()) {
5773
retryableStageHelper.logBackingOff(backoffDelay);
@@ -64,6 +80,7 @@ public Response<OutputT> execute(SdkHttpFullRequest request, RequestExecutionCon
6480
response = requestPipeline.execute(retryableStageHelper.requestToSend(), context);
6581
} catch (SdkException | IOException e) {
6682
retryableStageHelper.setLastException(e);
83+
retryableStageHelper.updateClientSendingRateForErrorResponse();
6784
continue;
6885
}
6986

@@ -72,9 +89,12 @@ public Response<OutputT> execute(SdkHttpFullRequest request, RequestExecutionCon
7289
if (!response.isSuccess()) {
7390
retryableStageHelper.adjustClockIfClockSkew(response);
7491
retryableStageHelper.setLastException(response.exception());
92+
retryableStageHelper.updateClientSendingRateForErrorResponse();
7593
continue;
7694
}
7795

96+
retryableStageHelper.updateClientSendingRateForSuccessResponse();
97+
7898
retryableStageHelper.attemptSucceeded();
7999
return response;
80100
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/utils/RetryableStageHelper.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.core.internal.http.pipeline.stages.utils;
1717

1818
import java.time.Duration;
19+
import java.util.OptionalDouble;
1920
import java.util.concurrent.CompletionException;
2021
import software.amazon.awssdk.annotations.SdkInternalApi;
2122
import software.amazon.awssdk.core.Response;
@@ -31,7 +32,9 @@
3132
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage;
3233
import software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage;
3334
import software.amazon.awssdk.core.internal.retry.ClockSkewAdjuster;
35+
import software.amazon.awssdk.core.internal.retry.RateLimitingTokenBucket;
3436
import software.amazon.awssdk.core.metrics.CoreMetric;
37+
import software.amazon.awssdk.core.retry.RetryMode;
3538
import software.amazon.awssdk.core.retry.RetryPolicy;
3639
import software.amazon.awssdk.core.retry.RetryPolicyContext;
3740
import software.amazon.awssdk.core.retry.RetryUtils;
@@ -52,6 +55,7 @@ public class RetryableStageHelper {
5255
private final SdkHttpFullRequest request;
5356
private final RequestExecutionContext context;
5457
private final RetryPolicy retryPolicy;
58+
private final RateLimitingTokenBucket rateLimitingTokenBucket;
5559
private final HttpClientDependencies dependencies;
5660

5761
private int attemptNumber = 0;
@@ -60,10 +64,12 @@ public class RetryableStageHelper {
6064

6165
public RetryableStageHelper(SdkHttpFullRequest request,
6266
RequestExecutionContext context,
67+
RateLimitingTokenBucket rateLimitingTokenBucket,
6368
HttpClientDependencies dependencies) {
6469
this.request = request;
6570
this.context = context;
6671
this.retryPolicy = dependencies.clientConfiguration().option(SdkClientOption.RETRY_POLICY);
72+
this.rateLimitingTokenBucket = rateLimitingTokenBucket;
6773
this.dependencies = dependencies;
6874
}
6975

@@ -207,6 +213,86 @@ public void setLastResponse(SdkHttpResponse lastResponse) {
207213
this.lastResponse = lastResponse;
208214
}
209215

216+
/**
217+
* Whether rate limiting is enabled. Only {@link RetryMode#ADAPTIVE} enables rate limiting.
218+
*/
219+
public boolean isRateLimitingEnabled() {
220+
return retryPolicy.retryMode() == RetryMode.ADAPTIVE;
221+
}
222+
223+
/**
224+
* Whether rate limiting should fast fail.
225+
*/
226+
public boolean isFastFailRateLimiting() {
227+
return Boolean.TRUE.equals(retryPolicy.isFastFailRateLimiting());
228+
}
229+
230+
public boolean isLastExceptionThrottlingException() {
231+
if (lastException == null) {
232+
return false;
233+
}
234+
235+
return RetryUtils.isThrottlingException(lastException);
236+
}
237+
238+
/**
239+
* Acquire a send token from the rate limiter. Returns immediately if rate limiting is not enabled.
240+
*/
241+
public void getSendToken() {
242+
if (!isRateLimitingEnabled()) {
243+
return;
244+
}
245+
246+
boolean acquired = rateLimitingTokenBucket.acquire(1.0, isFastFailRateLimiting());
247+
248+
if (!acquired) {
249+
String errorMessage = "Unable to acquire a send token immediately without waiting. This indicates that ADAPTIVE "
250+
+ "retry mode is enabled, fast fail rate limiting is enabled, and that rate limiting is "
251+
+ "engaged because of prior throttled requests. The request will not be executed.";
252+
throw SdkClientException.create(errorMessage);
253+
}
254+
}
255+
256+
/**
257+
* Acquire a send token from the rate limiter in a non blocking manner. See
258+
* {@link RateLimitingTokenBucket#acquireNonBlocking(double, boolean)} for documentation on how to interpret the returned
259+
* value.
260+
*/
261+
public OptionalDouble getSendTokenNonBlocking() {
262+
if (!isRateLimitingEnabled()) {
263+
return OptionalDouble.of(0.0);
264+
}
265+
266+
return rateLimitingTokenBucket.acquireNonBlocking(1.0, isFastFailRateLimiting());
267+
}
268+
269+
/**
270+
* Conditionally updates the sending rate of the rate limiter when an error response is received. This operation is a noop
271+
* if rate limiting is not enabled.
272+
*/
273+
public void updateClientSendingRateForErrorResponse() {
274+
if (!isRateLimitingEnabled()) {
275+
return;
276+
}
277+
// Only throttling errors affect the sending rate. For non error
278+
// responses, they're handled by
279+
// updateClientSendingRateForSuccessResponse()
280+
if (isLastExceptionThrottlingException()) {
281+
rateLimitingTokenBucket.updateClientSendingRate(true);
282+
}
283+
}
284+
285+
/**
286+
* Conditionally updates the sending rate of the rate limiter when an error response is received. This operation is a noop
287+
* if rate limiting is not enabled.
288+
*/
289+
public void updateClientSendingRateForSuccessResponse() {
290+
if (!isRateLimitingEnabled()) {
291+
return;
292+
}
293+
rateLimitingTokenBucket.updateClientSendingRate(false);
294+
}
295+
210296
private boolean isInitialAttempt() {
211297
return attemptNumber == 1;
212298
}

0 commit comments

Comments
 (0)