Skip to content

Pause and resume for multipart download #5258

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,9 @@ public void exceptionOccurred(Throwable throwable) {
invokeSafely(() -> Files.deleteIfExists(path));
}
}
cf.completeExceptionally(throwable);
if (cf != null) {
cf.completeExceptionally(throwable);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.transfer.s3;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;

Expand Down Expand Up @@ -85,7 +86,7 @@ void pauseAndResume_ObjectNotChanged_shouldResumeDownload(S3TransferManager tm)
ResumableFileDownload resumableFileDownload = download.pause();
long bytesTransferred = resumableFileDownload.bytesTransferred();
log.debug(() -> "Paused: " + resumableFileDownload);
assertThat(resumableFileDownload.downloadFileRequest()).isEqualTo(request);
assertEqualsBySdkFields(resumableFileDownload.downloadFileRequest(), request);
assertThat(testDownloadListener.getObjectResponse).isNotNull();
assertThat(resumableFileDownload.s3ObjectLastModified()).hasValue(testDownloadListener.getObjectResponse.lastModified());
assertThat(bytesTransferred).isEqualTo(path.toFile().length());
Expand All @@ -98,6 +99,21 @@ void pauseAndResume_ObjectNotChanged_shouldResumeDownload(S3TransferManager tm)
verifyFileDownload(path, resumableFileDownload, OBJ_SIZE - bytesTransferred, tm);
}

/**
 * Asserts that two {@link DownloadFileRequest}s are equivalent while ignoring execution attributes.
 *
 * <p>Transfer manager adds an execution attribute to the GetObjectRequest, so a plain {@code equals} between the
 * request held by the ResumableFileDownload and the request originally submitted does not hold. The destination and
 * the transfer listeners are compared with {@code equals}; the nested GetObjectRequest is compared with
 * {@code equalsBySdkFields}, which does not check execution attributes.
 *
 * @param actual the request obtained from the ResumableFileDownload
 * @param expected the request originally submitted by the test
 */
private void assertEqualsBySdkFields(DownloadFileRequest actual, DownloadFileRequest expected) {
    assertThat(actual.destination())
        .withFailMessage("ResumableFileDownload destination not equal to the original DownloadFileRequest")
        .isEqualTo(expected.destination());
    assertThat(actual.transferListeners())
        .withFailMessage("ResumableFileDownload transferListeners not equal to the original DownloadFileRequest")
        .isEqualTo(expected.transferListeners());
    // The format arguments must follow the order of the labels in the message: expected first, then actual.
    assertTrue(actual.getObjectRequest().equalsBySdkFields(expected.getObjectRequest()),
               () -> String.format("ResumableFileDownload GetObjectRequest not equal to the original DownloadFileRequest. "
                                   + "expected: %s. Actual:"
                                   + " %s", expected.getObjectRequest(), actual.getObjectRequest()));
}

@ParameterizedTest
@MethodSource("transferManagers")
void pauseAndResume_objectChanged_shouldStartFromBeginning(S3TransferManager tm) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.transfer.s3;

import static org.assertj.core.api.Assertions.assertThat;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;

import java.io.File;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.LogEvent;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
import software.amazon.awssdk.core.waiters.Waiter;
import software.amazon.awssdk.core.waiters.WaiterAcceptor;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.testutils.LogCaptor;
import software.amazon.awssdk.testutils.RandomTempFile;
import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest;
import software.amazon.awssdk.transfer.s3.model.FileDownload;
import software.amazon.awssdk.transfer.s3.model.ResumableFileDownload;
import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot;

/**
 * Integration tests covering pause and resume of a file download issued through {@code tmJava}.
 *
 * <p>A single random object of {@link #OBJ_SIZE} bytes is uploaded once before all tests and every test downloads it
 * to a fresh temporary path. NOTE(review): {@code tmJava}, {@code s3Async}, {@code createBucket} and
 * {@code deleteBucketAndAllContents} are not declared here; presumably they are inherited from
 * {@code S3IntegrationTestBase} - confirm.
 */
public class S3TransferManagerMultipartDownloadPauseResumeIntegrationTest extends S3IntegrationTestBase {
    private static final String BUCKET = temporaryBucketName(S3TransferManagerMultipartDownloadPauseResumeIntegrationTest.class);
    private static final String KEY = "key";

    // 32 MiB, intended to be split into 4 parts of 8 MiB each
    private static final long OBJ_SIZE = 32 * MB;
    private static File sourceFile;

    /**
     * Creates the test bucket and uploads a random file of {@link #OBJ_SIZE} bytes under {@link #KEY}.
     */
    @BeforeAll
    public static void setup() throws Exception {
        createBucket(BUCKET);
        sourceFile = new RandomTempFile(OBJ_SIZE);

        // use async client for multipart upload (with default part size)
        PutObjectRequest uploadRequest = PutObjectRequest.builder()
                                                         .bucket(BUCKET)
                                                         .key(KEY)
                                                         .build();
        s3Async.putObject(uploadRequest, sourceFile.toPath()).join();
    }

    /**
     * Removes the bucket with its contents and deletes the local source file.
     */
    @AfterAll
    public static void cleanup() {
        deleteBucketAndAllContents(BUCKET);
        sourceFile.delete();
    }

    /**
     * Pauses a download part-way through, resumes it, and checks the resulting file matches the uploaded source.
     */
    @Test
    void pauseAndResume_shouldResumeDownload() {
        Path destination = RandomTempFile.randomUncreatedFile().toPath();
        FileDownload inFlight = tmJava.downloadFile(downloadRequestTo(destination));

        // wait until enough bytes were received to stop somewhere between part 2 and 3; 18 MiB should do it
        waitUntilAmountTransferred(inFlight, 18 * MB);
        ResumableFileDownload paused = inFlight.pause();

        FileDownload continued = tmJava.resumeDownloadFile(paused);
        continued.completionFuture().join();
        assertThat(destination.toFile()).hasSameBinaryContentAs(sourceFile);
    }

    /**
     * Pauses a download only after it has finished, then resumes it. The resumed download must already be completed,
     * the file must match the source, no "Sending downloadFileRequest" message may be logged, and the first captured
     * log event must say there is nothing to resume.
     */
    @Test
    void pauseAndResume_whenAlreadyComplete_shouldHandleGracefully() {
        Path destination = RandomTempFile.randomUncreatedFile().toPath();
        FileDownload finished = tmJava.downloadFile(downloadRequestTo(destination));
        finished.completionFuture().join();
        ResumableFileDownload pausedAfterCompletion = finished.pause();

        try (LogCaptor captor = LogCaptor.create(Level.DEBUG)) {
            FileDownload resumed = tmJava.resumeDownloadFile(pausedAfterCompletion);
            assertThat(resumed.completionFuture()).isCompleted();
            assertThat(destination.toFile()).hasSameBinaryContentAs(sourceFile);

            List<LogEvent> captured = captor.loggedEvents();
            assertThat(captured).noneMatch(
                logged -> formattedMessageOf(logged).contains("Sending downloadFileRequest"));

            String firstMessage = formattedMessageOf(captured.get(0));
            assertThat(firstMessage)
                .contains("The multipart download associated to the provided ResumableFileDownload is already completed, "
                          + "nothing to resume");
        }
    }

    /**
     * Builds a request that downloads the test object ({@link #BUCKET}/{@link #KEY}) to the given destination.
     */
    private static DownloadFileRequest downloadRequestTo(Path destination) {
        return DownloadFileRequest.builder()
                                  .getObjectRequest(b -> b.bucket(BUCKET).key(KEY))
                                  .destination(destination)
                                  .build();
    }

    /**
     * Returns the formatted text of a captured log event.
     */
    private static String formattedMessageOf(LogEvent event) {
        return event.getMessage().getFormattedMessage();
    }

    /**
     * Polls the download's progress snapshot every 100 ms, for at most 5 minutes, until strictly more than
     * {@code amountTransferred} bytes are reported as transferred.
     */
    private void waitUntilAmountTransferred(FileDownload download, long amountTransferred) {
        WaiterAcceptor<TransferProgressSnapshot> enoughTransferred =
            WaiterAcceptor.successOnResponseAcceptor(r -> r.transferredBytes() > amountTransferred);
        // any other snapshot simply triggers another poll
        WaiterAcceptor<TransferProgressSnapshot> keepPolling =
            WaiterAcceptor.retryOnResponseAcceptor(r -> true);

        Waiter<TransferProgressSnapshot> progressWaiter =
            Waiter.builder(TransferProgressSnapshot.class)
                  .addAcceptor(enoughTransferred)
                  .addAcceptor(keepPolling)
                  .overrideConfiguration(o -> o.waitTimeout(Duration.ofMinutes(5))
                                               .maxAttempts(Integer.MAX_VALUE)
                                               .backoffStrategy(FixedDelayBackoffStrategy.create(Duration.ofMillis(100))))
                  .build();
        progressWaiter.run(() -> download.progress().snapshot());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@

package software.amazon.awssdk.transfer.s3.internal;

import static software.amazon.awssdk.services.s3.internal.multipart.MultipartDownloadUtils.multipartDownloadResumeContext;
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.JAVA_PROGRESS_LISTENER;
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.MULTIPART_DOWNLOAD_RESUME_CONTEXT;
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.PAUSE_OBSERVABLE;
import static software.amazon.awssdk.services.s3.multipart.S3MultipartExecutionAttribute.RESUME_TOKEN;
import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;
import static software.amazon.awssdk.transfer.s3.internal.utils.ResumableRequestConverter.toDownloadFileRequestAndTransformer;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
Expand All @@ -35,6 +38,7 @@
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.internal.multipart.MultipartDownloadResumeContext;
import software.amazon.awssdk.services.s3.internal.multipart.MultipartS3AsyncClient;
import software.amazon.awssdk.services.s3.internal.resource.S3AccessPointResource;
import software.amazon.awssdk.services.s3.internal.resource.S3ArnConverter;
Expand All @@ -56,6 +60,8 @@
import software.amazon.awssdk.transfer.s3.internal.model.DefaultFileDownload;
import software.amazon.awssdk.transfer.s3.internal.model.DefaultFileUpload;
import software.amazon.awssdk.transfer.s3.internal.model.DefaultUpload;
import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgress;
import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot;
import software.amazon.awssdk.transfer.s3.internal.progress.ResumeTransferProgress;
import software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater;
import software.amazon.awssdk.transfer.s3.model.CompletedCopy;
Expand Down Expand Up @@ -289,16 +295,31 @@ private CopyObjectRequest attachSdkAttribute(CopyObjectRequest copyObjectRequest
Consumer<AwsRequestOverrideConfiguration.Builder> builderMutation) {
AwsRequestOverrideConfiguration modifiedRequestOverrideConfig =
copyObjectRequest.overrideConfiguration()
.map(o -> o.toBuilder().applyMutation(builderMutation).build())
.orElseGet(() -> AwsRequestOverrideConfiguration.builder()
.applyMutation(builderMutation)
.build());
.map(o -> o.toBuilder().applyMutation(builderMutation).build())
.orElseGet(() -> AwsRequestOverrideConfiguration.builder()
.applyMutation(builderMutation)
.build());

return copyObjectRequest.toBuilder()
.overrideConfiguration(modifiedRequestOverrideConfig)
.build();
.overrideConfiguration(modifiedRequestOverrideConfig)
.build();
}

/**
 * Returns a copy of the given request whose override configuration has had {@code builderMutation} applied.
 *
 * <p>When the request already carries an override configuration, the mutation is applied on top of a builder derived
 * from it; otherwise it is applied to a fresh override configuration builder.
 *
 * @param request the request to copy
 * @param builderMutation the change to apply to the request's override configuration builder
 * @return a new request carrying the modified override configuration
 */
private GetObjectRequest attachSdkAttribute(GetObjectRequest request,
                                            Consumer<AwsRequestOverrideConfiguration.Builder> builderMutation) {
    // Start from the existing override configuration when there is one, otherwise from an empty builder.
    AwsRequestOverrideConfiguration.Builder overrideBuilder =
        request.overrideConfiguration()
               .map(AwsRequestOverrideConfiguration::toBuilder)
               .orElseGet(AwsRequestOverrideConfiguration::builder);

    AwsRequestOverrideConfiguration mutatedOverride = overrideBuilder.applyMutation(builderMutation).build();

    return request.toBuilder()
                  .overrideConfiguration(mutatedOverride)
                  .build();
}


@Override
public final DirectoryUpload uploadDirectory(UploadDirectoryRequest uploadDirectoryRequest) {
Validate.paramNotNull(uploadDirectoryRequest, "uploadDirectoryRequest");
Expand All @@ -325,7 +346,7 @@ public final <ResultT> Download<ResultT> download(DownloadRequest<ResultT> downl
progressUpdater.transferInitiated();
responseTransformer = isS3ClientMultipartEnabled()
? progressUpdater.wrapResponseTransformerForMultipartDownload(
responseTransformer, downloadRequest.getObjectRequest())
responseTransformer, downloadRequest.getObjectRequest())
: progressUpdater.wrapResponseTransformer(responseTransformer);
progressUpdater.registerCompletion(returnFuture);

Expand All @@ -352,14 +373,21 @@ public final <ResultT> Download<ResultT> download(DownloadRequest<ResultT> downl
public final FileDownload downloadFile(DownloadFileRequest downloadRequest) {
Validate.paramNotNull(downloadRequest, "downloadFileRequest");

GetObjectRequest getObjectRequestWithAttributes = attachSdkAttribute(
downloadRequest.getObjectRequest(),
b -> b.putExecutionAttribute(MULTIPART_DOWNLOAD_RESUME_CONTEXT, new MultipartDownloadResumeContext()));
DownloadFileRequest downloadFileRequestWithAttributes =
downloadRequest.copy(downloadFileRequest -> downloadFileRequest.getObjectRequest(getObjectRequestWithAttributes));

AsyncResponseTransformer<GetObjectResponse, GetObjectResponse> responseTransformer =
AsyncResponseTransformer.toFile(downloadRequest.destination(),
AsyncResponseTransformer.toFile(downloadFileRequestWithAttributes.destination(),
FileTransformerConfiguration.defaultCreateOrReplaceExisting());

CompletableFuture<CompletedFileDownload> returnFuture = new CompletableFuture<>();
TransferProgressUpdater progressUpdater = doDownloadFile(downloadRequest, responseTransformer, returnFuture);
TransferProgressUpdater progressUpdater = doDownloadFile(
downloadFileRequestWithAttributes, responseTransformer, returnFuture);

return new DefaultFileDownload(returnFuture, progressUpdater.progress(), () -> downloadRequest, null);
return new DefaultFileDownload(returnFuture, progressUpdater.progress(), () -> downloadFileRequestWithAttributes, null);
}

private TransferProgressUpdater doDownloadFile(
Expand All @@ -371,7 +399,7 @@ private TransferProgressUpdater doDownloadFile(
progressUpdater.transferInitiated();
responseTransformer = isS3ClientMultipartEnabled()
? progressUpdater.wrapResponseTransformerForMultipartDownload(
responseTransformer, downloadRequest.getObjectRequest())
responseTransformer, downloadRequest.getObjectRequest())
: progressUpdater.wrapResponseTransformer(responseTransformer);
progressUpdater.registerCompletion(returnFuture);

Expand All @@ -396,6 +424,16 @@ private TransferProgressUpdater doDownloadFile(
@Override
public final FileDownload resumeDownloadFile(ResumableFileDownload resumableFileDownload) {
Validate.paramNotNull(resumableFileDownload, "resumableFileDownload");

// check if the multipart-download was already completed and handle it gracefully.
Optional<MultipartDownloadResumeContext> optCtx =
multipartDownloadResumeContext(resumableFileDownload.downloadFileRequest().getObjectRequest());
if (optCtx.map(MultipartDownloadResumeContext::isComplete).orElse(false)) {
log.debug(() -> "The multipart download associated to the provided ResumableFileDownload is already completed, "
+ "nothing to resume");
return completedDownload(resumableFileDownload, optCtx.get());
}

CompletableFuture<CompletedFileDownload> returnFuture = new CompletableFuture<>();
DownloadFileRequest originalDownloadRequest = resumableFileDownload.downloadFileRequest();
GetObjectRequest getObjectRequest = originalDownloadRequest.getObjectRequest();
Expand Down Expand Up @@ -432,6 +470,20 @@ public final FileDownload resumeDownloadFile(ResumableFileDownload resumableFile
resumableFileDownload);
}

/**
 * Builds a {@link FileDownload} whose completion future is already completed, using the response held by the given
 * resume context.
 *
 * <p>The progress snapshot reports {@code ctx.bytesToLastCompletedParts()} as the total bytes and
 * {@code resumableFileDownload.bytesTransferred()} as the transferred bytes.
 *
 * @param resumableFileDownload the paused download being resumed
 * @param ctx the multipart resume context associated with that download
 * @return a download backed by an already completed future
 */
private FileDownload completedDownload(ResumableFileDownload resumableFileDownload, MultipartDownloadResumeContext ctx) {
    CompletableFuture<CompletedFileDownload> alreadyDone =
        CompletableFuture.completedFuture(CompletedFileDownload.builder()
                                                               .response(ctx.response())
                                                               .build());

    DefaultTransferProgressSnapshot finalSnapshot =
        DefaultTransferProgressSnapshot.builder()
                                       .sdkResponse(ctx.response())
                                       .totalBytes(ctx.bytesToLastCompletedParts())
                                       .transferredBytes(resumableFileDownload.bytesTransferred())
                                       .build();
    DefaultTransferProgress finalProgress = new DefaultTransferProgress(finalSnapshot);

    return new DefaultFileDownload(alreadyDone,
                                   finalProgress,
                                   resumableFileDownload::downloadFileRequest,
                                   resumableFileDownload);
}

private DownloadFileRequest newOrOriginalRequestForPause(CompletableFuture<DownloadFileRequest> newDownloadFuture,
DownloadFileRequest originalDownloadRequest) {
try {
Expand Down
Loading
Loading