Skip to content

Commit e970a6f

Browse files
authored
Pause/Resume Upload for Transfer Manager with Java S3Client (#4908) (#4937)
* Pause/Resume Upload for Transfer Manager with Java S3Client * Fix checkstyle and equalsVerifier * temporarily disable failing test * Address comments * ifPresent check for PauseObservable * Address comments * Wrap subscriber with PausibleUpload * add changelog * integ test upload resume with different TMs * Rename to PausableUpload * Add unit tests * Refactor KnownLengthUploadHelper * Add unit tests * Move PauseObservable and S3ResumeToken out of internal * Refactor UploadWithKnownContentLengthHelper * Address comments and update tests * Fix import order * Extract Subscriber and MpuRequestContext to separate classes * Update Subscriber and add tests * Create separate tmJava with multipartClient in integ test * Address comments * Remove TestInternalApis and refactor tests * Address comment
1 parent 189bbcf commit e970a6f

29 files changed

+1792
-349
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "Amazon S3",
3+
"contributor": "",
4+
"type": "feature",
5+
"description": "Add support for pause/resume upload for TransferManager with Java-based S3Client that has multipart enabled"
6+
}

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3IntegrationTestBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public static void setUpForAllIntegTests() throws Exception {
6868
Log.initLoggingToStdout(Log.LogLevel.Warn);
6969
System.setProperty("aws.crt.debugnative", "true");
7070
s3 = s3ClientBuilder().build();
71+
// TODO - enable multipart once TransferListener is fixed for MultipartClient
7172
s3Async = s3AsyncClientBuilder().build();
7273
s3CrtAsync = S3CrtAsyncClient.builder()
7374
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadPauseResumeIntegrationTest.java

Lines changed: 62 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package software.amazon.awssdk.transfer.s3;
1717

1818
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
19-
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
2019
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
2120
import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;
2221

@@ -26,13 +25,17 @@
2625
import java.time.Duration;
2726
import java.util.concurrent.Executors;
2827
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.stream.Stream;
2929
import org.junit.jupiter.api.AfterAll;
3030
import org.junit.jupiter.api.BeforeAll;
31-
import org.junit.jupiter.api.Test;
31+
import org.junit.jupiter.params.ParameterizedTest;
32+
import org.junit.jupiter.params.provider.Arguments;
33+
import org.junit.jupiter.params.provider.MethodSource;
3234
import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
3335
import software.amazon.awssdk.core.waiters.AsyncWaiter;
3436
import software.amazon.awssdk.core.waiters.Waiter;
3537
import software.amazon.awssdk.core.waiters.WaiterAcceptor;
38+
import software.amazon.awssdk.services.s3.S3AsyncClient;
3639
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsResponse;
3740
import software.amazon.awssdk.services.s3.model.ListPartsResponse;
3841
import software.amazon.awssdk.services.s3.model.NoSuchUploadException;
@@ -48,17 +51,25 @@ public class S3TransferManagerUploadPauseResumeIntegrationTest extends S3Integra
4851
private static final String BUCKET = temporaryBucketName(S3TransferManagerUploadPauseResumeIntegrationTest.class);
4952
private static final String KEY = "key";
5053
// 24 * MB is chosen to make sure we have data written in the file already upon pausing.
51-
private static final long OBJ_SIZE = 24 * MB;
54+
private static final long LARGE_OBJ_SIZE = 24 * MB;
55+
private static final long SMALL_OBJ_SIZE = 2 * MB;
5256
private static File largeFile;
5357
private static File smallFile;
5458
private static ScheduledExecutorService executorService;
5559

60+
// TODO - switch to tmJava from TestBase once TransferListener is fixed for MultipartClient
61+
protected static S3TransferManager tmJavaMpu;
62+
5663
@BeforeAll
5764
public static void setup() throws Exception {
5865
createBucket(BUCKET);
59-
largeFile = new RandomTempFile(OBJ_SIZE);
60-
smallFile = new RandomTempFile(2 * MB);
66+
largeFile = new RandomTempFile(LARGE_OBJ_SIZE);
67+
smallFile = new RandomTempFile(SMALL_OBJ_SIZE);
6168
executorService = Executors.newScheduledThreadPool(3);
69+
70+
// TODO - switch to tmJava from TestBase once TransferListener is fixed for MultipartClient
71+
S3AsyncClient s3AsyncMpu = s3AsyncClientBuilder().multipartEnabled(true).build();
72+
tmJavaMpu = S3TransferManager.builder().s3Client(s3AsyncMpu).build();
6273
}
6374

6475
@AfterAll
@@ -69,30 +80,42 @@ public static void cleanup() {
6980
executorService.shutdown();
7081
}
7182

72-
@Test
73-
void pause_singlePart_shouldResume() {
83+
private static Stream<Arguments> transferManagers() {
84+
return Stream.of(
85+
Arguments.of(tmJavaMpu, tmJavaMpu),
86+
Arguments.of(tmCrt, tmCrt),
87+
Arguments.of(tmCrt, tmJavaMpu),
88+
Arguments.of(tmJavaMpu, tmCrt)
89+
);
90+
}
91+
92+
@ParameterizedTest
93+
@MethodSource("transferManagers")
94+
void pause_singlePart_shouldResume(S3TransferManager uploadTm, S3TransferManager resumeTm) {
7495
UploadFileRequest request = UploadFileRequest.builder()
7596
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
7697
.source(smallFile)
7798
.build();
78-
FileUpload fileUpload = tmCrt.uploadFile(request);
99+
FileUpload fileUpload = uploadTm.uploadFile(request);
79100
ResumableFileUpload resumableFileUpload = fileUpload.pause();
80101
log.debug(() -> "Paused: " + resumableFileUpload);
81102

82103
validateEmptyResumeToken(resumableFileUpload);
83104

84-
FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
105+
FileUpload resumedUpload = resumeTm.resumeUploadFile(resumableFileUpload);
85106
resumedUpload.completionFuture().join();
107+
assertThat(resumedUpload.progress().snapshot().totalBytes()).hasValue(SMALL_OBJ_SIZE);
86108
}
87109

88-
@Test
89-
void pause_fileNotChanged_shouldResume() {
110+
@ParameterizedTest
111+
@MethodSource("transferManagers")
112+
void pause_fileNotChanged_shouldResume(S3TransferManager uploadTm, S3TransferManager resumeTm) throws Exception {
90113
UploadFileRequest request = UploadFileRequest.builder()
91114
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
92115
.addTransferListener(LoggingTransferListener.create())
93116
.source(largeFile)
94117
.build();
95-
FileUpload fileUpload = tmCrt.uploadFile(request);
118+
FileUpload fileUpload = uploadTm.uploadFile(request);
96119
waitUntilMultipartUploadExists();
97120
ResumableFileUpload resumableFileUpload = fileUpload.pause();
98121
log.debug(() -> "Paused: " + resumableFileUpload);
@@ -103,33 +126,37 @@ void pause_fileNotChanged_shouldResume() {
103126

104127
verifyMultipartUploadIdExists(resumableFileUpload);
105128

106-
FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
129+
FileUpload resumedUpload = resumeTm.resumeUploadFile(resumableFileUpload);
107130
resumedUpload.completionFuture().join();
131+
assertThat(resumedUpload.progress().snapshot().totalBytes()).hasValue(LARGE_OBJ_SIZE);
108132
}
109133

110-
@Test
111-
void pauseImmediately_resume_shouldStartFromBeginning() {
134+
@ParameterizedTest
135+
@MethodSource("transferManagers")
136+
void pauseImmediately_resume_shouldStartFromBeginning(S3TransferManager uploadTm, S3TransferManager resumeTm) {
112137
UploadFileRequest request = UploadFileRequest.builder()
113-
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
114-
.source(largeFile)
115-
.build();
116-
FileUpload fileUpload = tmCrt.uploadFile(request);
138+
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
139+
.source(largeFile)
140+
.build();
141+
FileUpload fileUpload = uploadTm.uploadFile(request);
117142
ResumableFileUpload resumableFileUpload = fileUpload.pause();
118143
log.debug(() -> "Paused: " + resumableFileUpload);
119144

120145
validateEmptyResumeToken(resumableFileUpload);
121146

122-
FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
147+
FileUpload resumedUpload = resumeTm.resumeUploadFile(resumableFileUpload);
123148
resumedUpload.completionFuture().join();
149+
assertThat(resumedUpload.progress().snapshot().totalBytes()).hasValue(LARGE_OBJ_SIZE);
124150
}
125151

126-
@Test
127-
void pause_fileChanged_resumeShouldStartFromBeginning() throws Exception {
152+
@ParameterizedTest
153+
@MethodSource("transferManagers")
154+
void pause_fileChanged_resumeShouldStartFromBeginning(S3TransferManager uploadTm, S3TransferManager resumeTm) throws Exception {
128155
UploadFileRequest request = UploadFileRequest.builder()
129156
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
130157
.source(largeFile)
131158
.build();
132-
FileUpload fileUpload = tmCrt.uploadFile(request);
159+
FileUpload fileUpload = uploadTm.uploadFile(request);
133160
waitUntilMultipartUploadExists();
134161
ResumableFileUpload resumableFileUpload = fileUpload.pause();
135162
log.debug(() -> "Paused: " + resumableFileUpload);
@@ -139,13 +166,18 @@ void pause_fileChanged_resumeShouldStartFromBeginning() throws Exception {
139166
assertThat(resumableFileUpload.totalParts()).isNotEmpty();
140167
verifyMultipartUploadIdExists(resumableFileUpload);
141168

142-
byte[] bytes = "helloworld".getBytes(StandardCharsets.UTF_8);
143-
Files.write(largeFile.toPath(), bytes);
144-
145-
FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
146-
resumedUpload.completionFuture().join();
147-
verifyMultipartUploadIdNotExist(resumableFileUpload);
148-
assertThat(resumedUpload.progress().snapshot().totalBytes()).hasValue(bytes.length);
169+
byte[] originalBytes = Files.readAllBytes(largeFile.toPath());
170+
try {
171+
byte[] bytes = "helloworld".getBytes(StandardCharsets.UTF_8);
172+
Files.write(largeFile.toPath(), bytes);
173+
174+
FileUpload resumedUpload = resumeTm.resumeUploadFile(resumableFileUpload);
175+
resumedUpload.completionFuture().join();
176+
verifyMultipartUploadIdNotExist(resumableFileUpload);
177+
assertThat(resumedUpload.progress().snapshot().totalBytes()).hasValue(bytes.length);
178+
} finally {
179+
Files.write(largeFile.toPath(), originalBytes);
180+
}
149181
}
150182

151183
private void verifyMultipartUploadIdExists(ResumableFileUpload resumableFileUpload) {

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java

Lines changed: 4 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
2121
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN;
2222
import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.assertNotUnsupportedArn;
23-
import static software.amazon.awssdk.transfer.s3.internal.utils.FileUtils.fileNotModified;
2423

2524
import java.util.concurrent.CompletableFuture;
2625
import java.util.function.Consumer;
@@ -31,7 +30,6 @@
3130
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
3231
import software.amazon.awssdk.services.s3.S3AsyncClient;
3332
import software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestPauseObservable;
34-
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
3533
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
3634
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
3735
import software.amazon.awssdk.transfer.s3.S3TransferManager;
@@ -51,6 +49,7 @@
5149
@SdkInternalApi
5250
class CrtS3TransferManager extends DelegatingS3TransferManager {
5351
private static final Logger log = Logger.loggerFor(S3TransferManager.class);
52+
private static final PauseResumeHelper PAUSE_RESUME_HELPER = new PauseResumeHelper();
5453
private final S3AsyncClient s3AsyncClient;
5554

5655
CrtS3TransferManager(TransferManagerConfiguration transferConfiguration, S3AsyncClient s3AsyncClient,
@@ -99,67 +98,15 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
9998
return new CrtFileUpload(returnFuture, progressUpdater.progress(), observable, uploadFileRequest);
10099
}
101100

102-
private FileUpload uploadFromBeginning(ResumableFileUpload resumableFileUpload, boolean fileModified,
103-
boolean noResumeToken) {
104-
UploadFileRequest uploadFileRequest = resumableFileUpload.uploadFileRequest();
105-
PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();
106-
if (fileModified) {
107-
log.debug(() -> String.format("The file (%s) has been modified since "
108-
+ "the last pause. " +
109-
"The SDK will upload the requested object in bucket"
110-
+ " (%s) with key (%s) from "
111-
+ "the "
112-
+ "beginning.",
113-
uploadFileRequest.source(),
114-
putObjectRequest.bucket(),
115-
putObjectRequest.key()));
116-
resumableFileUpload.multipartUploadId()
117-
.ifPresent(id -> {
118-
log.debug(() -> "Aborting previous upload with multipartUploadId: " + id);
119-
s3AsyncClient.abortMultipartUpload(
120-
AbortMultipartUploadRequest.builder()
121-
.bucket(putObjectRequest.bucket())
122-
.key(putObjectRequest.key())
123-
.uploadId(id)
124-
.build())
125-
.exceptionally(t -> {
126-
log.warn(() -> String.format("Failed to abort previous multipart upload "
127-
+ "(id: %s)"
128-
+ ". You may need to call "
129-
+ "S3AsyncClient#abortMultiPartUpload to "
130-
+ "free all storage consumed by"
131-
+ " all parts. ",
132-
id), t);
133-
return null;
134-
});
135-
});
136-
}
137-
138-
if (noResumeToken) {
139-
log.debug(() -> String.format("No resume token is found. " +
140-
"The SDK will upload the requested object in bucket"
141-
+ " (%s) with key (%s) from "
142-
+ "the beginning.",
143-
putObjectRequest.bucket(),
144-
putObjectRequest.key()));
145-
}
146-
147-
148-
return uploadFile(uploadFileRequest);
149-
}
150-
151101
@Override
152102
public FileUpload resumeUploadFile(ResumableFileUpload resumableFileUpload) {
153103
Validate.paramNotNull(resumableFileUpload, "resumableFileUpload");
154104

155-
boolean fileModified = !fileNotModified(resumableFileUpload.fileLength(),
156-
resumableFileUpload.fileLastModified(),
157-
resumableFileUpload.uploadFileRequest().source());
158-
159-
boolean noResumeToken = !hasResumeToken(resumableFileUpload);
105+
boolean fileModified = PAUSE_RESUME_HELPER.fileModified(resumableFileUpload, s3AsyncClient);
106+
boolean noResumeToken = !PAUSE_RESUME_HELPER.hasResumeToken(resumableFileUpload);
160107

161108
if (fileModified || noResumeToken) {
162-
return uploadFromBeginning(resumableFileUpload, fileModified, noResumeToken);
109+
return uploadFile(resumableFileUpload.uploadFileRequest());
163110
}
164111

165112
return doResumeUpload(resumableFileUpload);
@@ -188,10 +135,6 @@ private static ResumeToken crtResumeToken(ResumableFileUpload resumableFileUploa
188135
.withUploadId(resumableFileUpload.multipartUploadId().orElse(null)));
189136
}
190137

191-
private boolean hasResumeToken(ResumableFileUpload resumableFileUpload) {
192-
return resumableFileUpload.totalParts().isPresent() && resumableFileUpload.partSizeInBytes().isPresent();
193-
}
194-
195138
private PutObjectRequest attachSdkAttribute(PutObjectRequest putObjectRequest,
196139
Consumer<SdkHttpExecutionAttributes.Builder> builderMutation) {
197140
SdkHttpExecutionAttributes modifiedAttributes =

0 commit comments

Comments
 (0)