Skip to content

Commit c5b79f0

Browse files
committed
Implement multipart upload in Java-based S3 async client
1 parent c83caea commit c5b79f0

File tree

9 files changed

+1008
-112
lines changed

9 files changed

+1008
-112
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.s3.multipart;
17+
18+
import static java.util.concurrent.TimeUnit.MINUTES;
19+
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
20+
21+
import java.nio.file.Files;
22+
import org.assertj.core.api.Assertions;
23+
import org.junit.jupiter.api.AfterAll;
24+
import org.junit.jupiter.api.BeforeAll;
25+
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.api.Timeout;
27+
import software.amazon.awssdk.core.ResponseInputStream;
28+
import software.amazon.awssdk.core.async.AsyncRequestBody;
29+
import software.amazon.awssdk.core.sync.ResponseTransformer;
30+
import software.amazon.awssdk.services.s3.S3AsyncClient;
31+
import software.amazon.awssdk.services.s3.S3IntegrationTestBase;
32+
import software.amazon.awssdk.services.s3.internal.multipart.MultipartS3AsyncClient;
33+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
34+
import software.amazon.awssdk.services.s3.utils.ChecksumUtils;
35+
import software.amazon.awssdk.testutils.RandomTempFile;
36+
37+
public class S3MultipartClientPutObjectIntegrationTest extends S3IntegrationTestBase {
38+
39+
private static final String TEST_BUCKET = temporaryBucketName(S3MultipartClientPutObjectIntegrationTest.class);
40+
private static final String TEST_KEY = "10mib_file.dat";
41+
private static final int OBJ_SIZE = 19 * 1024 * 1024;
42+
43+
private static RandomTempFile testFile;
44+
private static S3AsyncClient mpuS3Client;
45+
46+
@BeforeAll
47+
public static void setup() throws Exception {
48+
S3IntegrationTestBase.setUp();
49+
S3IntegrationTestBase.createBucket(TEST_BUCKET);
50+
51+
testFile = new RandomTempFile(TEST_KEY, OBJ_SIZE);
52+
mpuS3Client = new MultipartS3AsyncClient(s3Async);
53+
}
54+
55+
@AfterAll
56+
public static void teardown() throws Exception {
57+
mpuS3Client.close();
58+
testFile.delete();
59+
deleteBucketAndAllContents(TEST_BUCKET);
60+
}
61+
62+
@Test
63+
@Timeout(value = 1, unit = MINUTES)
64+
void putObject_fileRequestBody_objectSentCorrectly() throws Exception {
65+
AsyncRequestBody body = AsyncRequestBody.fromFile(testFile.toPath());
66+
mpuS3Client.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), body).get(2, MINUTES);
67+
68+
ResponseInputStream<GetObjectResponse> objContent = S3IntegrationTestBase.s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY),
69+
ResponseTransformer.toInputStream());
70+
71+
byte[] expectedSum = ChecksumUtils.computeCheckSum(Files.newInputStream(testFile.toPath()));
72+
73+
Assertions.assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum);
74+
}
75+
76+
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/CopyObjectHelper.java

Lines changed: 19 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,11 @@
1919
import java.util.ArrayList;
2020
import java.util.List;
2121
import java.util.concurrent.CompletableFuture;
22-
import java.util.concurrent.CompletionException;
2322
import java.util.concurrent.atomic.AtomicReferenceArray;
24-
import java.util.function.BiFunction;
25-
import java.util.function.Supplier;
2623
import java.util.stream.IntStream;
2724
import software.amazon.awssdk.annotations.SdkInternalApi;
28-
import software.amazon.awssdk.core.exception.SdkClientException;
2925
import software.amazon.awssdk.services.s3.S3AsyncClient;
30-
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
26+
import software.amazon.awssdk.services.s3.internal.multipart.GenericMultipartHelper;
3127
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
3228
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
3329
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
@@ -50,17 +46,16 @@
5046
public final class CopyObjectHelper {
5147
private static final Logger log = Logger.loggerFor(S3AsyncClient.class);
5248

53-
/**
54-
* The max number of parts on S3 side is 10,000
55-
*/
56-
private static final long MAX_UPLOAD_PARTS = 10_000;
57-
5849
private final S3AsyncClient s3AsyncClient;
5950
private final long partSizeInBytes;
51+
private final GenericMultipartHelper<CopyObjectRequest, CopyObjectResponse> genericMultipartHelper;
6052

6153
public CopyObjectHelper(S3AsyncClient s3AsyncClient, long partSizeInBytes) {
6254
this.s3AsyncClient = s3AsyncClient;
6355
this.partSizeInBytes = partSizeInBytes;
56+
this.genericMultipartHelper = new GenericMultipartHelper<>(s3AsyncClient,
57+
RequestConversionUtils::toAbortMultipartUploadRequest,
58+
RequestConversionUtils::toCopyObjectResponse);
6459
}
6560

6661
public CompletableFuture<CopyObjectResponse> copyObject(CopyObjectRequest copyObjectRequest) {
@@ -69,14 +64,15 @@ public CompletableFuture<CopyObjectResponse> copyObject(CopyObjectRequest copyOb
6964

7065
try {
7166
CompletableFuture<HeadObjectResponse> headFuture =
72-
s3AsyncClient.headObject(CopyRequestConversionUtils.toHeadObjectRequest(copyObjectRequest));
67+
s3AsyncClient.headObject(RequestConversionUtils.toHeadObjectRequest(copyObjectRequest));
7368

7469
// Ensure cancellations are forwarded to the head future
7570
CompletableFutureUtils.forwardExceptionTo(returnFuture, headFuture);
7671

7772
headFuture.whenComplete((headObjectResponse, throwable) -> {
7873
if (throwable != null) {
79-
handleException(returnFuture, () -> "Failed to retrieve metadata from the source object", throwable);
74+
genericMultipartHelper.handleException(returnFuture, () -> "Failed to retrieve metadata from the source "
75+
+ "object", throwable);
8076
} else {
8177
doCopyObject(copyObjectRequest, returnFuture, headObjectResponse);
8278
}
@@ -105,7 +101,7 @@ private void copyInParts(CopyObjectRequest copyObjectRequest,
105101
Long contentLength,
106102
CompletableFuture<CopyObjectResponse> returnFuture) {
107103

108-
CreateMultipartUploadRequest request = CopyRequestConversionUtils.toCreateMultipartUploadRequest(copyObjectRequest);
104+
CreateMultipartUploadRequest request = RequestConversionUtils.toCreateMultipartUploadRequest(copyObjectRequest);
109105
CompletableFuture<CreateMultipartUploadResponse> createMultipartUploadFuture =
110106
s3AsyncClient.createMultipartUpload(request);
111107

@@ -114,25 +110,22 @@ private void copyInParts(CopyObjectRequest copyObjectRequest,
114110

115111
createMultipartUploadFuture.whenComplete((createMultipartUploadResponse, throwable) -> {
116112
if (throwable != null) {
117-
handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable);
113+
genericMultipartHelper.handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable);
118114
} else {
119115
log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId());
120116
doCopyInParts(copyObjectRequest, contentLength, returnFuture, createMultipartUploadResponse.uploadId());
121117
}
122118
});
123119
}
124120

125-
private int determinePartCount(long contentLength, long partSize) {
126-
return (int) Math.ceil(contentLength / (double) partSize);
127-
}
128-
129121
private void doCopyInParts(CopyObjectRequest copyObjectRequest,
130122
Long contentLength,
131123
CompletableFuture<CopyObjectResponse> returnFuture,
132124
String uploadId) {
133-
long optimalPartSize = calculateOptimalPartSizeForCopy(contentLength);
134125

135-
int partCount = determinePartCount(contentLength, optimalPartSize);
126+
long optimalPartSize = genericMultipartHelper.calculateOptimalPartSizeFor(contentLength, partSizeInBytes);
127+
128+
int partCount = genericMultipartHelper.determinePartCount(contentLength, optimalPartSize);
136129

137130
log.debug(() -> String.format("Starting multipart copy with partCount: %s, optimalPartSize: %s",
138131
partCount, optimalPartSize));
@@ -147,32 +140,15 @@ private void doCopyInParts(CopyObjectRequest copyObjectRequest,
147140
optimalPartSize);
148141
CompletableFutureUtils.allOfExceptionForwarded(futures.toArray(new CompletableFuture[0]))
149142
.thenCompose(ignore -> completeMultipartUpload(copyObjectRequest, uploadId, completedParts))
150-
.handle(handleExceptionOrResponse(copyObjectRequest, returnFuture, uploadId))
143+
.handle(genericMultipartHelper.handleExceptionOrResponse(copyObjectRequest, returnFuture,
144+
uploadId))
151145
.exceptionally(throwable -> {
152-
handleException(returnFuture, () -> "Unexpected exception occurred", throwable);
146+
genericMultipartHelper.handleException(returnFuture, () -> "Unexpected exception occurred",
147+
throwable);
153148
return null;
154149
});
155150
}
156151

157-
private BiFunction<CompleteMultipartUploadResponse, Throwable, Void> handleExceptionOrResponse(
158-
CopyObjectRequest copyObjectRequest,
159-
CompletableFuture<CopyObjectResponse> returnFuture,
160-
String uploadId) {
161-
162-
return (completeMultipartUploadResponse, throwable) -> {
163-
if (throwable != null) {
164-
cleanUpParts(copyObjectRequest, uploadId);
165-
handleException(returnFuture, () -> "Failed to send multipart copy requests.",
166-
throwable);
167-
} else {
168-
returnFuture.complete(CopyRequestConversionUtils.toCopyObjectResponse(
169-
completeMultipartUploadResponse));
170-
}
171-
172-
return null;
173-
};
174-
}
175-
176152
private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUpload(
177153
CopyObjectRequest copyObjectRequest, String uploadId, AtomicReferenceArray<CompletedPart> completedParts) {
178154
log.debug(() -> String.format("Sending completeMultipartUploadRequest, uploadId: %s",
@@ -194,35 +170,6 @@ private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUplo
194170
return s3AsyncClient.completeMultipartUpload(completeMultipartUploadRequest);
195171
}
196172

197-
private void cleanUpParts(CopyObjectRequest copyObjectRequest, String uploadId) {
198-
AbortMultipartUploadRequest abortMultipartUploadRequest =
199-
CopyRequestConversionUtils.toAbortMultipartUploadRequest(copyObjectRequest, uploadId);
200-
s3AsyncClient.abortMultipartUpload(abortMultipartUploadRequest)
201-
.exceptionally(throwable -> {
202-
log.warn(() -> String.format("Failed to abort previous multipart upload "
203-
+ "(id: %s)"
204-
+ ". You may need to call "
205-
+ "S3AsyncClient#abortMultiPartUpload to "
206-
+ "free all storage consumed by"
207-
+ " all parts. ",
208-
uploadId), throwable);
209-
return null;
210-
});
211-
}
212-
213-
private static void handleException(CompletableFuture<CopyObjectResponse> returnFuture,
214-
Supplier<String> message,
215-
Throwable throwable) {
216-
Throwable cause = throwable instanceof CompletionException ? throwable.getCause() : throwable;
217-
218-
if (cause instanceof Error) {
219-
returnFuture.completeExceptionally(cause);
220-
} else {
221-
SdkClientException exception = SdkClientException.create(message.get(), cause);
222-
returnFuture.completeExceptionally(exception);
223-
}
224-
}
225-
226173
private List<CompletableFuture<CompletedPart>> sendUploadPartCopyRequests(CopyObjectRequest copyObjectRequest,
227174
long contentLength,
228175
String uploadId,
@@ -265,23 +212,13 @@ private static CompletedPart convertUploadPartCopyResponse(AtomicReferenceArray<
265212
UploadPartCopyResponse uploadPartCopyResponse) {
266213
CopyPartResult copyPartResult = uploadPartCopyResponse.copyPartResult();
267214
CompletedPart completedPart =
268-
CopyRequestConversionUtils.toCompletedPart(copyPartResult,
269-
partNumber);
215+
RequestConversionUtils.toCompletedPart(copyPartResult,
216+
partNumber);
270217

271218
completedParts.set(partNumber - 1, completedPart);
272219
return completedPart;
273220
}
274221

275-
/**
276-
* Calculates the optimal part size of each part request if the copy operation is carried out as multipart copy.
277-
*/
278-
private long calculateOptimalPartSizeForCopy(long contentLengthOfSource) {
279-
double optimalPartSize = contentLengthOfSource / (double) MAX_UPLOAD_PARTS;
280-
281-
optimalPartSize = Math.ceil(optimalPartSize);
282-
return (long) Math.max(optimalPartSize, partSizeInBytes);
283-
}
284-
285222
private void copyInOneChunk(CopyObjectRequest copyObjectRequest,
286223
CompletableFuture<CopyObjectResponse> returnFuture) {
287224
CompletableFuture<CopyObjectResponse> copyObjectFuture =

0 commit comments

Comments
 (0)