Skip to content

Commit e0d2e83

Browse files
authored
Make calls to endpoint discovery non-blocking for asynchornous clients (#5205)
* Make calls to endpoint discovery non-blocking for asynchornous clients * Remove code style changed * Added more tests and improved code style * Removed changes to existing method code style * Remove unnecessary code * modify get method to use getKey * Add Unit Tests * Remove pom changes * Added more tests * Added Functional Test * Added service dependencies * Removed hacky test. Specified one time runnable integration test in pr description * Added Functional tests * Added timeout to test * Remove Functional tests * Rebased changes * Modified test method to use assertThrownBy
1 parent 57de751 commit e0d2e83

File tree

4 files changed

+166
-41
lines changed

4 files changed

+166
-41
lines changed

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/AsyncClientClass.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -429,14 +429,14 @@ protected MethodSpec.Builder operationBody(MethodSpec.Builder builder, Operation
429429
AwsClientOption.class)
430430
.addCode(" .resolveIdentity();");
431431

432-
builder.addCode("endpointFuture = identityFuture.thenApply(credentials -> {")
432+
builder.addCode("endpointFuture = identityFuture.thenCompose(credentials -> {")
433433
.addCode(" $1T endpointDiscoveryRequest = $1T.builder()", EndpointDiscoveryRequest.class)
434434
.addCode(" .required($L)", opModel.getInputShape().getEndpointDiscovery().isRequired())
435435
.addCode(" .defaultEndpoint(clientConfiguration.option($T.ENDPOINT))", SdkClientOption.class)
436436
.addCode(" .overrideConfiguration($N.overrideConfiguration().orElse(null))",
437437
opModel.getInput().getVariableName())
438438
.addCode(" .build();")
439-
.addCode(" return endpointDiscoveryCache.get(credentials.accessKeyId(), endpointDiscoveryRequest);")
439+
.addCode(" return endpointDiscoveryCache.getAsync(credentials.accessKeyId(), endpointDiscoveryRequest);")
440440
.addCode("});");
441441

442442
builder.endControlFlow();

codegen/src/test/resources/software/amazon/awssdk/codegen/poet/client/test-endpoint-discovery-async.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -193,12 +193,12 @@ public CompletableFuture<TestDiscoveryIdentifiersRequiredResponse> testDiscovery
193193
.overrideConfiguration().flatMap(AwsRequestOverrideConfiguration::credentialsIdentityProvider)
194194
.orElseGet(() -> clientConfiguration.option(AwsClientOption.CREDENTIALS_IDENTITY_PROVIDER))
195195
.resolveIdentity();
196-
endpointFuture = identityFuture.thenApply(credentials -> {
196+
endpointFuture = identityFuture.thenCompose(credentials -> {
197197
EndpointDiscoveryRequest endpointDiscoveryRequest = EndpointDiscoveryRequest.builder().required(true)
198198
.defaultEndpoint(clientConfiguration.option(SdkClientOption.ENDPOINT))
199199
.overrideConfiguration(testDiscoveryIdentifiersRequiredRequest.overrideConfiguration().orElse(null))
200200
.build();
201-
return endpointDiscoveryCache.get(credentials.accessKeyId(), endpointDiscoveryRequest);
201+
return endpointDiscoveryCache.getAsync(credentials.accessKeyId(), endpointDiscoveryRequest);
202202
});
203203
}
204204

@@ -267,11 +267,11 @@ public CompletableFuture<TestDiscoveryOptionalResponse> testDiscoveryOptional(
267267
.overrideConfiguration().flatMap(AwsRequestOverrideConfiguration::credentialsIdentityProvider)
268268
.orElseGet(() -> clientConfiguration.option(AwsClientOption.CREDENTIALS_IDENTITY_PROVIDER))
269269
.resolveIdentity();
270-
endpointFuture = identityFuture.thenApply(credentials -> {
270+
endpointFuture = identityFuture.thenCompose(credentials -> {
271271
EndpointDiscoveryRequest endpointDiscoveryRequest = EndpointDiscoveryRequest.builder().required(false)
272272
.defaultEndpoint(clientConfiguration.option(SdkClientOption.ENDPOINT))
273273
.overrideConfiguration(testDiscoveryOptionalRequest.overrideConfiguration().orElse(null)).build();
274-
return endpointDiscoveryCache.get(credentials.accessKeyId(), endpointDiscoveryRequest);
274+
return endpointDiscoveryCache.getAsync(credentials.accessKeyId(), endpointDiscoveryRequest);
275275
});
276276
}
277277

@@ -348,11 +348,11 @@ public CompletableFuture<TestDiscoveryRequiredResponse> testDiscoveryRequired(
348348
.overrideConfiguration().flatMap(AwsRequestOverrideConfiguration::credentialsIdentityProvider)
349349
.orElseGet(() -> clientConfiguration.option(AwsClientOption.CREDENTIALS_IDENTITY_PROVIDER))
350350
.resolveIdentity();
351-
endpointFuture = identityFuture.thenApply(credentials -> {
351+
endpointFuture = identityFuture.thenCompose(credentials -> {
352352
EndpointDiscoveryRequest endpointDiscoveryRequest = EndpointDiscoveryRequest.builder().required(true)
353353
.defaultEndpoint(clientConfiguration.option(SdkClientOption.ENDPOINT))
354354
.overrideConfiguration(testDiscoveryRequiredRequest.overrideConfiguration().orElse(null)).build();
355-
return endpointDiscoveryCache.get(credentials.accessKeyId(), endpointDiscoveryRequest);
355+
return endpointDiscoveryCache.getAsync(credentials.accessKeyId(), endpointDiscoveryRequest);
356356
});
357357
}
358358

core/sdk-core/src/main/java/software/amazon/awssdk/core/endpointdiscovery/EndpointDiscoveryRefreshCache.java

Lines changed: 66 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -45,46 +45,28 @@ public static EndpointDiscoveryRefreshCache create(EndpointDiscoveryCacheLoader
4545
* @return The endpoint to use for this request
4646
*/
4747
public URI get(String accessKey, EndpointDiscoveryRequest request) {
48-
String key = accessKey;
4948

50-
// Support null (anonymous credentials) by mapping to empty-string. The backing cache does not support null.
51-
if (key == null) {
52-
key = "";
53-
}
49+
String key = getKey(accessKey, request);
50+
EndpointDiscoveryEndpoint endpoint = cache.get(key);
5451

55-
if (request.cacheKey().isPresent()) {
56-
key = key + ":" + request.cacheKey().get();
52+
if (endpoint == null && request.required()) {
53+
return cache.computeIfAbsent(key, k -> getAndJoin(request)).endpoint();
5754
}
55+
return returnCachedOrDefaultEndpoint(key, endpoint, request);
56+
}
5857

58+
public CompletableFuture<URI> getAsync(String accessKey, EndpointDiscoveryRequest request) {
59+
String key = getKey(accessKey, request);
5960
EndpointDiscoveryEndpoint endpoint = cache.get(key);
6061

61-
if (endpoint == null) {
62-
if (request.required()) {
63-
return cache.computeIfAbsent(key, k -> getAndJoin(request)).endpoint();
64-
} else {
65-
EndpointDiscoveryEndpoint tempEndpoint = EndpointDiscoveryEndpoint.builder()
66-
.endpoint(request.defaultEndpoint())
67-
.expirationTime(Instant.now().plusSeconds(60))
68-
.build();
69-
70-
EndpointDiscoveryEndpoint previousValue = cache.putIfAbsent(key, tempEndpoint);
71-
if (previousValue != null) {
72-
// Someone else primed the cache. Use that endpoint (which may be temporary).
73-
return previousValue.endpoint();
74-
} else {
75-
// We primed the cache with the temporary endpoint. Kick off discovery in the background.
76-
refreshCacheAsync(request, key);
77-
}
78-
return tempEndpoint.endpoint();
79-
}
80-
}
81-
82-
if (endpoint.expirationTime().isBefore(Instant.now())) {
83-
cache.put(key, endpoint.toBuilder().expirationTime(Instant.now().plusSeconds(60)).build());
84-
refreshCacheAsync(request, key);
62+
// If a service call needs to be made to discover endpoint
63+
// a completable future for the service call is returned, unblocking I/O
64+
// and then completed asynchronously
65+
if (endpoint == null && request.required()) {
66+
return discoverEndpointHandler(key, request);
8567
}
86-
87-
return endpoint.endpoint();
68+
// In the event of a cache hit, i.e. service call not required, defer to the synchronous code path method.
69+
return CompletableFuture.completedFuture(returnCachedOrDefaultEndpoint(key, endpoint, request));
8870
}
8971

9072
private EndpointDiscoveryEndpoint getAndJoin(EndpointDiscoveryRequest request) {
@@ -109,4 +91,55 @@ public CompletableFuture<EndpointDiscoveryEndpoint> discoverEndpoint(EndpointDis
10991
public void evict(String key) {
11092
cache.remove(key);
11193
}
94+
95+
private String getKey(String accessKey, EndpointDiscoveryRequest request) {
96+
String key = accessKey;
97+
98+
// Support null (anonymous credentials) by mapping to empty-string. The backing cache does not support null.
99+
if (key == null) {
100+
key = "";
101+
}
102+
103+
if (request.cacheKey().isPresent()) {
104+
key = key + ":" + request.cacheKey().get();
105+
}
106+
return key;
107+
}
108+
109+
private CompletableFuture<URI> discoverEndpointHandler(String key, EndpointDiscoveryRequest request) {
110+
return discoverEndpoint(request).handle(
111+
(endpointDiscoveryEndpoint, throwable) -> {
112+
if (throwable != null) {
113+
throw EndpointDiscoveryFailedException.create(throwable.getCause());
114+
}
115+
return cache.computeIfAbsent(
116+
key, k -> endpointDiscoveryEndpoint
117+
).endpoint();
118+
});
119+
}
120+
121+
private URI returnCachedOrDefaultEndpoint(String key, EndpointDiscoveryEndpoint endpoint, EndpointDiscoveryRequest request) {
122+
EndpointDiscoveryEndpoint tempEndpoint = EndpointDiscoveryEndpoint.builder()
123+
.endpoint(request.defaultEndpoint())
124+
.expirationTime(Instant.now().plusSeconds(60))
125+
.build();
126+
127+
if (endpoint == null) {
128+
EndpointDiscoveryEndpoint previousValue = cache.putIfAbsent(key, tempEndpoint);
129+
if (previousValue != null) {
130+
// Someone else primed the cache. Use that endpoint (which may be temporary).
131+
return previousValue.endpoint();
132+
}
133+
// We primed the cache with the temporary endpoint. Kick off discovery in the background.
134+
refreshCacheAsync(request, key);
135+
return tempEndpoint.endpoint();
136+
}
137+
138+
if (endpoint.expirationTime().isBefore(Instant.now())) {
139+
cache.put(key, endpoint.toBuilder().expirationTime(Instant.now().plusSeconds(60)).build());
140+
refreshCacheAsync(request, key);
141+
}
142+
143+
return endpoint.endpoint();
144+
}
112145
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.endpointdiscovery;
17+
18+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
19+
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
20+
import static org.junit.jupiter.api.Assertions.assertThrows;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.when;
24+
25+
import java.net.URI;
26+
import java.util.concurrent.CancellationException;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.ExecutionException;
29+
import org.junit.jupiter.api.BeforeEach;
30+
import org.junit.jupiter.api.Test;
31+
32+
public class EndpointDiscoveryRefreshCacheTest {
33+
34+
private EndpointDiscoveryRefreshCache endpointDiscoveryRefreshCache;
35+
private EndpointDiscoveryCacheLoader mockClient;
36+
private static final URI testURI = URI.create("test_endpoint");
37+
private static final String requestCacheKey = "request_cache_key";
38+
private static final String accessKey = "access_cache_key";
39+
40+
@BeforeEach
41+
public void setup() {
42+
this.mockClient= mock(EndpointDiscoveryCacheLoader.class);
43+
this.endpointDiscoveryRefreshCache = EndpointDiscoveryRefreshCache.create(mockClient);
44+
}
45+
46+
@Test
47+
public void getAsync_notRequired_returns_CompletedFuture() throws ExecutionException, InterruptedException {
48+
when(mockClient.discoverEndpoint(any())).thenReturn(new CompletableFuture<>());
49+
EndpointDiscoveryRequest request = EndpointDiscoveryRequest.builder()
50+
.required(false)
51+
.defaultEndpoint(testURI)
52+
.build();
53+
assertThat(endpointDiscoveryRefreshCache.getAsync("key", request).isDone()).isEqualTo(true);
54+
assertThat(endpointDiscoveryRefreshCache.getAsync("key", request).get()).isEqualTo(testURI);
55+
56+
}
57+
58+
@Test
59+
public void getAsync_returns_CompletedFuture() throws ExecutionException, InterruptedException {
60+
61+
when(mockClient.discoverEndpoint(any())).thenReturn(new CompletableFuture<>());
62+
EndpointDiscoveryRequest request = EndpointDiscoveryRequest.builder()
63+
.required(true)
64+
.defaultEndpoint(testURI)
65+
.build();
66+
CompletableFuture<URI> future = endpointDiscoveryRefreshCache.getAsync("key", request);
67+
assertThat(future.isDone()).isEqualTo(false);
68+
69+
future.complete(testURI);
70+
71+
assertThat(future.isDone()).isEqualTo(true);
72+
assertThat(future.get()).isEqualTo(testURI);
73+
}
74+
75+
@Test
76+
public void getAsync_future_cancelled() {
77+
78+
when(mockClient.discoverEndpoint(any())).thenReturn(new CompletableFuture<>());
79+
EndpointDiscoveryRequest request = EndpointDiscoveryRequest.builder()
80+
.required(true)
81+
.defaultEndpoint(testURI)
82+
.build();
83+
CompletableFuture<URI> future = endpointDiscoveryRefreshCache.getAsync("key", request);
84+
assertThat(future.isDone()).isEqualTo(false);
85+
86+
future.cancel(true);
87+
assertThat(future.isCancelled()).isEqualTo(true);
88+
assertThatThrownBy(future::get).isInstanceOf(CancellationException.class);
89+
90+
}
91+
92+
}

0 commit comments

Comments
 (0)