Skip to content

Commit 03df45f

Browse files
committed
Offload completion of execution future to a separate thread
1 parent 8b3f54e commit 03df45f

File tree

14 files changed

+359
-21
lines changed

14 files changed

+359
-21
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,9 @@ protected AttributeMap childHttpConfig() {
297297
* Finalize which async executor service will be used for the created client. The default async executor
298298
* service has at least 8 core threads and can scale up to at least 64 threads when needed depending
299299
* on the number of processors available.
300+
*
301+
* This uses the same default executor in S3NativeClientConfiguration#resolveAsyncFutureCompletionExecutor.
302+
* Make sure you update that method if you update the defaults here.
300303
*/
301304
private Executor resolveAsyncFutureCompletionExecutor(SdkClientConfiguration config) {
302305
Supplier<Executor> defaultExecutor = () -> {

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3CrtClientPutObjectIntegrationTest.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import java.util.List;
2626
import java.util.Optional;
2727
import java.util.Random;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
2830
import java.util.stream.Collectors;
2931
import java.util.stream.Stream;
3032
import org.assertj.core.api.Assertions;
@@ -37,27 +39,30 @@
3739
import software.amazon.awssdk.core.ResponseBytes;
3840
import software.amazon.awssdk.core.ResponseInputStream;
3941
import software.amazon.awssdk.core.async.AsyncRequestBody;
42+
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
4043
import software.amazon.awssdk.core.sync.ResponseTransformer;
41-
import software.amazon.awssdk.transfer.s3.internal.S3CrtAsyncClient;
42-
import software.amazon.awssdk.transfer.s3.util.ChecksumUtils;
4344
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
4445
import software.amazon.awssdk.testutils.RandomTempFile;
46+
import software.amazon.awssdk.transfer.s3.internal.S3CrtAsyncClient;
47+
import software.amazon.awssdk.transfer.s3.util.ChecksumUtils;
4548

4649
public class S3CrtClientPutObjectIntegrationTest extends S3IntegrationTestBase {
4750
private static final String TEST_BUCKET = temporaryBucketName(S3CrtClientPutObjectIntegrationTest.class);
4851
private static final String TEST_KEY = "8mib_file.dat";
4952
private static final int OBJ_SIZE = 8 * 1024 * 1024;
5053

5154
private static RandomTempFile testFile;
52-
55+
private static ExecutorService executorService;
5356
private S3CrtAsyncClient s3Crt;
5457

58+
5559
@BeforeClass
5660
public static void setup() throws Exception {
5761
S3IntegrationTestBase.setUp();
5862
S3IntegrationTestBase.createBucket(TEST_BUCKET);
5963

6064
testFile = new RandomTempFile(TEST_KEY, OBJ_SIZE);
65+
executorService = Executors.newFixedThreadPool(2);
6166
}
6267

6368
@Before
@@ -77,6 +82,7 @@ public void methodTeardown() {
7782
public static void teardown() throws IOException {
7883
S3IntegrationTestBase.deleteBucketAndAllContents(TEST_BUCKET);
7984
Files.delete(testFile.toPath());
85+
executorService.shutdown();
8086
}
8187

8288
@Test
@@ -147,4 +153,26 @@ public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
147153

148154
Assertions.assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum);
149155
}
156+
157+
@Test
158+
public void putObject_customExecutorService_objectSentCorrectly() throws IOException {
159+
AsyncRequestBody body = AsyncRequestBody.fromFile(testFile.toPath());
160+
try (S3CrtAsyncClient s3Client =
161+
S3CrtAsyncClient.builder()
162+
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
163+
.region(DEFAULT_REGION)
164+
.asyncConfiguration(b -> b.advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
165+
executorService))
166+
.build()) {
167+
168+
s3Client.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), body).join();
169+
ResponseInputStream<GetObjectResponse> objContent =
170+
S3IntegrationTestBase.s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
171+
ResponseTransformer.toInputStream());
172+
173+
byte[] expectedSum = ChecksumUtils.computeCheckSum(Files.newInputStream(testFile.toPath()));
174+
175+
Assertions.assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum);
176+
}
177+
}
150178
}

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3CrtGetObjectIntegrationTest.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,28 @@
2424
import java.nio.file.Files;
2525
import java.nio.file.Path;
2626
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.ExecutorService;
28+
import java.util.concurrent.Executors;
2729
import org.junit.AfterClass;
2830
import org.junit.BeforeClass;
2931
import org.junit.Test;
3032
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
3133
import software.amazon.awssdk.core.async.SdkPublisher;
32-
import software.amazon.awssdk.transfer.s3.internal.S3CrtAsyncClient;
34+
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
3335
import software.amazon.awssdk.http.async.SimpleSubscriber;
3436
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
3537
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
3638
import software.amazon.awssdk.testutils.RandomTempFile;
3739
import software.amazon.awssdk.testutils.service.AwsTestBase;
40+
import software.amazon.awssdk.transfer.s3.internal.S3CrtAsyncClient;
3841
import software.amazon.awssdk.utils.Md5Utils;
3942

4043
public class S3CrtGetObjectIntegrationTest extends S3IntegrationTestBase {
4144
private static final String BUCKET = temporaryBucketName(S3CrtGetObjectIntegrationTest.class);
4245
private static final String KEY = "key";
4346
private static S3CrtAsyncClient crtClient;
4447
private static File file;
48+
private static ExecutorService executorService;
4549

4650
@BeforeClass
4751
public static void setup() throws IOException {
@@ -55,11 +59,13 @@ public static void setup() throws IOException {
5559
.bucket(BUCKET)
5660
.key(KEY)
5761
.build(), file.toPath());
62+
executorService = Executors.newFixedThreadPool(2);
5863
}
5964

6065
@AfterClass
6166
public static void cleanup() {
6267
S3IntegrationTestBase.deleteBucketAndAllContents(BUCKET);
68+
executorService.shutdown();
6369
}
6470

6571
@Test
@@ -86,6 +92,24 @@ public void getObject_customResponseTransformer() {
8692

8793
}
8894

95+
@Test
96+
public void getObject_customExecutors_fileDownloadCorrectly() throws IOException {
97+
Path path = RandomTempFile.randomUncreatedFile().toPath();
98+
99+
try (S3CrtAsyncClient s3Client =
100+
S3CrtAsyncClient.builder()
101+
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
102+
.region(DEFAULT_REGION)
103+
.asyncConfiguration(b -> b.advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
104+
executorService))
105+
.build()) {
106+
GetObjectResponse response =
107+
s3Client.getObject(b -> b.bucket(BUCKET).key(KEY), AsyncResponseTransformer.toFile(path)).join();
108+
109+
assertThat(Md5Utils.md5AsBase64(path.toFile())).isEqualTo(Md5Utils.md5AsBase64(file));
110+
}
111+
}
112+
89113
private static final class TestResponseTransformer implements AsyncResponseTransformer<GetObjectResponse, Void> {
90114
private CompletableFuture<Void> future;
91115

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/S3ClientConfiguration.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717

1818
import java.util.Objects;
1919
import java.util.Optional;
20+
import java.util.function.Consumer;
2021
import software.amazon.awssdk.annotations.SdkPreviewApi;
2122
import software.amazon.awssdk.annotations.SdkPublicApi;
2223
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
24+
import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
2325
import software.amazon.awssdk.regions.Region;
2426
import software.amazon.awssdk.utils.Validate;
2527
import software.amazon.awssdk.utils.builder.CopyableBuilder;
@@ -39,6 +41,7 @@ public final class S3ClientConfiguration implements ToCopyableBuilder<S3ClientCo
3941
private final Long minimumPartSizeInBytes;
4042
private final Double targetThroughputInGbps;
4143
private final Integer maxConcurrency;
44+
private final ClientAsyncConfiguration asyncConfiguration;
4245

4346
private S3ClientConfiguration(DefaultBuilder builder) {
4447
this.credentialsProvider = builder.credentialsProvider;
@@ -47,6 +50,7 @@ private S3ClientConfiguration(DefaultBuilder builder) {
4750
this.targetThroughputInGbps = Validate.isPositiveOrNull(builder.targetThroughputInGbps, "targetThroughputInGbps");
4851
this.maxConcurrency = Validate.isPositiveOrNull(builder.maxConcurrency,
4952
"maxConcurrency");
53+
this.asyncConfiguration = builder.asyncConfiguration;
5054
}
5155

5256
/**
@@ -84,6 +88,13 @@ public Optional<Integer> maxConcurrency() {
8488
return Optional.ofNullable(maxConcurrency);
8589
}
8690

91+
/**
92+
* @return the optional SDK async configuration specified
93+
*/
94+
public Optional<ClientAsyncConfiguration> asyncConfiguration() {
95+
return Optional.ofNullable(asyncConfiguration);
96+
}
97+
8798
@Override
8899
public Builder toBuilder() {
89100
return new DefaultBuilder(this);
@@ -112,7 +123,10 @@ public boolean equals(Object o) {
112123
if (!Objects.equals(targetThroughputInGbps, that.targetThroughputInGbps)) {
113124
return false;
114125
}
115-
return Objects.equals(maxConcurrency, that.maxConcurrency);
126+
if (!Objects.equals(maxConcurrency, that.maxConcurrency)) {
127+
return false;
128+
}
129+
return Objects.equals(asyncConfiguration, that.asyncConfiguration);
116130
}
117131

118132
@Override
@@ -122,6 +136,7 @@ public int hashCode() {
122136
result = 31 * result + (minimumPartSizeInBytes != null ? minimumPartSizeInBytes.hashCode() : 0);
123137
result = 31 * result + (targetThroughputInGbps != null ? targetThroughputInGbps.hashCode() : 0);
124138
result = 31 * result + (maxConcurrency != null ? maxConcurrency.hashCode() : 0);
139+
result = 31 * result + (asyncConfiguration != null ? asyncConfiguration.hashCode() : 0);
125140
return result;
126141
}
127142

@@ -216,6 +231,28 @@ public interface Builder extends CopyableBuilder<Builder, S3ClientConfiguration>
216231
* @see #targetThroughputInGbps(Double)
217232
*/
218233
Builder maxConcurrency(Integer maxConcurrency);
234+
235+
/**
236+
* Specify overrides to the default SDK async configuration that should be used for clients created by this builder.
237+
*
238+
* @param asyncConfiguration the async configuration
239+
* @return this builder for method chaining.
240+
* @see #asyncConfiguration(Consumer)
241+
*/
242+
Builder asyncConfiguration(ClientAsyncConfiguration asyncConfiguration);
243+
244+
/**
245+
* Similar to {@link #asyncConfiguration(ClientAsyncConfiguration)}, but takes a lambda to configure a new
246+
* {@link ClientAsyncConfiguration.Builder}. This removes the need to call {@link ClientAsyncConfiguration#builder()}
247+
* and {@link ClientAsyncConfiguration.Builder#build()}.
248+
*
249+
* @param configuration the async configuration
250+
* @return this builder for method chaining.
251+
* @see #asyncConfiguration(ClientAsyncConfiguration)
252+
*/
253+
default Builder asyncConfiguration(Consumer<ClientAsyncConfiguration.Builder> configuration) {
254+
return asyncConfiguration(ClientAsyncConfiguration.builder().applyMutation(configuration).build());
255+
}
219256
}
220257

221258
private static final class DefaultBuilder implements Builder {
@@ -224,6 +261,7 @@ private static final class DefaultBuilder implements Builder {
224261
private Long minimumPartSizeInBytes;
225262
private Double targetThroughputInGbps;
226263
private Integer maxConcurrency;
264+
private ClientAsyncConfiguration asyncConfiguration;
227265

228266
private DefaultBuilder() {
229267
}
@@ -234,6 +272,7 @@ private DefaultBuilder(S3ClientConfiguration configuration) {
234272
this.minimumPartSizeInBytes = configuration.minimumPartSizeInBytes;
235273
this.targetThroughputInGbps = configuration.targetThroughputInGbps;
236274
this.maxConcurrency = configuration.maxConcurrency;
275+
this.asyncConfiguration = configuration.asyncConfiguration;
237276
}
238277

239278
@Override
@@ -266,6 +305,12 @@ public Builder maxConcurrency(Integer maxConcurrency) {
266305
return this;
267306
}
268307

308+
@Override
309+
public Builder asyncConfiguration(ClientAsyncConfiguration asyncConfiguration) {
310+
this.asyncConfiguration = asyncConfiguration;
311+
return this;
312+
}
313+
269314
@Override
270315
public S3ClientConfiguration build() {
271316
return new S3ClientConfiguration(this);

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtCredentialsProviderAdapter.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,11 @@
3232
@SdkInternalApi
3333
public final class CrtCredentialsProviderAdapter implements SdkAutoCloseable {
3434
private final AwsCredentialsProvider credentialsProvider;
35+
private final CredentialsProvider crtCredentials;
3536

3637
public CrtCredentialsProviderAdapter(AwsCredentialsProvider credentialsProvider) {
3738
this.credentialsProvider = credentialsProvider;
38-
}
39-
40-
public CredentialsProvider crtCredentials() {
41-
return new DelegateCredentialsProvider.DelegateCredentialsProviderBuilder()
39+
this.crtCredentials = new DelegateCredentialsProvider.DelegateCredentialsProviderBuilder()
4240
.withHandler(() -> {
4341
AwsCredentials sdkCredentials = credentialsProvider.resolveCredentials();
4442
byte[] accessKey = sdkCredentials.accessKeyId().getBytes(StandardCharsets.UTF_8);
@@ -57,10 +55,15 @@ public CredentialsProvider crtCredentials() {
5755
}).build();
5856
}
5957

58+
public CredentialsProvider crtCredentials() {
59+
return crtCredentials;
60+
}
61+
6062
@Override
6163
public void close() {
6264
if (credentialsProvider instanceof SdkAutoCloseable) {
6365
((SdkAutoCloseable) credentialsProvider).close();
6466
}
67+
crtCredentials.close();
6568
}
6669
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3CrtAsyncClient.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
2525
import software.amazon.awssdk.core.async.AsyncRequestBody;
2626
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
27+
import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
2728
import software.amazon.awssdk.http.SdkHttpResponse;
2829
import software.amazon.awssdk.regions.Region;
2930
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
@@ -43,7 +44,8 @@ public DefaultS3CrtAsyncClient(DefaultS3CrtClientBuilder builder) {
4344
.targetThroughputInGbps(builder.targetThroughputInGbps())
4445
.partSizeInBytes(builder.minimumPartSizeInBytes())
4546
.maxConcurrency(builder.maxConcurrency)
46-
.credentialsProvider(builder.credentialsProvider);
47+
.credentialsProvider(builder.credentialsProvider)
48+
.asyncConfiguration(builder.asyncConfiguration);
4749
if (builder.region() != null) {
4850
configBuilder.signingRegion(builder.region().id());
4951
}
@@ -80,24 +82,19 @@ public <ReturnT> CompletableFuture<ReturnT> getObject(
8082
// Forward the cancellation to crtFuture to cancel the request
8183
CompletableFutureUtils.forwardExceptionTo(returnFuture, crtFuture);
8284

83-
// Forward th exception from the CRT returnFuture to the return future in case
85+
// Forward the exception from the CRT future to the return future in case
8486
// the adapter callback didn't get it
8587
CompletableFutureUtils.forwardExceptionTo(crtFuture, returnFuture);
8688

87-
adapterFuture.whenComplete((r, t) -> {
88-
if (t == null) {
89-
returnFuture.complete(r);
90-
} else {
91-
returnFuture.completeExceptionally(t);
92-
}
93-
// TODO: Offload to returnFuture completion thread
94-
});
89+
CompletableFutureUtils.forwardResultTo(adapterFuture, returnFuture, configuration.futureCompletionExecutor());
9590

9691
return CompletableFutureUtils.forwardExceptionTo(returnFuture, adapterFuture);
9792
}
9893

9994
@Override
10095
public CompletableFuture<PutObjectResponse> putObject(PutObjectRequest putObjectRequest, AsyncRequestBody requestBody) {
96+
CompletableFuture<PutObjectResponse> returnFuture = new CompletableFuture<>();
97+
10198
com.amazonaws.s3.model.PutObjectRequest adaptedRequest = S3CrtPojoConversion.toCrtPutObjectRequest(putObjectRequest);
10299

103100
if (adaptedRequest.contentLength() == null && requestBody.contentLength().isPresent()) {
@@ -109,14 +106,18 @@ public CompletableFuture<PutObjectResponse> putObject(PutObjectRequest putObject
109106
RequestDataSupplierAdapter requestDataSupplier = new RequestDataSupplierAdapter(requestBody);
110107
CompletableFuture<PutObjectOutput> crtFuture = s3NativeClient.putObject(adaptedRequest,
111108
requestDataSupplier);
109+
// Forward the cancellation to crtFuture to cancel the request
110+
CompletableFutureUtils.forwardExceptionTo(returnFuture, crtFuture);
112111

113112
CompletableFuture<SdkHttpResponse> httpResponseFuture = requestDataSupplier.sdkHttpResponseFuture();
114113
CompletableFuture<PutObjectResponse> executeFuture =
115114
// If the header is not available, passing empty SDK HTTP response
116115
crtFuture.thenApply(putObjectOutput -> S3CrtPojoConversion.fromCrtPutObjectOutput(
117116
putObjectOutput, httpResponseFuture.getNow(SdkHttpResponse.builder().build())));
118117

119-
return CompletableFutureUtils.forwardExceptionTo(executeFuture, crtFuture);
118+
CompletableFutureUtils.forwardResultTo(executeFuture, returnFuture, configuration.futureCompletionExecutor());
119+
120+
return CompletableFutureUtils.forwardExceptionTo(returnFuture, executeFuture);
120121
}
121122

122123
@Override
@@ -136,6 +137,7 @@ public static final class DefaultS3CrtClientBuilder implements S3CrtAsyncClientB
136137
private Long minimalPartSizeInBytes;
137138
private Double targetThroughputInGbps;
138139
private Integer maxConcurrency;
140+
private ClientAsyncConfiguration asyncConfiguration;
139141

140142
public AwsCredentialsProvider credentialsProvider() {
141143
return credentialsProvider;
@@ -187,6 +189,12 @@ public S3CrtAsyncClientBuilder maxConcurrency(Integer maxConcurrency) {
187189
return this;
188190
}
189191

192+
@Override
193+
public S3CrtAsyncClientBuilder asyncConfiguration(ClientAsyncConfiguration configuration) {
194+
this.asyncConfiguration = configuration;
195+
return this;
196+
}
197+
190198
@Override
191199
public S3CrtAsyncClient build() {
192200
return new DefaultS3CrtAsyncClient(this);

0 commit comments

Comments
 (0)