Skip to content

Update s3-benchmarks to support java-based multipart download. #5003

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.services.s3.DelegatingS3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.internal.multipart.MultipartS3AsyncClient;
import software.amazon.awssdk.services.s3.internal.resource.S3AccessPointResource;
Expand Down Expand Up @@ -299,7 +298,7 @@ public <ResultT> Download<ResultT> download(DownloadRequest<ResultT> downloadReq
try {
assertNotUnsupportedArn(downloadRequest.getObjectRequest().bucket(), "download");

CompletableFuture<ResultT> future = doGetObject(downloadRequest.getObjectRequest(), responseTransformer);
CompletableFuture<ResultT> future = s3AsyncClient.getObject(downloadRequest.getObjectRequest(), responseTransformer);

// Forward download cancellation to future
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);
Expand Down Expand Up @@ -341,7 +340,8 @@ private TransferProgressUpdater doDownloadFile(

assertNotUnsupportedArn(downloadRequest.getObjectRequest().bucket(), "download");

CompletableFuture<GetObjectResponse> future = doGetObject(downloadRequest.getObjectRequest(), responseTransformer);
CompletableFuture<GetObjectResponse> future = s3AsyncClient.getObject(
downloadRequest.getObjectRequest(), responseTransformer);

// Forward download cancellation to future
CompletableFutureUtils.forwardExceptionTo(returnFuture, future);
Expand Down Expand Up @@ -509,14 +509,4 @@ private static boolean isMrapArn(Arn arn) {

return !s3EndpointResource.region().isPresent();
}

// TODO remove once MultipartS3AsyncClient is complete
/**
 * Sends a GetObject request, bypassing the multipart wrapper when one is installed.
 *
 * <p>When {@code s3AsyncClient} is a {@code MultipartS3AsyncClient}, the request goes to the client it
 * delegates to; otherwise it goes to {@code s3AsyncClient} itself.
 *
 * @param getObjectRequest the request to send
 * @param asyncResponseTransformer transformer that turns the streamed response into the result
 * @param <ResultT> type produced by the transformer
 * @return the future returned by the selected client's {@code getObject} call
 */
private <ResultT> CompletableFuture<ResultT> doGetObject(
    GetObjectRequest getObjectRequest, AsyncResponseTransformer<GetObjectResponse, ResultT> asyncResponseTransformer) {
    // Common case first: no multipart wrapper, so use the configured client directly.
    if (!(s3AsyncClient instanceof MultipartS3AsyncClient)) {
        return s3AsyncClient.getObject(getObjectRequest, asyncResponseTransformer);
    }

    // Unwrap the multipart client and talk to the client underneath it.
    DelegatingS3AsyncClient multipartWrapper = (DelegatingS3AsyncClient) s3AsyncClient;
    S3AsyncClient underlyingClient = (S3AsyncClient) multipartWrapper.delegate();
    return underlyingClient.getObject(getObjectRequest, asyncResponseTransformer);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;

Expand All @@ -41,15 +40,13 @@ public abstract class BaseJavaS3ClientBenchmark implements TransferManagerBenchm
protected final String bucket;
protected final String key;
protected final Duration timeout;
private final ChecksumAlgorithm checksumAlgorithm;
private final int iteration;

protected BaseJavaS3ClientBenchmark(TransferManagerBenchmarkConfig config) {
this.bucket = Validate.paramNotNull(config.bucket(), "bucket");
this.key = Validate.paramNotNull(config.key(), "key");
this.timeout = Validate.getOrDefault(config.timeout(), () -> DEFAULT_TIMEOUT);
this.iteration = Validate.getOrDefault(config.iteration(), () -> BENCHMARK_ITERATIONS);
this.checksumAlgorithm = config.checksumAlgorithm();

this.s3Client = S3Client.create();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,8 @@ public abstract class BaseTransferManagerBenchmark implements TransferManagerBen

BaseTransferManagerBenchmark(TransferManagerBenchmarkConfig config) {
logger.info(() -> "Benchmark config: " + config);
Long partSizeInMb = config.partSizeInMb() == null ? null : config.partSizeInMb() * MB;
Long readBufferSizeInMb = config.readBufferSizeInMb() == null ? null : config.readBufferSizeInMb() * MB;
S3CrtAsyncClientBuilder builder = S3CrtAsyncClient.builder()
.targetThroughputInGbps(config.targetThroughput())
.minimumPartSizeInBytes(partSizeInMb)
.initialReadBufferSizeInBytes(readBufferSizeInMb)
.targetThroughputInGbps(config.targetThroughput() == null ?
Double.valueOf(100.0) :
config.targetThroughput());
if (config.maxConcurrency() != null) {
builder.maxConcurrency(config.maxConcurrency());
}
s3 = builder.build();

s3 = createS3AsyncClient(config);
s3Sync = S3Client.builder().build();
transferManager = S3TransferManager.builder()
.s3Client(s3)
Expand Down Expand Up @@ -171,6 +160,40 @@ private void warmUpDownloadBatch() {
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
}

/**
 * Builds the S3 async client that backs the transfer manager under test.
 *
 * <p>The part size and read buffer size are configured in MB and are converted to bytes exactly once, here.
 * A {@code null} setting is passed through as "not configured" so the selected client keeps its own default.
 *
 * @param config benchmark configuration; {@code s3Client()} selects the implementation, and a {@code null}
 *               value falls back to CRT, which the {@code s3Client} command-line option documents as the default
 * @return a CRT-based client or a Java-based multipart-enabled client
 * @throws IllegalArgumentException if the configured client type is neither CRT nor JAVA
 */
private S3AsyncClient createS3AsyncClient(TransferManagerBenchmarkConfig config) {
    Long partSizeInBytes = config.partSizeInMb() == null ? null : config.partSizeInMb() * MB;
    Long readBufferSizeInBytes = config.readBufferSizeInMb() == null ? null : config.readBufferSizeInMb() * MB;

    // Switching on a null enum throws a bare NullPointerException; use the documented default instead.
    if (config.s3Client() == null) {
        return createCrtS3AsyncClient(config, partSizeInBytes, readBufferSizeInBytes);
    }

    switch (config.s3Client()) {
        case CRT:
            return createCrtS3AsyncClient(config, partSizeInBytes, readBufferSizeInBytes);
        case JAVA:
            return createJavaS3AsyncClient(config, partSizeInBytes, readBufferSizeInBytes);
        default:
            throw new IllegalArgumentException("base s3 client must be crt or java");
    }
}

/**
 * Builds the CRT-based S3 async client. Target throughput defaults to 100 Gbps when not configured.
 */
private S3AsyncClient createCrtS3AsyncClient(TransferManagerBenchmarkConfig config,
                                             Long partSizeInBytes,
                                             Long readBufferSizeInBytes) {
    logger.info(() -> "Using CRT S3 Async client");
    // Target throughput is set once, with the default applied. The original set it twice and the
    // first value was always overwritten.
    S3CrtAsyncClientBuilder builder = S3CrtAsyncClient.builder()
                                                      .minimumPartSizeInBytes(partSizeInBytes)
                                                      .initialReadBufferSizeInBytes(readBufferSizeInBytes)
                                                      .targetThroughputInGbps(config.targetThroughput() == null ?
                                                                              Double.valueOf(100.0) :
                                                                              config.targetThroughput());
    if (config.maxConcurrency() != null) {
        builder.maxConcurrency(config.maxConcurrency());
    }
    return builder.build();
}

/**
 * Builds the Java-based S3 async client with multipart enabled. The multipart threshold is twice the part size.
 */
private S3AsyncClient createJavaS3AsyncClient(TransferManagerBenchmarkConfig config,
                                              Long partSizeInBytes,
                                              Long readBufferSizeInBytes) {
    logger.info(() -> "Using Java-based S3 Async client");
    return S3AsyncClient.builder()
                        .multipartEnabled(true)
                        .multipartConfiguration(c -> {
                            // Sizes are already in bytes; multiplying by MB again would inflate them 2^20 times.
                            // Unset sizes are skipped so the SDK defaults apply instead of an unboxing NPE.
                            if (partSizeInBytes != null) {
                                c.minimumPartSizeInBytes(partSizeInBytes)
                                 .thresholdInBytes(partSizeInBytes * 2);
                            }
                            if (readBufferSizeInBytes != null) {
                                c.apiCallBufferSizeInBytes(readBufferSizeInBytes);
                            }
                        })
                        .httpClientBuilder(TransferManagerBenchmark.httpClient(config))
                        .build();
}


private void warmUpUploadBatch() {
List<CompletableFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < 20; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public final class BenchmarkRunner {
private static final String READ_BUFFER_IN_MB = "readBufferInMB";

private static final String VERSION = "version";
private static final String S3_CLIENT = "s3Client";

private static final String PREFIX = "prefix";

private static final String TIMEOUT = "timeoutInMin";
Expand Down Expand Up @@ -90,6 +92,10 @@ public static void main(String... args) throws org.apache.commons.cli.ParseExcep
options.addOption(null, READ_BUFFER_IN_MB, true, "Read buffer size in MB");
options.addOption(null, VERSION, true, "The major version of the transfer manager to run test: "
+ "v1 | v2 | crt | java, default: v2");
options.addOption(null, S3_CLIENT, true, "For v2 transfer manager benchmarks, which base s3 client "
+ "should be used: "
+ "crt | java, default: crt");

options.addOption(null, PREFIX, true, "S3 Prefix used in downloadDirectory and uploadDirectory");

options.addOption(null, CONTENT_LENGTH, true, "Content length to upload from memory. Used only in the "
Expand Down Expand Up @@ -155,6 +161,11 @@ private static TransferManagerBenchmarkConfig parseConfig(CommandLine cmd) {
TransferManagerOperation operation = TransferManagerOperation.valueOf(cmd.getOptionValue(OPERATION)
.toUpperCase(Locale.ENGLISH));

TransferManagerBaseS3Client s3Client = cmd.getOptionValue(S3_CLIENT) == null
? TransferManagerBaseS3Client.CRT
: TransferManagerBaseS3Client.valueOf(cmd.getOptionValue(S3_CLIENT)
.toUpperCase(Locale.ENGLISH));

String filePath = cmd.getOptionValue(FILE);
String bucket = cmd.getOptionValue(BUCKET);
String key = cmd.getOptionValue(KEY);
Expand Down Expand Up @@ -209,6 +220,7 @@ private static TransferManagerBenchmarkConfig parseConfig(CommandLine cmd) {
.connectionAcquisitionTimeoutInSec(connAcqTimeoutInSec)
.forceCrtHttpClient(forceCrtHttpClient)
.maxConcurrency(maxConcurrency)
.s3Client(s3Client)
.build();
}

Expand All @@ -220,6 +232,11 @@ public enum TransferManagerOperation {
UPLOAD_DIRECTORY
}

/**
 * Base S3 client that a v2 transfer manager benchmark runs on, chosen with the {@code s3Client}
 * command-line option. The option value is upper-cased and resolved with {@code valueOf}, and
 * CRT is used when the option is absent.
 */
public enum TransferManagerBaseS3Client {
// CRT-based S3 async client.
CRT,
// Java-based S3 async client.
JAVA
}

private enum SdkVersion {
V1,
V2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

package software.amazon.awssdk.s3benchmarks;

import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import software.amazon.awssdk.core.FileTransformerConfiguration;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.utils.Logger;

Expand All @@ -35,12 +37,19 @@ protected void sendOneRequest(List<Double> latencies) throws Exception {
if (filePath == null) {
log.info(() -> "Starting download to memory");
latency = runWithTime(s3AsyncClient.getObject(
req -> req.key(key).bucket(bucket), AsyncResponseTransformer.toBytes()
req -> req.key(key).bucket(bucket), new NoOpResponseTransformer<>()
)::join).latency();
} else {
log.info(() -> "Starting download to file");
Path path = Paths.get(filePath);
FileTransformerConfiguration conf = FileTransformerConfiguration
.builder()
.failureBehavior(FileTransformerConfiguration.FailureBehavior.LEAVE)
.fileWriteOption(FileTransformerConfiguration.FileWriteOption.CREATE_OR_REPLACE_EXISTING)
.build();

latency = runWithTime(s3AsyncClient.getObject(
req -> req.key(key).bucket(bucket), new File(filePath).toPath()
req -> req.key(key).bucket(bucket), AsyncResponseTransformer.toFile(path, conf)
)::join).latency();
}
latencies.add(latency);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,53 @@

package software.amazon.awssdk.s3benchmarks;

import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;

import java.time.Duration;
import java.util.function.Supplier;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.utils.Logger;

/**
* Factory to create the benchmark
*/
@FunctionalInterface
public interface TransferManagerBenchmark {
Logger logger = Logger.loggerFor(TransferManagerBenchmark.class);

/**
* The benchmark method to run
*/
void run();

/**
 * Creates the async HTTP client builder selected by the benchmark configuration.
 *
 * <p>When {@code forceCrtHttpClient} is {@code true} a CRT HTTP client builder is returned, with the read
 * buffer size (converted from MB to bytes) and max concurrency applied when configured. Otherwise a Netty
 * HTTP client builder is returned, with the connection acquisition timeout and max concurrency applied
 * when configured. An unset ({@code null}) {@code forceCrtHttpClient} selects the Netty builder.
 *
 * @param config benchmark configuration to read the HTTP client settings from
 * @param <T> builder type expected by the caller; the returned object is a CRT or Netty builder
 *            regardless of {@code T}, so callers should only rely on the {@code SdkAsyncHttpClient.Builder} API
 * @return the configured HTTP client builder
 */
@SuppressWarnings("unchecked") // T is chosen by the caller; the casts below cannot be checked at runtime
static <T extends SdkAsyncHttpClient.Builder<T>> SdkAsyncHttpClient.Builder<T> httpClient(
    TransferManagerBenchmarkConfig config) {
    // Boolean.TRUE.equals treats an unset flag as false instead of failing with an unboxing NPE.
    if (Boolean.TRUE.equals(config.forceCrtHttpClient())) {
        logger.info(() -> "Using CRT HTTP client");
        AwsCrtAsyncHttpClient.Builder builder = AwsCrtAsyncHttpClient.builder();
        if (config.readBufferSizeInMb() != null) {
            builder.readBufferSizeInBytes(config.readBufferSizeInMb() * MB);
        }
        if (config.maxConcurrency() != null) {
            builder.maxConcurrency(config.maxConcurrency());
        }
        return (T) builder;
    }

    NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
    if (config.connectionAcquisitionTimeoutInSec() != null) {
        Duration connAcqTimeout = Duration.ofSeconds(config.connectionAcquisitionTimeoutInSec());
        builder.connectionAcquisitionTimeout(connAcqTimeout);
    }
    if (config.maxConcurrency() != null) {
        builder.maxConcurrency(config.maxConcurrency());
    }
    return (T) builder;
}



/**
 * Creates the v2 download benchmark.
 *
 * @param config benchmark configuration passed to the {@code TransferManagerDownloadBenchmark} constructor
 * @return a new {@code TransferManagerDownloadBenchmark}
 */
static TransferManagerBenchmark v2Download(TransferManagerBenchmarkConfig config) {
return new TransferManagerDownloadBenchmark(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public final class TransferManagerBenchmarkConfig {
private final Long connectionAcquisitionTimeoutInSec;
private final Boolean forceCrtHttpClient;
private final Integer maxConcurrency;
private final BenchmarkRunner.TransferManagerBaseS3Client s3Client;

private final Long readBufferSizeInMb;
private final BenchmarkRunner.TransferManagerOperation operation;
Expand All @@ -55,6 +56,7 @@ private TransferManagerBenchmarkConfig(Builder builder) {
this.connectionAcquisitionTimeoutInSec = builder.connectionAcquisitionTimeoutInSec;
this.forceCrtHttpClient = builder.forceCrtHttpClient;
this.maxConcurrency = builder.maxConcurrency;
this.s3Client = builder.s3Client;
}

public String filePath() {
Expand Down Expand Up @@ -121,6 +123,10 @@ public Integer maxConcurrency() {
return this.maxConcurrency;
}

/**
 * @return the base S3 client type copied from the builder, or {@code null} if the builder never set one
 */
public BenchmarkRunner.TransferManagerBaseS3Client s3Client() {
return this.s3Client;
}

/**
 * @return a new {@code Builder} instance
 */
public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -160,6 +166,7 @@ static final class Builder {
private Long connectionAcquisitionTimeoutInSec;
private Boolean forceCrtHttpClient;
private Integer maxConcurrency;
private BenchmarkRunner.TransferManagerBaseS3Client s3Client;

private Integer iteration;
private BenchmarkRunner.TransferManagerOperation operation;
Expand Down Expand Up @@ -247,6 +254,11 @@ public Builder maxConcurrency(Integer maxConcurrency) {
return this;
}

/**
 * Sets the base S3 client type stored on this builder.
 *
 * @param s3Client the client type to store
 * @return this builder, for chaining
 */
public Builder s3Client(BenchmarkRunner.TransferManagerBaseS3Client s3Client) {
this.s3Client = s3Client;
return this;
}

/**
 * @return a new {@code TransferManagerBenchmarkConfig} constructed from this builder
 */
public TransferManagerBenchmarkConfig build() {
return new TransferManagerBenchmarkConfig(this);
}
Expand Down