Skip to content

Expose thresholdSizeInBytes in AWS CRT-based S3 client #4282

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AmazonS3-318ca43.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "Amazon S3",
"contributor": "",
"description": "Allow users to configure upload threshold size for AWS CRT-based S3 client via `S3CrtAsyncClientBuilder#thresholdInBytes`."
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
public class S3CrtClientPutObjectIntegrationTest extends S3IntegrationTestBase {
private static final String TEST_BUCKET = temporaryBucketName(S3CrtClientPutObjectIntegrationTest.class);
private static final String TEST_KEY = "8mib_file.dat";
private static final int OBJ_SIZE = 8 * 1024 * 1024;
private static final int OBJ_SIZE = 10 * 1024 * 1024;

private static RandomTempFile testFile;
private static S3AsyncClient s3Crt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,26 @@ default S3CrtAsyncClientBuilder retryConfiguration(Consumer<S3CrtRetryConfigurat
*/
S3CrtAsyncClientBuilder crossRegionAccessEnabled(Boolean crossRegionAccessEnabled);

/**
* Configure the size threshold, in bytes, for when to use multipart upload. Uploads/copies over this size will automatically
* use a multipart upload strategy, while uploads/copies smaller than this threshold will use a single connection to
* upload/copy the whole object.
*
* <p>
* Multipart uploads are easier to recover from and also potentially faster than single part uploads, especially when the
* upload parts can be uploaded in parallel. Because there are additional network API calls, small objects are still
* recommended to use a single connection for the upload. See
* <a href="https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html">Uploading and copying objects using
* multipart upload</a>.
*
* <p>
* By default, it is the same as {@link #minimumPartSizeInBytes(Long)}.
*
* @param thresholdInBytes the value of the threshold to set.
* @return an instance of this builder.
*/
S3CrtAsyncClientBuilder thresholdInBytes(Long thresholdInBytes);

@Override
S3AsyncClient build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ private DefaultS3CrtAsyncClient(DefaultS3CrtClientBuilder builder) {
super(initializeS3AsyncClient(builder));
long partSizeInBytes = builder.minimalPartSizeInBytes == null ? DEFAULT_PART_SIZE_IN_BYTES :
builder.minimalPartSizeInBytes;
long thresholdInBytes = builder.thresholdInBytes == null ? partSizeInBytes : builder.thresholdInBytes;
this.copyObjectHelper = new CopyObjectHelper((S3AsyncClient) delegate(),
partSizeInBytes,
partSizeInBytes);
thresholdInBytes);
}

@Override
Expand Down Expand Up @@ -117,6 +118,7 @@ private static S3CrtAsyncHttpClient.Builder initializeS3CrtAsyncHttpClient(Defau
Validate.isPositiveOrNull(builder.maxConcurrency, "maxConcurrency");
Validate.isPositiveOrNull(builder.targetThroughputInGbps, "targetThroughputInGbps");
Validate.isPositiveOrNull(builder.minimalPartSizeInBytes, "minimalPartSizeInBytes");
Validate.isPositiveOrNull(builder.thresholdInBytes, "thresholdInBytes");

S3NativeClientConfiguration.Builder nativeClientBuilder =
S3NativeClientConfiguration.builder()
Expand All @@ -128,7 +130,8 @@ private static S3CrtAsyncHttpClient.Builder initializeS3CrtAsyncHttpClient(Defau
.endpointOverride(builder.endpointOverride)
.credentialsProvider(builder.credentialsProvider)
.readBufferSizeInBytes(builder.readBufferSizeInBytes)
.httpConfiguration(builder.httpConfiguration);
.httpConfiguration(builder.httpConfiguration)
.thresholdInBytes(builder.thresholdInBytes);

if (builder.retryConfiguration != null) {
nativeClientBuilder.standardRetryOptions(
Expand Down Expand Up @@ -156,6 +159,7 @@ public static final class DefaultS3CrtClientBuilder implements S3CrtAsyncClientB
private List<ExecutionInterceptor> executionInterceptors;
private S3CrtRetryConfiguration retryConfiguration;
private boolean crossRegionAccessEnabled;
private Long thresholdInBytes;

public AwsCredentialsProvider credentialsProvider() {
return credentialsProvider;
Expand Down Expand Up @@ -276,6 +280,12 @@ public S3CrtAsyncClientBuilder crossRegionAccessEnabled(Boolean crossRegionAcces
return this;
}

@Override
public S3CrtAsyncClientBuilder thresholdInBytes(Long thresholdInBytes) {
this.thresholdInBytes = thresholdInBytes;
return this;
}

@Override
public S3CrtAsyncClient build() {
return new DefaultS3CrtAsyncClient(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ private S3CrtAsyncHttpClient(Builder builder) {
.withCredentialsProvider(s3NativeClientConfiguration.credentialsProvider())
.withClientBootstrap(s3NativeClientConfiguration.clientBootstrap())
.withPartSize(s3NativeClientConfiguration.partSizeBytes())
.withMultipartUploadThreshold(s3NativeClientConfiguration.thresholdInBytes())
.withComputeContentMd5(false)
.withMaxConnections(s3NativeClientConfiguration.maxConcurrency())
.withThroughputTargetGbps(s3NativeClientConfiguration.targetThroughputInGbps())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class S3NativeClientConfiguration implements SdkAutoCloseable {
private final CrtCredentialsProviderAdapter credentialProviderAdapter;
private final CredentialsProvider credentialsProvider;
private final long partSizeInBytes;
private final long thresholdInBytes;
private final double targetThroughputInGbps;
private final int maxConcurrency;
private final URI endpointOverride;
Expand Down Expand Up @@ -86,6 +87,8 @@ public S3NativeClientConfiguration(Builder builder) {

this.partSizeInBytes = builder.partSizeInBytes == null ? DEFAULT_PART_SIZE_IN_BYTES :
builder.partSizeInBytes;
this.thresholdInBytes = builder.thresholdInBytes == null ? this.partSizeInBytes :
builder.thresholdInBytes;
this.targetThroughputInGbps = builder.targetThroughputInGbps == null ?
DEFAULT_TARGET_THROUGHPUT_IN_GBPS : builder.targetThroughputInGbps;

Expand Down Expand Up @@ -144,6 +147,10 @@ public long partSizeBytes() {
return partSizeInBytes;
}

public long thresholdInBytes() {
return thresholdInBytes;
}

public double targetThroughputInGbps() {
return targetThroughputInGbps;
}
Expand Down Expand Up @@ -187,6 +194,7 @@ public static final class Builder {

private S3CrtHttpConfiguration httpConfiguration;
private StandardRetryOptions standardRetryOptions;
private Long thresholdInBytes;

private Builder() {
}
Expand Down Expand Up @@ -247,5 +255,10 @@ public Builder standardRetryOptions(StandardRetryOptions standardRetryOptions) {
this.standardRetryOptions = standardRetryOptions;
return this;
}

public Builder thresholdInBytes(Long thresholdInBytes) {
this.thresholdInBytes = thresholdInBytes;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,10 @@ void build_shouldPassThroughParameters() {
S3NativeClientConfiguration.builder()
.maxConcurrency(100)
.signingRegion("us-west-2")
.standardRetryOptions(
new StandardRetryOptions()
.withBackoffRetryOptions(new ExponentialBackoffRetryOptions().withMaxRetries(7)))
.thresholdInBytes(1024L)
.standardRetryOptions(
new StandardRetryOptions()
.withBackoffRetryOptions(new ExponentialBackoffRetryOptions().withMaxRetries(7)))
.httpConfiguration(S3CrtHttpConfiguration.builder()
.connectionTimeout(Duration.ofSeconds(1))
.connectionHealthConfiguration(c -> c.minimumThroughputInBps(1024L)
Expand All @@ -330,6 +331,7 @@ void build_shouldPassThroughParameters() {
(S3CrtAsyncHttpClient) S3CrtAsyncHttpClient.builder().s3ClientConfiguration(configuration).build();
S3ClientOptions clientOptions = client.s3ClientOptions();
assertThat(clientOptions.getConnectTimeoutMs()).isEqualTo(1000);
assertThat(clientOptions.getMultiPartUploadThreshold()).isEqualTo(1024);
assertThat(clientOptions.getStandardRetryOptions().getBackoffRetryOptions().getMaxRetries()).isEqualTo(7);
assertThat(clientOptions.getMaxConnections()).isEqualTo(100);
assertThat(clientOptions.getMonitoringOptions()).satisfies(options -> {
Expand All @@ -347,6 +349,20 @@ void build_shouldPassThroughParameters() {
assertThat(clientOptions.getMaxConnections()).isEqualTo(100);
}

@Test
void build_partSizeConfigured_shouldApplyToThreshold() {
long partSizeInBytes = 10L;
S3NativeClientConfiguration configuration =
S3NativeClientConfiguration.builder()
.partSizeInBytes(partSizeInBytes)
.build();
S3CrtAsyncHttpClient client =
(S3CrtAsyncHttpClient) S3CrtAsyncHttpClient.builder().s3ClientConfiguration(configuration).build();
S3ClientOptions clientOptions = client.s3ClientOptions();
assertThat(clientOptions.getPartSize()).isEqualTo(partSizeInBytes);
assertThat(clientOptions.getMultiPartUploadThreshold()).isEqualTo(clientOptions.getPartSize());
}

@Test
void build_nullHttpConfiguration() {
S3NativeClientConfiguration configuration =
Expand Down