Skip to content

Pause/Resume Upload for Transfer Manager with Java S3Client #4908

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AmazonS3-a20b910.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "Amazon S3",
"contributor": "",
"type": "feature",
"description": "Add support for pause/resume upload for TransferManager with Java-based S3 Client"
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ public static void setUpForAllIntegTests() throws Exception {
Log.initLoggingToStdout(Log.LogLevel.Warn);
System.setProperty("aws.crt.debugnative", "true");
s3 = s3ClientBuilder().build();
s3Async = s3AsyncClientBuilder().build();
s3Async = s3AsyncClientBuilder()
.multipartEnabled(true)
.build();
s3CrtAsync = S3CrtAsyncClient.builder()
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
.region(DEFAULT_REGION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package software.amazon.awssdk.transfer.s3;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;

Expand All @@ -26,9 +25,12 @@
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
import software.amazon.awssdk.core.waiters.AsyncWaiter;
import software.amazon.awssdk.core.waiters.Waiter;
Expand Down Expand Up @@ -69,30 +71,41 @@ public static void cleanup() {
executorService.shutdown();
}

@Test
void pause_singlePart_shouldResume() {
private static Stream<Arguments> transferManagers() {
return Stream.of(
Arguments.of(tmJava, tmJava),
Arguments.of(tmCrt, tmCrt),
Arguments.of(tmCrt, tmJava),
Arguments.of(tmJava, tmCrt)
);
}

@ParameterizedTest
@MethodSource("transferManagers")
void pause_singlePart_shouldResume(S3TransferManager uploadTm, S3TransferManager resumeTm) {
UploadFileRequest request = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.source(smallFile)
.build();
FileUpload fileUpload = tmCrt.uploadFile(request);
FileUpload fileUpload = uploadTm.uploadFile(request);
ResumableFileUpload resumableFileUpload = fileUpload.pause();
log.debug(() -> "Paused: " + resumableFileUpload);

validateEmptyResumeToken(resumableFileUpload);

FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
FileUpload resumedUpload = resumeTm.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
}

@Test
void pause_fileNotChanged_shouldResume() {
@ParameterizedTest
@MethodSource("transferManagers")
void pause_fileNotChanged_shouldResume(S3TransferManager uploadTm, S3TransferManager resumeTm) {
UploadFileRequest request = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.addTransferListener(LoggingTransferListener.create())
.source(largeFile)
.build();
FileUpload fileUpload = tmCrt.uploadFile(request);
FileUpload fileUpload = uploadTm.uploadFile(request);
waitUntilMultipartUploadExists();
ResumableFileUpload resumableFileUpload = fileUpload.pause();
log.debug(() -> "Paused: " + resumableFileUpload);
Expand All @@ -103,33 +116,35 @@ void pause_fileNotChanged_shouldResume() {

verifyMultipartUploadIdExists(resumableFileUpload);

FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
FileUpload resumedUpload = resumeTm.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
}

@Test
void pauseImmediately_resume_shouldStartFromBeginning() {
@ParameterizedTest
@MethodSource("transferManagers")
void pauseImmediately_resume_shouldStartFromBeginning(S3TransferManager uploadTm, S3TransferManager resumeTm) {
UploadFileRequest request = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.source(largeFile)
.build();
FileUpload fileUpload = tmCrt.uploadFile(request);
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.source(largeFile)
.build();
FileUpload fileUpload = uploadTm.uploadFile(request);
ResumableFileUpload resumableFileUpload = fileUpload.pause();
log.debug(() -> "Paused: " + resumableFileUpload);

validateEmptyResumeToken(resumableFileUpload);

FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
FileUpload resumedUpload = resumeTm.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
}

@Test
void pause_fileChanged_resumeShouldStartFromBeginning() throws Exception {
@ParameterizedTest
@MethodSource("transferManagers")
void pause_fileChanged_resumeShouldStartFromBeginning(S3TransferManager uploadTm, S3TransferManager resumeTm) throws Exception {
UploadFileRequest request = UploadFileRequest.builder()
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
.source(largeFile)
.build();
FileUpload fileUpload = tmCrt.uploadFile(request);
FileUpload fileUpload = uploadTm.uploadFile(request);
waitUntilMultipartUploadExists();
ResumableFileUpload resumableFileUpload = fileUpload.pause();
log.debug(() -> "Paused: " + resumableFileUpload);
Expand All @@ -139,13 +154,16 @@ void pause_fileChanged_resumeShouldStartFromBeginning() throws Exception {
assertThat(resumableFileUpload.totalParts()).isNotEmpty();
verifyMultipartUploadIdExists(resumableFileUpload);

byte[] originalBytes = Files.readAllBytes(largeFile.toPath());
byte[] bytes = "helloworld".getBytes(StandardCharsets.UTF_8);
Files.write(largeFile.toPath(), bytes);

FileUpload resumedUpload = tmCrt.resumeUploadFile(resumableFileUpload);
FileUpload resumedUpload = resumeTm.resumeUploadFile(resumableFileUpload);
resumedUpload.completionFuture().join();
verifyMultipartUploadIdNotExist(resumableFileUpload);
assertThat(resumedUpload.progress().snapshot().totalBytes()).hasValue(bytes.length);

Files.write(largeFile.toPath(), originalBytes);
}

private void verifyMultipartUploadIdExists(ResumableFileUpload resumableFileUpload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN;
import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.assertNotUnsupportedArn;
import static software.amazon.awssdk.transfer.s3.internal.utils.FileUtils.fileNotModified;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
Expand All @@ -31,7 +30,6 @@
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestPauseObservable;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
Expand All @@ -51,6 +49,7 @@
@SdkInternalApi
class CrtS3TransferManager extends DelegatingS3TransferManager {
private static final Logger log = Logger.loggerFor(S3TransferManager.class);
private static final PauseResumeHelper PAUSE_RESUME_HELPER = new PauseResumeHelper();
private final S3AsyncClient s3AsyncClient;

CrtS3TransferManager(TransferManagerConfiguration transferConfiguration, S3AsyncClient s3AsyncClient,
Expand Down Expand Up @@ -99,67 +98,15 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
return new CrtFileUpload(returnFuture, progressUpdater.progress(), observable, uploadFileRequest);
}

private FileUpload uploadFromBeginning(ResumableFileUpload resumableFileUpload, boolean fileModified,
boolean noResumeToken) {
UploadFileRequest uploadFileRequest = resumableFileUpload.uploadFileRequest();
PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();
if (fileModified) {
log.debug(() -> String.format("The file (%s) has been modified since "
+ "the last pause. " +
"The SDK will upload the requested object in bucket"
+ " (%s) with key (%s) from "
+ "the "
+ "beginning.",
uploadFileRequest.source(),
putObjectRequest.bucket(),
putObjectRequest.key()));
resumableFileUpload.multipartUploadId()
.ifPresent(id -> {
log.debug(() -> "Aborting previous upload with multipartUploadId: " + id);
s3AsyncClient.abortMultipartUpload(
AbortMultipartUploadRequest.builder()
.bucket(putObjectRequest.bucket())
.key(putObjectRequest.key())
.uploadId(id)
.build())
.exceptionally(t -> {
log.warn(() -> String.format("Failed to abort previous multipart upload "
+ "(id: %s)"
+ ". You may need to call "
+ "S3AsyncClient#abortMultiPartUpload to "
+ "free all storage consumed by"
+ " all parts. ",
id), t);
return null;
});
});
}

if (noResumeToken) {
log.debug(() -> String.format("No resume token is found. " +
"The SDK will upload the requested object in bucket"
+ " (%s) with key (%s) from "
+ "the beginning.",
putObjectRequest.bucket(),
putObjectRequest.key()));
}


return uploadFile(uploadFileRequest);
}

@Override
public FileUpload resumeUploadFile(ResumableFileUpload resumableFileUpload) {
Validate.paramNotNull(resumableFileUpload, "resumableFileUpload");

boolean fileModified = !fileNotModified(resumableFileUpload.fileLength(),
resumableFileUpload.fileLastModified(),
resumableFileUpload.uploadFileRequest().source());

boolean noResumeToken = !hasResumeToken(resumableFileUpload);
boolean fileModified = PAUSE_RESUME_HELPER.fileModified(resumableFileUpload, s3AsyncClient);
boolean noResumeToken = !PAUSE_RESUME_HELPER.hasResumeToken(resumableFileUpload);

if (fileModified || noResumeToken) {
return uploadFromBeginning(resumableFileUpload, fileModified, noResumeToken);
return uploadFile(resumableFileUpload.uploadFileRequest());
}

return doResumeUpload(resumableFileUpload);
Expand Down Expand Up @@ -188,10 +135,6 @@ private static ResumeToken crtResumeToken(ResumableFileUpload resumableFileUploa
.withUploadId(resumableFileUpload.multipartUploadId().orElse(null)));
}

private boolean hasResumeToken(ResumableFileUpload resumableFileUpload) {
return resumableFileUpload.totalParts().isPresent() && resumableFileUpload.partSizeInBytes().isPresent();
}

private PutObjectRequest attachSdkAttribute(PutObjectRequest putObjectRequest,
Consumer<SdkHttpExecutionAttributes.Builder> builderMutation) {
SdkHttpExecutionAttributes modifiedAttributes =
Expand Down
Loading