Skip to content

Commit 7d82774

Browse files
authored
Update s3-benchmarks to support java-based multipart download. (#5003)
* update s3-benchmarks to support java-based multipart download. * missing s3Client from config * checkstyle * checkstyle * remove TM workaround test * use minimumPartSizeInBytes as thrshold for both CRT and s3 async client. Use TransferManagerBenchmark.httpClient in BaseJavaS3ClientBenchmark
1 parent 336fd35 commit 7d82774

File tree

8 files changed

+113
-126
lines changed

8 files changed

+113
-126
lines changed

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import software.amazon.awssdk.core.exception.SdkClientException;
3434
import software.amazon.awssdk.core.exception.SdkException;
3535
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
36-
import software.amazon.awssdk.services.s3.DelegatingS3AsyncClient;
3736
import software.amazon.awssdk.services.s3.S3AsyncClient;
3837
import software.amazon.awssdk.services.s3.internal.multipart.MultipartS3AsyncClient;
3938
import software.amazon.awssdk.services.s3.internal.resource.S3AccessPointResource;
@@ -299,7 +298,7 @@ public <ResultT> Download<ResultT> download(DownloadRequest<ResultT> downloadReq
299298
try {
300299
assertNotUnsupportedArn(downloadRequest.getObjectRequest().bucket(), "download");
301300

302-
CompletableFuture<ResultT> future = doGetObject(downloadRequest.getObjectRequest(), responseTransformer);
301+
CompletableFuture<ResultT> future = s3AsyncClient.getObject(downloadRequest.getObjectRequest(), responseTransformer);
303302

304303
// Forward download cancellation to future
305304
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);
@@ -341,7 +340,8 @@ private TransferProgressUpdater doDownloadFile(
341340

342341
assertNotUnsupportedArn(downloadRequest.getObjectRequest().bucket(), "download");
343342

344-
CompletableFuture<GetObjectResponse> future = doGetObject(downloadRequest.getObjectRequest(), responseTransformer);
343+
CompletableFuture<GetObjectResponse> future = s3AsyncClient.getObject(
344+
downloadRequest.getObjectRequest(), responseTransformer);
345345

346346
// Forward download cancellation to future
347347
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);
@@ -509,14 +509,4 @@ private static boolean isMrapArn(Arn arn) {
509509

510510
return !s3EndpointResource.region().isPresent();
511511
}
512-
513-
// TODO remove once MultipartS3AsyncClient is complete
514-
private <ResultT> CompletableFuture<ResultT> doGetObject(
515-
GetObjectRequest getObjectRequest, AsyncResponseTransformer<GetObjectResponse, ResultT> asyncResponseTransformer) {
516-
S3AsyncClient clientToUse = s3AsyncClient;
517-
if (s3AsyncClient instanceof MultipartS3AsyncClient) {
518-
clientToUse = (S3AsyncClient) ((DelegatingS3AsyncClient) s3AsyncClient).delegate();
519-
}
520-
return clientToUse.getObject(getObjectRequest, asyncResponseTransformer);
521-
}
522512
}

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/MultipartDownloadJavaBasedTest.java

Lines changed: 0 additions & 67 deletions
This file was deleted.

test/s3-benchmarks/src/main/java/software/amazon/awssdk/s3benchmarks/BaseJavaS3ClientBenchmark.java

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,8 @@
2323
import java.time.Duration;
2424
import java.util.ArrayList;
2525
import java.util.List;
26-
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
27-
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
28-
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
2926
import software.amazon.awssdk.services.s3.S3AsyncClient;
3027
import software.amazon.awssdk.services.s3.S3Client;
31-
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
3228
import software.amazon.awssdk.utils.Logger;
3329
import software.amazon.awssdk.utils.Validate;
3430

@@ -41,15 +37,13 @@ public abstract class BaseJavaS3ClientBenchmark implements TransferManagerBenchm
4137
protected final String bucket;
4238
protected final String key;
4339
protected final Duration timeout;
44-
private final ChecksumAlgorithm checksumAlgorithm;
4540
private final int iteration;
4641

4742
protected BaseJavaS3ClientBenchmark(TransferManagerBenchmarkConfig config) {
4843
this.bucket = Validate.paramNotNull(config.bucket(), "bucket");
4944
this.key = Validate.paramNotNull(config.key(), "key");
5045
this.timeout = Validate.getOrDefault(config.timeout(), () -> DEFAULT_TIMEOUT);
5146
this.iteration = Validate.getOrDefault(config.iteration(), () -> BENCHMARK_ITERATIONS);
52-
this.checksumAlgorithm = config.checksumAlgorithm();
5347

5448
this.s3Client = S3Client.create();
5549

@@ -62,33 +56,10 @@ protected BaseJavaS3ClientBenchmark(TransferManagerBenchmarkConfig config) {
6256
.multipartConfiguration(c -> c.minimumPartSizeInBytes(partSizeInMb * MB)
6357
.thresholdInBytes(partSizeInMb * 2 * MB)
6458
.apiCallBufferSizeInBytes(readBufferInMb * MB))
65-
.httpClientBuilder(httpClient(config))
59+
.httpClientBuilder(TransferManagerBenchmark.httpClient(config))
6660
.build();
6761
}
6862

69-
private SdkAsyncHttpClient.Builder httpClient(TransferManagerBenchmarkConfig config) {
70-
if (config.forceCrtHttpClient()) {
71-
logger.info(() -> "Using CRT HTTP client");
72-
AwsCrtAsyncHttpClient.Builder builder = AwsCrtAsyncHttpClient.builder();
73-
if (config.readBufferSizeInMb() != null) {
74-
builder.readBufferSizeInBytes(config.readBufferSizeInMb() * MB);
75-
}
76-
if (config.maxConcurrency() != null) {
77-
builder.maxConcurrency(config.maxConcurrency());
78-
}
79-
return builder;
80-
}
81-
NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
82-
if (config.connectionAcquisitionTimeoutInSec() != null) {
83-
Duration connAcqTimeout = Duration.ofSeconds(config.connectionAcquisitionTimeoutInSec());
84-
builder.connectionAcquisitionTimeout(connAcqTimeout);
85-
}
86-
if (config.maxConcurrency() != null) {
87-
builder.maxConcurrency(config.maxConcurrency());
88-
}
89-
return builder;
90-
}
91-
9263
protected abstract void sendOneRequest(List<Double> latencies) throws Exception;
9364

9465
protected abstract long contentLength() throws Exception;

test/s3-benchmarks/src/main/java/software/amazon/awssdk/s3benchmarks/BaseTransferManagerBenchmark.java

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,19 +60,8 @@ public abstract class BaseTransferManagerBenchmark implements TransferManagerBen
6060

6161
BaseTransferManagerBenchmark(TransferManagerBenchmarkConfig config) {
6262
logger.info(() -> "Benchmark config: " + config);
63-
Long partSizeInMb = config.partSizeInMb() == null ? null : config.partSizeInMb() * MB;
64-
Long readBufferSizeInMb = config.readBufferSizeInMb() == null ? null : config.readBufferSizeInMb() * MB;
65-
S3CrtAsyncClientBuilder builder = S3CrtAsyncClient.builder()
66-
.targetThroughputInGbps(config.targetThroughput())
67-
.minimumPartSizeInBytes(partSizeInMb)
68-
.initialReadBufferSizeInBytes(readBufferSizeInMb)
69-
.targetThroughputInGbps(config.targetThroughput() == null ?
70-
Double.valueOf(100.0) :
71-
config.targetThroughput());
72-
if (config.maxConcurrency() != null) {
73-
builder.maxConcurrency(config.maxConcurrency());
74-
}
75-
s3 = builder.build();
63+
64+
s3 = createS3AsyncClient(config);
7665
s3Sync = S3Client.builder().build();
7766
transferManager = S3TransferManager.builder()
7867
.s3Client(s3)
@@ -171,6 +160,38 @@ private void warmUpDownloadBatch() {
171160
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
172161
}
173162

163+
private S3AsyncClient createS3AsyncClient(TransferManagerBenchmarkConfig config) {
164+
Long partSizeInMb = config.partSizeInMb() == null ? null : config.partSizeInMb() * MB;
165+
Long readBufferSizeInMb = config.readBufferSizeInMb() == null ? null : config.readBufferSizeInMb() * MB;
166+
switch (config.s3Client()) {
167+
case CRT: {
168+
logger.info(() -> "Using CRT S3 Async client");
169+
S3CrtAsyncClientBuilder builder = S3CrtAsyncClient.builder()
170+
.targetThroughputInGbps(config.targetThroughput())
171+
.minimumPartSizeInBytes(partSizeInMb)
172+
.initialReadBufferSizeInBytes(readBufferSizeInMb)
173+
.targetThroughputInGbps(config.targetThroughput() == null ?
174+
Double.valueOf(100.0) :
175+
config.targetThroughput());
176+
if (config.maxConcurrency() != null) {
177+
builder.maxConcurrency(config.maxConcurrency());
178+
}
179+
return builder.build();
180+
}
181+
case JAVA: {
182+
logger.info(() -> "Using Java-based S3 Async client");
183+
return S3AsyncClient.builder()
184+
.multipartEnabled(true)
185+
.multipartConfiguration(c -> c.minimumPartSizeInBytes(partSizeInMb)
186+
.apiCallBufferSizeInBytes(readBufferSizeInMb))
187+
.httpClientBuilder(TransferManagerBenchmark.httpClient(config))
188+
.build();
189+
}
190+
default:
191+
throw new IllegalArgumentException("base s3 client must be crt or java");
192+
}
193+
}
194+
174195
private void warmUpUploadBatch() {
175196
List<CompletableFuture<?>> futures = new ArrayList<>();
176197
for (int i = 0; i < 20; i++) {

test/s3-benchmarks/src/main/java/software/amazon/awssdk/s3benchmarks/BenchmarkRunner.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public final class BenchmarkRunner {
4141
private static final String READ_BUFFER_IN_MB = "readBufferInMB";
4242

4343
private static final String VERSION = "version";
44+
private static final String S3_CLIENT = "s3Client";
45+
4446
private static final String PREFIX = "prefix";
4547

4648
private static final String TIMEOUT = "timeoutInMin";
@@ -90,6 +92,10 @@ public static void main(String... args) throws org.apache.commons.cli.ParseExcep
9092
options.addOption(null, READ_BUFFER_IN_MB, true, "Read buffer size in MB");
9193
options.addOption(null, VERSION, true, "The major version of the transfer manager to run test: "
9294
+ "v1 | v2 | crt | java, default: v2");
95+
options.addOption(null, S3_CLIENT, true, "For v2 transfer manager benchmarks, which base s3 client "
96+
+ "should be used: "
97+
+ "crt | java, default: crt");
98+
9399
options.addOption(null, PREFIX, true, "S3 Prefix used in downloadDirectory and uploadDirectory");
94100

95101
options.addOption(null, CONTENT_LENGTH, true, "Content length to upload from memory. Used only in the "
@@ -155,6 +161,11 @@ private static TransferManagerBenchmarkConfig parseConfig(CommandLine cmd) {
155161
TransferManagerOperation operation = TransferManagerOperation.valueOf(cmd.getOptionValue(OPERATION)
156162
.toUpperCase(Locale.ENGLISH));
157163

164+
TransferManagerBaseS3Client s3Client = cmd.getOptionValue(S3_CLIENT) == null
165+
? TransferManagerBaseS3Client.CRT
166+
: TransferManagerBaseS3Client.valueOf(cmd.getOptionValue(S3_CLIENT)
167+
.toUpperCase(Locale.ENGLISH));
168+
158169
String filePath = cmd.getOptionValue(FILE);
159170
String bucket = cmd.getOptionValue(BUCKET);
160171
String key = cmd.getOptionValue(KEY);
@@ -209,6 +220,7 @@ private static TransferManagerBenchmarkConfig parseConfig(CommandLine cmd) {
209220
.connectionAcquisitionTimeoutInSec(connAcqTimeoutInSec)
210221
.forceCrtHttpClient(forceCrtHttpClient)
211222
.maxConcurrency(maxConcurrency)
223+
.s3Client(s3Client)
212224
.build();
213225
}
214226

@@ -220,6 +232,11 @@ public enum TransferManagerOperation {
220232
UPLOAD_DIRECTORY
221233
}
222234

235+
public enum TransferManagerBaseS3Client {
236+
CRT,
237+
JAVA
238+
}
239+
223240
private enum SdkVersion {
224241
V1,
225242
V2,

test/s3-benchmarks/src/main/java/software/amazon/awssdk/s3benchmarks/JavaS3ClientDownloadBenchmark.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515

1616
package software.amazon.awssdk.s3benchmarks;
1717

18-
import java.io.File;
18+
import java.nio.file.Path;
19+
import java.nio.file.Paths;
1920
import java.util.List;
21+
import software.amazon.awssdk.core.FileTransformerConfiguration;
2022
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
2123
import software.amazon.awssdk.utils.Logger;
2224

@@ -35,12 +37,19 @@ protected void sendOneRequest(List<Double> latencies) throws Exception {
3537
if (filePath == null) {
3638
log.info(() -> "Starting download to memory");
3739
latency = runWithTime(s3AsyncClient.getObject(
38-
req -> req.key(key).bucket(bucket), AsyncResponseTransformer.toBytes()
40+
req -> req.key(key).bucket(bucket), new NoOpResponseTransformer<>()
3941
)::join).latency();
4042
} else {
4143
log.info(() -> "Starting download to file");
44+
Path path = Paths.get(filePath);
45+
FileTransformerConfiguration conf = FileTransformerConfiguration
46+
.builder()
47+
.failureBehavior(FileTransformerConfiguration.FailureBehavior.LEAVE)
48+
.fileWriteOption(FileTransformerConfiguration.FileWriteOption.CREATE_OR_REPLACE_EXISTING)
49+
.build();
50+
4251
latency = runWithTime(s3AsyncClient.getObject(
43-
req -> req.key(key).bucket(bucket), new File(filePath).toPath()
52+
req -> req.key(key).bucket(bucket), AsyncResponseTransformer.toFile(path, conf)
4453
)::join).latency();
4554
}
4655
latencies.add(latency);

test/s3-benchmarks/src/main/java/software/amazon/awssdk/s3benchmarks/TransferManagerBenchmark.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,53 @@
1515

1616
package software.amazon.awssdk.s3benchmarks;
1717

18+
import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;
19+
20+
import java.time.Duration;
1821
import java.util.function.Supplier;
22+
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
23+
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
24+
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
25+
import software.amazon.awssdk.utils.Logger;
1926

2027
/**
2128
* Factory to create the benchmark
2229
*/
2330
@FunctionalInterface
2431
public interface TransferManagerBenchmark {
32+
Logger logger = Logger.loggerFor(TransferManagerBenchmark.class);
2533

2634
/**
2735
* The benchmark method to run
2836
*/
2937
void run();
3038

39+
static <T extends SdkAsyncHttpClient.Builder<T>> SdkAsyncHttpClient.Builder<T> httpClient(
40+
TransferManagerBenchmarkConfig config) {
41+
if (config.forceCrtHttpClient()) {
42+
logger.info(() -> "Using CRT HTTP client");
43+
AwsCrtAsyncHttpClient.Builder builder = AwsCrtAsyncHttpClient.builder();
44+
if (config.readBufferSizeInMb() != null) {
45+
builder.readBufferSizeInBytes(config.readBufferSizeInMb() * MB);
46+
}
47+
if (config.maxConcurrency() != null) {
48+
builder.maxConcurrency(config.maxConcurrency());
49+
}
50+
return (T) builder;
51+
}
52+
NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
53+
if (config.connectionAcquisitionTimeoutInSec() != null) {
54+
Duration connAcqTimeout = Duration.ofSeconds(config.connectionAcquisitionTimeoutInSec());
55+
builder.connectionAcquisitionTimeout(connAcqTimeout);
56+
}
57+
if (config.maxConcurrency() != null) {
58+
builder.maxConcurrency(config.maxConcurrency());
59+
}
60+
return (T) builder;
61+
}
62+
63+
64+
3165
static TransferManagerBenchmark v2Download(TransferManagerBenchmarkConfig config) {
3266
return new TransferManagerDownloadBenchmark(config);
3367
}

0 commit comments

Comments
 (0)