Skip to content

Implement automatic multipart copy functionality in S3 CRT async client #3403

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public TypeSpec poetSpec() {
.addAnnotation(SdkInternalApi.class)
.addModifiers(Modifier.ABSTRACT, Modifier.PUBLIC)
.addField(FieldSpec.builder(interfaceClass, "delegate")
.addModifiers(Modifier.PRIVATE, Modifier.FINAL)
.addModifiers(Modifier.PROTECTED, Modifier.FINAL)
.build())
.addMethods(operations())
.addMethod(closeMethod());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
@Generated("software.amazon.awssdk:codegen")
@SdkInternalApi
public abstract class DelegatingJsonAsyncClient implements JsonAsyncClient {
private final JsonAsyncClient delegate;
protected final JsonAsyncClient delegate;

public DelegatingJsonAsyncClient(JsonAsyncClient delegate) {
this.delegate = delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public static void setUp() throws Exception {

@AfterAll
public static void cleanUp() {
s3.close();
s3Async.close();
CrtResource.waitForNoResources();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.transfer.s3.model.CompletedCopy;
Expand All @@ -38,25 +39,29 @@ public class S3TransferManagerCopyIntegrationTest extends S3IntegrationTestBase
private static final String ORIGINAL_OBJ = "test_file.dat";
private static final String COPIED_OBJ = "test_file_copy.dat";
private static final String ORIGINAL_OBJ_SPECIAL_CHARACTER = "original-special-chars-@$%";
private static final String COPIED_OBJ_SPECIAL_CHARACTER= "special-special-chars-@$%";
private static final String COPIED_OBJ_SPECIAL_CHARACTER = "special-special-chars-@$%";
private static final long OBJ_SIZE = ThreadLocalRandom.current().nextLong(8 * MB, 16 * MB + 1);

private static S3TransferManager tm;

private static S3AsyncClient s3AsyncClient;

@BeforeAll
public static void setUp() throws Exception {
S3IntegrationTestBase.setUp();
createBucket(BUCKET);
s3AsyncClient = S3CrtAsyncClient.builder().credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
.region(DEFAULT_REGION)
.maxConcurrency(100)
.build();
tm = S3TransferManager.builder()
.s3AsyncClient(S3CrtAsyncClient.builder().credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
.region(DEFAULT_REGION)
.maxConcurrency(100)
.build())
.s3AsyncClient(s3AsyncClient)
.build();
}

@AfterAll
public static void teardown() throws Exception {
s3AsyncClient.close();
tm.close();
deleteBucketAndAllContents(BUCKET);
S3IntegrationTestBase.cleanUp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.transfer.s3;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
Expand All @@ -24,7 +25,7 @@
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
Expand Down Expand Up @@ -512,10 +513,14 @@ default DirectoryDownload downloadDirectory(Consumer<DownloadDirectoryRequest.Bu
}

/**
* Creates a copy of an object that is already stored in S3.
* Creates a copy of an object that is already stored in S3 in the same region.
* <p>
* Under the hood, {@link S3TransferManager} will intelligently use plain {@link CopyObjectRequest}s for smaller objects, or
* multiple parallel {@link UploadPartCopyRequest}s for larger objects.
* multiple parallel {@link UploadPartCopyRequest}s for larger objects. This behavior can be configured via
* {@link S3CrtAsyncClientBuilder#minimumPartSizeInBytes(Long)}. Note that for multipart copy request, existing metadata
* stored in the source object is NOT copied to the destination object; if required, you can retrieve the metadata
* from the source object and set it explicitly in the {@link CopyObjectRequest.Builder#metadata(Map)}.
*
* <p>
* While this API supports {@link TransferListener}s, they will not receive {@code bytesTransferred} callback-updates due to
* the way the {@link CopyObjectRequest} API behaves. When copying an object, S3 performs the byte copying on your behalf
Expand Down Expand Up @@ -574,15 +579,21 @@ static S3TransferManager.Builder builder() {
interface Builder {

/**
* Low level S3 client that implements {@link S3AsyncClient}.The {@link S3TransferManager} already provides sensible
* default client. As of now only {@link S3CrtAsyncClient} supports concurrent execution of Transfer manager operations.
* Specify the low level {@link S3AsyncClient} that will be used to send requests to S3. The SDK will create a default
* {@link S3AsyncClient} if not provided.
*
* <p>
* It's highly recommended using {@link S3AsyncClient#crtBuilder()} to create an {@link S3AsyncClient} instance to benefit
* from multipart upload/download feature and maximum throughput.
*
* <p>
* Note : The provided S3AsyncClient will not be closed when the transfer manager is closed.
* This s3AsyncClient must be closed by the caller when it is ready to be disposed.
* @param s3AsyncClient Implementation of {@link S3AsyncClient}
* Note: the provided {@link S3AsyncClient} will not be closed when the transfer manager is closed; it must be closed by
* the caller when it is ready to be disposed.
*
* @param s3AsyncClient the S3 async client
* @return Returns a reference to this object so that method calls can be chained together.
* @see S3AsyncClient#crtBuilder()
*/

Builder s3AsyncClient(S3AsyncClient s3AsyncClient);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
public class TransferManagerConfiguration implements SdkAutoCloseable {
private final AttributeMap options;


private TransferManagerConfiguration(Builder builder) {
AttributeMap.Builder standardOptions = AttributeMap.builder();
standardOptions.put(UPLOAD_DIRECTORY_FOLLOW_SYMBOLIC_LINKS, builder.followSymbolicLinks);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.crt;

import static org.assertj.core.api.Assertions.assertThat;
import static software.amazon.awssdk.services.s3.utils.ChecksumUtils.computeCheckSum;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

public class S3CrtClientCopyIntegrationTest extends S3IntegrationTestBase {
private static final String BUCKET = temporaryBucketName(S3CrtClientCopyIntegrationTest.class);
private static final String ORIGINAL_OBJ = "test_file.dat";
private static final String COPIED_OBJ = "test_file_copy.dat";
private static final String ORIGINAL_OBJ_SPECIAL_CHARACTER = "original-special-chars-@$%";
private static final String COPIED_OBJ_SPECIAL_CHARACTER = "special-special-chars-@$%";
private static final long OBJ_SIZE = ThreadLocalRandom.current().nextLong(8 * 1024 * 1024, 16 * 1024 * 1024 + 1);
private static final long SMALL_OBJ_SIZE = 1024 * 1024;
private static S3AsyncClient s3CrtAsyncClient;
@BeforeAll
public static void setUp() throws Exception {
S3IntegrationTestBase.setUp();
createBucket(BUCKET);
s3CrtAsyncClient = S3CrtAsyncClient.builder()
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
.region(DEFAULT_REGION)
.build();
}

@AfterAll
public static void teardown() throws Exception {
s3CrtAsyncClient.close();
deleteBucketAndAllContents(BUCKET);
}

@Test
void copy_singlePart_hasSameContent() {
byte[] originalContent = randomBytes(SMALL_OBJ_SIZE);
createOriginalObject(originalContent, ORIGINAL_OBJ);
copyObject(ORIGINAL_OBJ, COPIED_OBJ);
validateCopiedObject(originalContent, ORIGINAL_OBJ);
}

@Test
void copy_copiedObject_hasSameContent() {
byte[] originalContent = randomBytes(OBJ_SIZE);
createOriginalObject(originalContent, ORIGINAL_OBJ);
copyObject(ORIGINAL_OBJ, COPIED_OBJ);
validateCopiedObject(originalContent, ORIGINAL_OBJ);
}

@Test
void copy_specialCharacters_hasSameContent() {
byte[] originalContent = randomBytes(OBJ_SIZE);
createOriginalObject(originalContent, ORIGINAL_OBJ_SPECIAL_CHARACTER);
copyObject(ORIGINAL_OBJ_SPECIAL_CHARACTER, COPIED_OBJ_SPECIAL_CHARACTER);
validateCopiedObject(originalContent, COPIED_OBJ_SPECIAL_CHARACTER);
}

private void createOriginalObject(byte[] originalContent, String originalKey) {
s3CrtAsyncClient.putObject(r -> r.bucket(BUCKET)
.key(originalKey),
AsyncRequestBody.fromBytes(originalContent)).join();
}

private void copyObject(String original, String destination) {
CompletableFuture<CopyObjectResponse> future = s3CrtAsyncClient.copyObject(c -> c
.sourceBucket(BUCKET)
.sourceKey(original)
.destinationBucket(BUCKET)
.destinationKey(destination));

CopyObjectResponse copyObjectResponse = future.join();
assertThat(copyObjectResponse.responseMetadata().requestId()).isNotNull();
assertThat(copyObjectResponse.sdkHttpResponse()).isNotNull();
}

private void validateCopiedObject(byte[] originalContent, String originalKey) {
ResponseBytes<GetObjectResponse> copiedObject = s3.getObject(r -> r.bucket(BUCKET)
.key(originalKey),
ResponseTransformer.toBytes());
assertThat(computeCheckSum(copiedObject.asByteBuffer())).isEqualTo(computeCheckSum(ByteBuffer.wrap(originalContent)));
}

private static byte[] randomBytes(long size) {
byte[] bytes = new byte[Math.toIntExact(size)];
ThreadLocalRandom.current().nextBytes(bytes);
return bytes;
}
}
Loading