Skip to content

Commit c1d55ac

Browse files
authored
[TM DownloadFile Pause and Resume] Part 2: Implement pause for downloadFile operation (#3094)
* Part 1: Implement pause for downloadFile operation * Address feedback * Refactor the logic * Address feedback * Fix merging issue
1 parent eade04e commit c1d55ac

17 files changed

+661
-34
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.transfer.s3;
17+
18+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
19+
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;
20+
21+
import java.io.File;
22+
import java.nio.file.Path;
23+
import java.util.Optional;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
26+
import org.junit.jupiter.api.AfterAll;
27+
import org.junit.jupiter.api.BeforeAll;
28+
import org.junit.jupiter.api.Test;
29+
import software.amazon.awssdk.core.SdkResponse;
30+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
31+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
32+
import software.amazon.awssdk.testutils.RandomTempFile;
33+
import software.amazon.awssdk.transfer.s3.progress.TransferListener;
34+
35+
public class S3TransferManagerDownloadPauseResumeIntegrationTest extends S3IntegrationTestBase {
36+
private static final String BUCKET = temporaryBucketName(S3TransferManagerDownloadPauseResumeIntegrationTest.class);
37+
private static final String KEY = "key";
38+
private static final int OBJ_SIZE = 16 * 1024 * 1024;
39+
private static S3TransferManager tm;
40+
private static File file;
41+
42+
@BeforeAll
43+
public static void setup() throws Exception {
44+
S3IntegrationTestBase.setUp();
45+
createBucket(BUCKET);
46+
file = new RandomTempFile(OBJ_SIZE);
47+
s3.putObject(PutObjectRequest.builder()
48+
.bucket(BUCKET)
49+
.key(KEY)
50+
.build(), file.toPath());
51+
tm = S3TransferManager.builder()
52+
.s3ClientConfiguration(b -> b.region(DEFAULT_REGION)
53+
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN))
54+
.build();
55+
}
56+
57+
@AfterAll
58+
public static void cleanup() {
59+
deleteBucketAndAllContents(BUCKET);
60+
tm.close();
61+
S3IntegrationTestBase.cleanUp();
62+
}
63+
64+
@Test
65+
void downloadToFile_pause_shouldReturnResumableDownload() throws InterruptedException {
66+
CountDownLatch countDownLatch = new CountDownLatch(1);
67+
Path path = RandomTempFile.randomUncreatedFile().toPath();
68+
TestDownloadListener testDownloadListener = new TestDownloadListener(countDownLatch);
69+
DownloadFileRequest request = DownloadFileRequest.builder()
70+
.getObjectRequest(b -> b.bucket(BUCKET).key(KEY))
71+
.destination(path)
72+
.overrideConfiguration(b -> b
73+
.addListener(testDownloadListener))
74+
.build();
75+
FileDownload download =
76+
tm.downloadFile(request);
77+
boolean count = countDownLatch.await(10, TimeUnit.SECONDS);
78+
if (!count) {
79+
throw new AssertionError("No data has been transferred within 5 seconds");
80+
}
81+
ResumableFileDownload pause = download.pause();
82+
assertThat(pause.downloadFileRequest()).isEqualTo(request);
83+
assertThat(testDownloadListener.getObjectResponse).isNotNull();
84+
assertThat(pause.lastModified()).isEqualTo(testDownloadListener.getObjectResponse.lastModified());
85+
assertThat(pause.bytesTransferred()).isEqualTo(path.toFile().length());
86+
assertThat(pause.transferSizeInBytes()).hasValue(file.length());
87+
assertThat(download.completionFuture()).isCancelled();
88+
}
89+
90+
private static final class TestDownloadListener implements TransferListener {
91+
private final CountDownLatch countDownLatch;
92+
private GetObjectResponse getObjectResponse;
93+
94+
private TestDownloadListener(CountDownLatch countDownLatch) {
95+
this.countDownLatch = countDownLatch;
96+
}
97+
98+
@Override
99+
public void bytesTransferred(Context.BytesTransferred context) {
100+
Optional<SdkResponse> sdkResponse = context.progressSnapshot().sdkResponse();
101+
if (sdkResponse.isPresent() && sdkResponse.get() instanceof GetObjectResponse) {
102+
getObjectResponse = (GetObjectResponse) sdkResponse.get();
103+
}
104+
countDownLatch.countDown();
105+
}
106+
}
107+
108+
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/FileDownload.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,24 @@
1818
import java.util.concurrent.CompletableFuture;
1919
import software.amazon.awssdk.annotations.SdkPreviewApi;
2020
import software.amazon.awssdk.annotations.SdkPublicApi;
21+
import software.amazon.awssdk.annotations.ThreadSafe;
2122

2223
/**
2324
* A download transfer of a single object from S3.
2425
*/
2526
@SdkPublicApi
2627
@SdkPreviewApi
28+
@ThreadSafe
2729
public interface FileDownload extends ObjectTransfer {
28-
30+
31+
/**
32+
* Pause the current download operation and returns the information that can
33+
* be used to resume the download at a later time.
34+
*
35+
* @return {@link ResumableFileDownload} that can be used to resume the download
36+
*/
37+
ResumableFileDownload pause();
38+
2939
@Override
3040
CompletableFuture<CompletedFileDownload> completionFuture();
3141
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.transfer.s3;
17+
18+
import java.time.Instant;
19+
import java.util.Objects;
20+
import java.util.Optional;
21+
import software.amazon.awssdk.annotations.SdkPublicApi;
22+
import software.amazon.awssdk.utils.ToString;
23+
import software.amazon.awssdk.utils.Validate;
24+
import software.amazon.awssdk.utils.builder.CopyableBuilder;
25+
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;
26+
27+
/**
28+
* An opaque token that holds the state and can be used to resume a
29+
* paused download operation.
30+
*
31+
* TODO: 1. should we just store GetObjectResponse?
32+
* 2. consider providing a way to serialize and deserialize the token
33+
* 3. Do we need to store file checksum?
34+
*
35+
* @see S3TransferManager#downloadFile(DownloadFileRequest)
36+
*/
37+
@SdkPublicApi
38+
public final class ResumableFileDownload implements ResumableTransfer,
39+
ToCopyableBuilder<ResumableFileDownload.Builder, ResumableFileDownload> {
40+
private final DownloadFileRequest downloadFileRequest;
41+
private final long bytesTransferred;
42+
private final Instant lastModified;
43+
private final Long transferSizeInBytes;
44+
45+
private ResumableFileDownload(DefaultBuilder builder) {
46+
this.downloadFileRequest = Validate.paramNotNull(builder.downloadFileRequest, "downloadFileRequest");
47+
this.bytesTransferred = builder.bytesTransferred == null ? 0 : builder.bytesTransferred;
48+
this.lastModified = builder.lastModified;
49+
this.transferSizeInBytes = builder.transferSizeInBytes;
50+
}
51+
52+
@Override
53+
public boolean equals(Object o) {
54+
if (this == o) {
55+
return true;
56+
}
57+
if (o == null || getClass() != o.getClass()) {
58+
return false;
59+
}
60+
61+
ResumableFileDownload that = (ResumableFileDownload) o;
62+
63+
if (bytesTransferred != that.bytesTransferred) {
64+
return false;
65+
}
66+
if (!downloadFileRequest.equals(that.downloadFileRequest)) {
67+
return false;
68+
}
69+
if (!Objects.equals(lastModified, that.lastModified)) {
70+
return false;
71+
}
72+
return Objects.equals(transferSizeInBytes, that.transferSizeInBytes);
73+
}
74+
75+
@Override
76+
public int hashCode() {
77+
int result = downloadFileRequest.hashCode();
78+
result = 31 * result + (int) (bytesTransferred ^ (bytesTransferred >>> 32));
79+
result = 31 * result + (lastModified != null ? lastModified.hashCode() : 0);
80+
result = 31 * result + (transferSizeInBytes != null ? transferSizeInBytes.hashCode() : 0);
81+
return result;
82+
}
83+
84+
@Override
85+
public String toString() {
86+
return ToString.builder("ResumableFileDownload")
87+
.add("downloadFileRequest", downloadFileRequest)
88+
.add("bytesTransferred", bytesTransferred)
89+
.add("lastModified", lastModified)
90+
.add("transferSizeInBytes", transferSizeInBytes)
91+
.build();
92+
}
93+
94+
public static Builder builder() {
95+
return new DefaultBuilder();
96+
}
97+
98+
/**
99+
* @return the {@link DownloadFileRequest} to resume
100+
*/
101+
public DownloadFileRequest downloadFileRequest() {
102+
return downloadFileRequest;
103+
}
104+
105+
/**
106+
* Retrieve the number of bytes that have been transferred.
107+
* @return the number of bytes
108+
*/
109+
public long bytesTransferred() {
110+
return bytesTransferred;
111+
}
112+
113+
/**
114+
* Last modified time on Amazon S3 for this object.
115+
*/
116+
public Instant lastModified() {
117+
return lastModified;
118+
}
119+
120+
/**
121+
* The total size of the transfer in bytes, or {@link Optional#empty()} if unknown
122+
*
123+
* @return the optional total size of the transfer.
124+
*/
125+
public Optional<Long> transferSizeInBytes() {
126+
return Optional.ofNullable(transferSizeInBytes);
127+
}
128+
129+
@Override
130+
public Builder toBuilder() {
131+
return new DefaultBuilder(this);
132+
}
133+
134+
public interface Builder extends CopyableBuilder<Builder, ResumableFileDownload> {
135+
136+
/**
137+
* Sets the download file request
138+
*
139+
* @param downloadFileRequest the download file request
140+
* @return a reference to this object so that method calls can be chained together.
141+
*/
142+
Builder downloadFileRequest(DownloadFileRequest downloadFileRequest);
143+
144+
/**
145+
* Sets the number of bytes transferred
146+
*
147+
* @param bytesTransferred the number of bytes transferred
148+
* @return a reference to this object so that method calls can be chained together.
149+
*/
150+
Builder bytesTransferred(Long bytesTransferred);
151+
152+
/**
153+
* Sets the total transfer size in bytes
154+
* @param transferSizeInBytes the transfer size in bytes
155+
* @return a reference to this object so that method calls can be chained together.
156+
*/
157+
Builder transferSizeInBytes(Long transferSizeInBytes);
158+
159+
/**
160+
* Sets the last modified time of the object
161+
*
162+
* @param lastModified the last modified time of the object
163+
* @return a reference to this object so that method calls can be chained together.
164+
*/
165+
Builder lastModified(Instant lastModified);
166+
}
167+
168+
private static final class DefaultBuilder implements Builder {
169+
private DownloadFileRequest downloadFileRequest;
170+
private Long bytesTransferred;
171+
private Instant lastModified;
172+
private Long transferSizeInBytes;
173+
174+
private DefaultBuilder() {
175+
176+
}
177+
178+
private DefaultBuilder(ResumableFileDownload persistableFileDownload) {
179+
this.downloadFileRequest = persistableFileDownload.downloadFileRequest;
180+
this.bytesTransferred = persistableFileDownload.bytesTransferred;
181+
this.lastModified = persistableFileDownload.lastModified;
182+
}
183+
184+
@Override
185+
public Builder downloadFileRequest(DownloadFileRequest downloadFileRequest) {
186+
this.downloadFileRequest = downloadFileRequest;
187+
return this;
188+
}
189+
190+
@Override
191+
public Builder bytesTransferred(Long bytesTransferred) {
192+
this.bytesTransferred = bytesTransferred;
193+
return this;
194+
}
195+
196+
@Override
197+
public Builder transferSizeInBytes(Long transferSizeInBytes) {
198+
this.transferSizeInBytes = transferSizeInBytes;
199+
return this;
200+
}
201+
202+
@Override
203+
public Builder lastModified(Instant lastModified) {
204+
this.lastModified = lastModified;
205+
return this;
206+
}
207+
208+
@Override
209+
public ResumableFileDownload build() {
210+
return new ResumableFileDownload(this);
211+
}
212+
}
213+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.transfer.s3;
17+
18+
import software.amazon.awssdk.annotations.SdkPreviewApi;
19+
import software.amazon.awssdk.annotations.SdkPublicApi;
20+
21+
/**
22+
* Contains the information of a pausible upload or download; such
23+
* information can be used to resume the upload or download later on
24+
*
25+
* @see FileDownload#pause()
26+
*/
27+
@SdkPublicApi
28+
@SdkPreviewApi
29+
public interface ResumableTransfer {
30+
}

0 commit comments

Comments
 (0)