Skip to content

Commit 2be7e89

Browse files
committed
Implement multipart copy in Java-based S3 async client
1 parent d998908 commit 2be7e89

File tree

6 files changed

+110
-29
lines changed

6 files changed

+110
-29
lines changed

services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrtClientCopyIntegrationTest.java renamed to services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3ClientMultiPartCopyIntegrationTest.java

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16-
package software.amazon.awssdk.services.s3.crt;
16+
package software.amazon.awssdk.services.s3.multipart;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
1919
import static org.assertj.core.api.Fail.fail;
@@ -24,33 +24,40 @@
2424
import java.nio.ByteBuffer;
2525
import java.security.SecureRandom;
2626
import java.util.Base64;
27-
import java.util.UUID;
2827
import java.util.concurrent.CompletableFuture;
2928
import java.util.concurrent.ThreadLocalRandom;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.stream.Stream;
3031
import javax.crypto.KeyGenerator;
3132
import org.junit.jupiter.api.AfterAll;
3233
import org.junit.jupiter.api.BeforeAll;
33-
import org.junit.jupiter.api.Test;
34+
import org.junit.jupiter.api.Disabled;
35+
import org.junit.jupiter.api.Timeout;
36+
import org.junit.jupiter.params.ParameterizedTest;
37+
import org.junit.jupiter.params.provider.MethodSource;
3438
import software.amazon.awssdk.core.ResponseBytes;
3539
import software.amazon.awssdk.core.async.AsyncRequestBody;
3640
import software.amazon.awssdk.core.sync.ResponseTransformer;
3741
import software.amazon.awssdk.services.s3.S3AsyncClient;
3842
import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
3943
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
44+
import software.amazon.awssdk.services.s3.internal.multipart.MultipartS3AsyncClient;
4045
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
4146
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
4247
import software.amazon.awssdk.services.s3.model.MetadataDirective;
4348
import software.amazon.awssdk.utils.Md5Utils;
4449

45-
public class S3CrtClientCopyIntegrationTest extends S3IntegrationTestBase {
46-
private static final String BUCKET = temporaryBucketName(S3CrtClientCopyIntegrationTest.class);
50+
@Timeout(value = 3, unit = TimeUnit.MINUTES)
51+
public class S3ClientMultiPartCopyIntegrationTest extends S3IntegrationTestBase {
52+
private static final String BUCKET = temporaryBucketName(S3ClientMultiPartCopyIntegrationTest.class);
4753
private static final String ORIGINAL_OBJ = "test_file.dat";
4854
private static final String COPIED_OBJ = "test_file_copy.dat";
4955
private static final String ORIGINAL_OBJ_SPECIAL_CHARACTER = "original-special-chars-@$%";
5056
private static final String COPIED_OBJ_SPECIAL_CHARACTER = "special-special-chars-@$%";
5157
private static final long OBJ_SIZE = ThreadLocalRandom.current().nextLong(8 * 1024 * 1024, 16 * 1024 * 1024 + 1);
5258
private static final long SMALL_OBJ_SIZE = 1024 * 1024;
5359
private static S3AsyncClient s3CrtAsyncClient;
60+
private static S3AsyncClient s3MpuClient;
5461
@BeforeAll
5562
public static void setUp() throws Exception {
5663
S3IntegrationTestBase.setUp();
@@ -59,40 +66,50 @@ public static void setUp() throws Exception {
5966
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
6067
.region(DEFAULT_REGION)
6168
.build();
69+
s3MpuClient = new MultipartS3AsyncClient(s3Async);
6270
}
6371

6472
@AfterAll
6573
public static void teardown() throws Exception {
6674
s3CrtAsyncClient.close();
75+
s3MpuClient.close();
6776
deleteBucketAndAllContents(BUCKET);
6877
}
6978

70-
@Test
71-
void copy_singlePart_hasSameContent() {
79+
public static Stream<S3AsyncClient> s3AsyncClient() {
80+
return Stream.of(s3MpuClient, s3CrtAsyncClient);
81+
}
82+
83+
@ParameterizedTest(autoCloseArguments = false)
84+
@MethodSource("s3AsyncClient")
85+
void copy_singlePart_hasSameContent(S3AsyncClient s3AsyncClient) {
7286
byte[] originalContent = randomBytes(SMALL_OBJ_SIZE);
7387
createOriginalObject(originalContent, ORIGINAL_OBJ);
74-
copyObject(ORIGINAL_OBJ, COPIED_OBJ);
88+
copyObject(ORIGINAL_OBJ, COPIED_OBJ, s3AsyncClient);
7589
validateCopiedObject(originalContent, ORIGINAL_OBJ);
7690
}
7791

78-
@Test
79-
void copy_copiedObject_hasSameContent() {
92+
@ParameterizedTest(autoCloseArguments = false)
93+
@MethodSource("s3AsyncClient")
94+
void copy_copiedObject_hasSameContent(S3AsyncClient s3AsyncClient) {
8095
byte[] originalContent = randomBytes(OBJ_SIZE);
8196
createOriginalObject(originalContent, ORIGINAL_OBJ);
82-
copyObject(ORIGINAL_OBJ, COPIED_OBJ);
97+
copyObject(ORIGINAL_OBJ, COPIED_OBJ, s3AsyncClient);
8398
validateCopiedObject(originalContent, ORIGINAL_OBJ);
8499
}
85100

86-
@Test
87-
void copy_specialCharacters_hasSameContent() {
101+
@ParameterizedTest(autoCloseArguments = false)
102+
@MethodSource("s3AsyncClient")
103+
void copy_specialCharacters_hasSameContent(S3AsyncClient s3AsyncClient) {
88104
byte[] originalContent = randomBytes(OBJ_SIZE);
89105
createOriginalObject(originalContent, ORIGINAL_OBJ_SPECIAL_CHARACTER);
90-
copyObject(ORIGINAL_OBJ_SPECIAL_CHARACTER, COPIED_OBJ_SPECIAL_CHARACTER);
106+
copyObject(ORIGINAL_OBJ_SPECIAL_CHARACTER, COPIED_OBJ_SPECIAL_CHARACTER, s3AsyncClient);
91107
validateCopiedObject(originalContent, COPIED_OBJ_SPECIAL_CHARACTER);
92108
}
93109

94-
@Test
95-
void copy_ssecServerSideEncryption_shouldSucceed() {
110+
@ParameterizedTest(autoCloseArguments = false)
111+
@MethodSource("s3AsyncClient")
112+
void copy_ssecServerSideEncryption_shouldSucceed(S3AsyncClient s3AsyncClient) {
96113
byte[] originalContent = randomBytes(OBJ_SIZE);
97114
byte[] secretKey = generateSecretKey();
98115
String b64Key = Base64.getEncoder().encodeToString(secretKey);
@@ -102,16 +119,16 @@ void copy_ssecServerSideEncryption_shouldSucceed() {
102119
String newB64Key = Base64.getEncoder().encodeToString(newSecretKey);
103120
String newB64KeyMd5 = Md5Utils.md5AsBase64(newSecretKey);
104121

105-
// Java S3 client is used because CRT S3 client putObject fails with SSE-C
106-
// TODO: change back to S3CrtClient once the issue is fixed in CRT
122+
// MPU S3 client gets stuck
123+
// TODO: change back to s3AsyncClient once the issue is fixed in MPU S3 client
107124
s3Async.putObject(r -> r.bucket(BUCKET)
108125
.key(ORIGINAL_OBJ)
109126
.sseCustomerKey(b64Key)
110127
.sseCustomerAlgorithm(AES256.name())
111128
.sseCustomerKeyMD5(b64KeyMd5),
112129
AsyncRequestBody.fromBytes(originalContent)).join();
113130

114-
CompletableFuture<CopyObjectResponse> future = s3CrtAsyncClient.copyObject(c -> c
131+
CompletableFuture<CopyObjectResponse> future = s3AsyncClient.copyObject(c -> c
115132
.sourceBucket(BUCKET)
116133
.sourceKey(ORIGINAL_OBJ)
117134
.metadataDirective(MetadataDirective.REPLACE)
@@ -147,8 +164,8 @@ private void createOriginalObject(byte[] originalContent, String originalKey) {
147164
AsyncRequestBody.fromBytes(originalContent)).join();
148165
}
149166

150-
private void copyObject(String original, String destination) {
151-
CompletableFuture<CopyObjectResponse> future = s3CrtAsyncClient.copyObject(c -> c
167+
private void copyObject(String original, String destination, S3AsyncClient s3AsyncClient) {
168+
CompletableFuture<CopyObjectResponse> future = s3AsyncClient.copyObject(c -> c
152169
.sourceBucket(BUCKET)
153170
.sourceKey(original)
154171
.destinationBucket(BUCKET)

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder;
5252
import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration;
5353
import software.amazon.awssdk.services.s3.crt.S3CrtRetryConfiguration;
54+
import software.amazon.awssdk.services.s3.internal.multipart.CopyObjectHelper;
5455
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
5556
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
5657
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
@@ -67,7 +68,9 @@ private DefaultS3CrtAsyncClient(DefaultS3CrtClientBuilder builder) {
6768
super(initializeS3AsyncClient(builder));
6869
long partSizeInBytes = builder.minimalPartSizeInBytes == null ? DEFAULT_PART_SIZE_IN_BYTES :
6970
builder.minimalPartSizeInBytes;
70-
this.copyObjectHelper = new CopyObjectHelper((S3AsyncClient) delegate(), partSizeInBytes);
71+
this.copyObjectHelper = new CopyObjectHelper((S3AsyncClient) delegate(),
72+
partSizeInBytes,
73+
partSizeInBytes);
7174
}
7275

7376
@Override

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/CopyObjectHelper.java renamed to services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/CopyObjectHelper.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16-
package software.amazon.awssdk.services.s3.internal.crt;
16+
package software.amazon.awssdk.services.s3.internal.multipart;
1717

1818

1919
import java.util.ArrayList;
@@ -23,6 +23,7 @@
2323
import java.util.stream.IntStream;
2424
import software.amazon.awssdk.annotations.SdkInternalApi;
2525
import software.amazon.awssdk.services.s3.S3AsyncClient;
26+
import software.amazon.awssdk.services.s3.internal.crt.UploadPartCopyRequestIterable;
2627
import software.amazon.awssdk.services.s3.internal.multipart.GenericMultipartHelper;
2728
import software.amazon.awssdk.services.s3.internal.multipart.SdkPojoConversionUtils;
2829
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
@@ -50,13 +51,15 @@ public final class CopyObjectHelper {
5051
private final S3AsyncClient s3AsyncClient;
5152
private final long partSizeInBytes;
5253
private final GenericMultipartHelper<CopyObjectRequest, CopyObjectResponse> genericMultipartHelper;
54+
private final long uploadThreshold;
5355

54-
public CopyObjectHelper(S3AsyncClient s3AsyncClient, long partSizeInBytes) {
56+
public CopyObjectHelper(S3AsyncClient s3AsyncClient, long partSizeInBytes, long uploadThreshold) {
5557
this.s3AsyncClient = s3AsyncClient;
5658
this.partSizeInBytes = partSizeInBytes;
5759
this.genericMultipartHelper = new GenericMultipartHelper<>(s3AsyncClient,
5860
SdkPojoConversionUtils::toAbortMultipartUploadRequest,
5961
SdkPojoConversionUtils::toCopyObjectResponse);
62+
this.uploadThreshold = uploadThreshold;
6063
}
6164

6265
public CompletableFuture<CopyObjectResponse> copyObject(CopyObjectRequest copyObjectRequest) {
@@ -89,7 +92,7 @@ private void doCopyObject(CopyObjectRequest copyObjectRequest, CompletableFuture
8992
HeadObjectResponse headObjectResponse) {
9093
Long contentLength = headObjectResponse.contentLength();
9194

92-
if (contentLength <= partSizeInBytes) {
95+
if (contentLength <= partSizeInBytes || contentLength <= uploadThreshold) {
9396
log.debug(() -> "Starting the copy as a single copy part request");
9497
copyInOneChunk(copyObjectRequest, returnFuture);
9598
} else {

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import software.amazon.awssdk.core.async.AsyncRequestBody;
2222
import software.amazon.awssdk.services.s3.DelegatingS3AsyncClient;
2323
import software.amazon.awssdk.services.s3.S3AsyncClient;
24+
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
25+
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
2426
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
2527
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
2628

@@ -33,15 +35,27 @@ public class MultipartS3AsyncClient extends DelegatingS3AsyncClient {
3335

3436
private static final long DEFAULT_MAX_MEMORY = DEFAULT_PART_SIZE_IN_BYTES * 2;
3537
private final MultipartUploadHelper mpuHelper;
38+
private final CopyObjectHelper copyObjectHelper;
3639

3740
public MultipartS3AsyncClient(S3AsyncClient delegate) {
3841
super(delegate);
3942
// TODO: pass a config object to the upload helper instead
4043
mpuHelper = new MultipartUploadHelper(delegate, DEFAULT_PART_SIZE_IN_BYTES, DEFAULT_THRESHOLD, DEFAULT_MAX_MEMORY);
44+
copyObjectHelper = new CopyObjectHelper(delegate, DEFAULT_PART_SIZE_IN_BYTES, DEFAULT_THRESHOLD);
4145
}
4246

4347
@Override
4448
public CompletableFuture<PutObjectResponse> putObject(PutObjectRequest putObjectRequest, AsyncRequestBody requestBody) {
4549
return mpuHelper.uploadObject(putObjectRequest, requestBody);
4650
}
51+
52+
@Override
53+
public CompletableFuture<CopyObjectResponse> copyObject(CopyObjectRequest copyObjectRequest) {
54+
return copyObjectHelper.copyObject(copyObjectRequest);
55+
}
56+
57+
@Override
58+
public void close() {
59+
delegate().close();
60+
}
4761
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/SdkPojoConversionUtils.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,14 @@
3838
import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
3939
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
4040
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
41+
import software.amazon.awssdk.utils.Logger;
4142

4243
/**
4344
* Request conversion utility method for POJO classes associated with multipart feature.
4445
*/
4546
@SdkInternalApi
4647
public final class SdkPojoConversionUtils {
48+
private static final Logger log = Logger.loggerFor(SdkPojoConversionUtils.class);
4749

4850
private static final HashSet<String> PUT_OBJECT_REQUEST_TO_UPLOAD_PART_FIELDS_TO_IGNORE =
4951
new HashSet<>(Arrays.asList("ChecksumSHA1", "ChecksumSHA256", "ContentMD5", "ChecksumCRC32C", "ChecksumCRC32"));
@@ -68,9 +70,22 @@ public static CreateMultipartUploadRequest toCreateMultipartUploadRequest(PutObj
6870
}
6971

7072
public static HeadObjectRequest toHeadObjectRequest(CopyObjectRequest copyObjectRequest) {
71-
HeadObjectRequest.Builder builder = HeadObjectRequest.builder();
72-
setSdkFields(builder, copyObjectRequest);
73-
return builder.build();
73+
74+
// We can't set SdkFields directly because the fields in CopyObjectRequest do not match 100% with the ones in
75+
// HeadObjectRequest
76+
return HeadObjectRequest.builder()
77+
.bucket(copyObjectRequest.sourceBucket())
78+
.key(copyObjectRequest.sourceKey())
79+
.versionId(copyObjectRequest.sourceVersionId())
80+
.ifMatch(copyObjectRequest.copySourceIfMatch())
81+
.ifModifiedSince(copyObjectRequest.copySourceIfModifiedSince())
82+
.ifNoneMatch(copyObjectRequest.copySourceIfNoneMatch())
83+
.ifUnmodifiedSince(copyObjectRequest.copySourceIfUnmodifiedSince())
84+
.expectedBucketOwner(copyObjectRequest.expectedSourceBucketOwner())
85+
.sseCustomerAlgorithm(copyObjectRequest.copySourceSSECustomerAlgorithm())
86+
.sseCustomerKey(copyObjectRequest.copySourceSSECustomerKey())
87+
.sseCustomerKeyMD5(copyObjectRequest.copySourceSSECustomerKeyMD5())
88+
.build();
7489
}
7590

7691
public static CompletedPart toCompletedPart(CopyPartResult copyPartResult, int partNumber) {
@@ -106,6 +121,8 @@ public static CreateMultipartUploadRequest toCreateMultipartUploadRequest(CopyOb
106121
CreateMultipartUploadRequest.Builder builder = CreateMultipartUploadRequest.builder();
107122

108123
setSdkFields(builder, copyObjectRequest);
124+
builder.bucket(copyObjectRequest.destinationBucket());
125+
builder.key(copyObjectRequest.destinationKey());
109126
return builder.build();
110127
}
111128

@@ -136,6 +153,8 @@ private static CopyObjectResult toCopyObjectResult(CompleteMultipartUploadRespon
136153
public static AbortMultipartUploadRequest.Builder toAbortMultipartUploadRequest(CopyObjectRequest copyObjectRequest) {
137154
AbortMultipartUploadRequest.Builder builder = AbortMultipartUploadRequest.builder();
138155
setSdkFields(builder, copyObjectRequest);
156+
builder.bucket(copyObjectRequest.destinationBucket());
157+
builder.key(copyObjectRequest.destinationKey());
139158
return builder;
140159
}
141160

@@ -154,6 +173,8 @@ public static UploadPartCopyRequest toUploadPartCopyRequest(CopyObjectRequest co
154173
return builder.copySourceRange(range)
155174
.partNumber(partNumber)
156175
.uploadId(uploadId)
176+
.bucket(copyObjectRequest.destinationBucket())
177+
.key(copyObjectRequest.destinationKey())
157178
.build();
158179
}
159180

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/CopyObjectHelperTest.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.mockito.stubbing.Answer;
3434
import software.amazon.awssdk.core.exception.SdkClientException;
3535
import software.amazon.awssdk.services.s3.S3AsyncClient;
36+
import software.amazon.awssdk.services.s3.internal.multipart.CopyObjectHelper;
3637
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
3738
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse;
3839
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
@@ -59,10 +60,13 @@ class CopyObjectHelperTest {
5960
private S3AsyncClient s3AsyncClient;
6061
private CopyObjectHelper copyHelper;
6162

63+
private static final long PART_SIZE = 1024L;
64+
private static final long UPLOAD_THRESHOLD = 2048L;
65+
6266
@BeforeEach
6367
public void setUp() {
6468
s3AsyncClient = Mockito.mock(S3AsyncClient.class);
65-
copyHelper = new CopyObjectHelper(s3AsyncClient, 1024L);
69+
copyHelper = new CopyObjectHelper(s3AsyncClient, PART_SIZE, UPLOAD_THRESHOLD);
6670
}
6771

6872
@Test
@@ -114,6 +118,25 @@ void singlePartCopy_happyCase_shouldSucceed() {
114118
assertThat(future.join()).isEqualTo(expectedResponse);
115119
}
116120

121+
@Test
122+
void copy_doesNotExceedThreshold_shouldUseSingleObjectCopy() {
123+
124+
CopyObjectRequest copyObjectRequest = copyObjectRequest();
125+
126+
stubSuccessfulHeadObjectCall(2000L);
127+
128+
CopyObjectResponse expectedResponse = CopyObjectResponse.builder().build();
129+
CompletableFuture<CopyObjectResponse> copyFuture =
130+
CompletableFuture.completedFuture(expectedResponse);
131+
132+
when(s3AsyncClient.copyObject(copyObjectRequest)).thenReturn(copyFuture);
133+
134+
CompletableFuture<CopyObjectResponse> future =
135+
copyHelper.copyObject(copyObjectRequest);
136+
137+
assertThat(future.join()).isEqualTo(expectedResponse);
138+
}
139+
117140
@Test
118141
void multiPartCopy_fourPartsHappyCase_shouldSucceed() {
119142
CopyObjectRequest copyObjectRequest = copyObjectRequest();

0 commit comments

Comments
 (0)