17
17
18
18
import static org .assertj .core .api .AssertionsForClassTypes .assertThat ;
19
19
import static software .amazon .awssdk .testutils .service .S3BucketUtils .temporaryBucketName ;
20
+ import static software .amazon .awssdk .transfer .s3 .SizeConstant .MB ;
20
21
21
22
import java .io .File ;
23
+ import java .io .IOException ;
24
+ import java .nio .charset .StandardCharsets ;
25
+ import java .nio .file .Files ;
22
26
import java .nio .file .Path ;
27
+ import java .time .Duration ;
23
28
import java .util .Optional ;
24
- import java .util .concurrent .CountDownLatch ;
25
- import java .util .concurrent .TimeUnit ;
29
+ import org .apache .commons .lang3 .RandomStringUtils ;
26
30
import org .junit .jupiter .api .AfterAll ;
27
31
import org .junit .jupiter .api .BeforeAll ;
28
32
import org .junit .jupiter .api .Test ;
29
33
import software .amazon .awssdk .core .SdkResponse ;
34
+ import software .amazon .awssdk .core .retry .backoff .FixedDelayBackoffStrategy ;
35
+ import software .amazon .awssdk .core .sync .RequestBody ;
36
+ import software .amazon .awssdk .core .waiters .Waiter ;
37
+ import software .amazon .awssdk .core .waiters .WaiterAcceptor ;
30
38
import software .amazon .awssdk .services .s3 .model .GetObjectResponse ;
31
39
import software .amazon .awssdk .services .s3 .model .PutObjectRequest ;
32
40
import software .amazon .awssdk .testutils .RandomTempFile ;
33
41
import software .amazon .awssdk .transfer .s3 .progress .TransferListener ;
42
+ import software .amazon .awssdk .transfer .s3 .progress .TransferProgressSnapshot ;
43
+ import software .amazon .awssdk .utils .Logger ;
34
44
35
45
public class S3TransferManagerDownloadPauseResumeIntegrationTest extends S3IntegrationTestBase {
46
+ private static final Logger log = Logger .loggerFor (S3TransferManagerDownloadPauseResumeIntegrationTest .class );
36
47
private static final String BUCKET = temporaryBucketName (S3TransferManagerDownloadPauseResumeIntegrationTest .class );
37
48
private static final String KEY = "key" ;
38
- private static final int OBJ_SIZE = 16 * 1024 * 1024 ;
49
+ // 24 * MB is chosen to make sure we have data written in the file already upon pausing.
50
+ private static final long OBJ_SIZE = 24 * MB ;
39
51
private static S3TransferManager tm ;
40
- private static File file ;
52
+ private static File sourceFile ;
41
53
42
54
@ BeforeAll
43
55
public static void setup () throws Exception {
44
56
S3IntegrationTestBase .setUp ();
45
57
createBucket (BUCKET );
46
- file = new RandomTempFile (OBJ_SIZE );
58
+ sourceFile = new RandomTempFile (OBJ_SIZE );
47
59
s3 .putObject (PutObjectRequest .builder ()
48
60
.bucket (BUCKET )
49
61
.key (KEY )
50
- .build (), file .toPath ());
62
+ .build (), sourceFile .toPath ());
51
63
tm = S3TransferManager .builder ()
52
64
.s3ClientConfiguration (b -> b .region (DEFAULT_REGION )
53
65
.credentialsProvider (CREDENTIALS_PROVIDER_CHAIN ))
@@ -58,50 +70,111 @@ public static void setup() throws Exception {
58
70
public static void cleanup () {
59
71
deleteBucketAndAllContents (BUCKET );
60
72
tm .close ();
73
+ sourceFile .delete ();
61
74
S3IntegrationTestBase .cleanUp ();
62
75
}
63
76
64
77
@ Test
65
- void downloadToFile_pause_shouldReturnResumableDownload () throws InterruptedException {
66
- CountDownLatch countDownLatch = new CountDownLatch (1 );
78
+ void pauseAndResume_ObjectNotChanged_shouldResumeDownload () {
67
79
Path path = RandomTempFile .randomUncreatedFile ().toPath ();
68
- TestDownloadListener testDownloadListener = new TestDownloadListener (countDownLatch );
80
+ TestDownloadListener testDownloadListener = new TestDownloadListener ();
69
81
DownloadFileRequest request = DownloadFileRequest .builder ()
70
82
.getObjectRequest (b -> b .bucket (BUCKET ).key (KEY ))
71
83
.destination (path )
72
84
.overrideConfiguration (b -> b
73
85
.addListener (testDownloadListener ))
74
86
.build ();
75
- FileDownload download =
76
- tm .downloadFile (request );
77
- boolean count = countDownLatch .await (10 , TimeUnit .SECONDS );
78
- if (!count ) {
79
- throw new AssertionError ("No data has been transferred within 5 seconds" );
80
- }
81
- ResumableFileDownload pause = download .pause ();
82
- assertThat (pause .downloadFileRequest ()).isEqualTo (request );
87
+ FileDownload download = tm .downloadFile (request );
88
+ waitUntilFirstByteBufferDelivered (download );
89
+
90
+ ResumableFileDownload resumableFileDownload = download .pause ();
91
+ long bytesTransferred = resumableFileDownload .bytesTransferred ();
92
+ log .debug (() -> "Paused: " + resumableFileDownload );
93
+ assertThat (resumableFileDownload .downloadFileRequest ()).isEqualTo (request );
83
94
assertThat (testDownloadListener .getObjectResponse ).isNotNull ();
84
- assertThat (pause .lastModified ()).isEqualTo (testDownloadListener .getObjectResponse .lastModified ());
85
- assertThat (pause .bytesTransferred ()).isEqualTo (path .toFile ().length ());
86
- assertThat (pause .transferSizeInBytes ()).hasValue (file .length ());
95
+ assertThat (resumableFileDownload .s3ObjectLastModified ()).hasValue (testDownloadListener .getObjectResponse .lastModified ());
96
+ assertThat (bytesTransferred ).isEqualTo (path .toFile ().length ());
97
+ assertThat (resumableFileDownload .totalSizeInBytes ()).hasValue (sourceFile .length ());
98
+
99
+ assertThat (bytesTransferred ).isLessThan (sourceFile .length ());
87
100
assertThat (download .completionFuture ()).isCancelled ();
101
+
102
+ log .debug (() -> "Resuming download " );
103
+ verifyFileDownload (path , resumableFileDownload , OBJ_SIZE - bytesTransferred );
104
+ }
105
+
106
+ @ Test
107
+ void pauseAndResume_objectChanged_shouldStartFromBeginning () {
108
+ Path path = RandomTempFile .randomUncreatedFile ().toPath ();
109
+ DownloadFileRequest request = DownloadFileRequest .builder ()
110
+ .getObjectRequest (b -> b .bucket (BUCKET ).key (KEY ))
111
+ .destination (path )
112
+ .build ();
113
+ FileDownload download = tm .downloadFile (request );
114
+ waitUntilFirstByteBufferDelivered (download );
115
+
116
+ ResumableFileDownload resumableFileDownload = download .pause ();
117
+ log .debug (() -> "Paused: " + resumableFileDownload );
118
+ String newObject = RandomStringUtils .randomAlphanumeric (1000 );
119
+
120
+ // Re-upload the S3 object
121
+ s3 .putObject (PutObjectRequest .builder ()
122
+ .bucket (BUCKET )
123
+ .key (KEY )
124
+ .build (), RequestBody .fromString (newObject ));
125
+
126
+ log .debug (() -> "Resuming download " );
127
+ FileDownload resumedFileDownload = tm .resumeDownloadFile (resumableFileDownload );
128
+ resumedFileDownload .progress ().snapshot ();
129
+ resumedFileDownload .completionFuture ().join ();
130
+ assertThat (path .toFile ()).hasContent (newObject );
131
+ assertThat (resumedFileDownload .progress ().snapshot ().transferSizeInBytes ()).hasValue ((long ) newObject .getBytes (StandardCharsets .UTF_8 ).length );
132
+ }
133
+
134
+ @ Test
135
+ void pauseAndResume_fileChanged_shouldStartFromBeginning () throws IOException {
136
+ Path path = RandomTempFile .randomUncreatedFile ().toPath ();
137
+ DownloadFileRequest request = DownloadFileRequest .builder ()
138
+ .getObjectRequest (b -> b .bucket (BUCKET ).key (KEY ))
139
+ .destination (path )
140
+ .build ();
141
+ FileDownload download = tm .downloadFile (request );
142
+ waitUntilFirstByteBufferDelivered (download );
143
+
144
+ ResumableFileDownload resumableFileDownload = download .pause ();
145
+ Files .write (path , "helloworld" .getBytes (StandardCharsets .UTF_8 ));
146
+
147
+ verifyFileDownload (path , resumableFileDownload , OBJ_SIZE );
148
+ }
149
+
150
+ private static void verifyFileDownload (Path path , ResumableFileDownload resumableFileDownload , long expectedBytesTransferred ) {
151
+ FileDownload resumedFileDownload = tm .resumeDownloadFile (resumableFileDownload );
152
+ resumedFileDownload .progress ().snapshot ();
153
+ resumedFileDownload .completionFuture ().join ();
154
+ assertThat (path .toFile ()).hasSameBinaryContentAs (sourceFile );
155
+ assertThat (resumedFileDownload .progress ().snapshot ().transferSizeInBytes ()).hasValue (expectedBytesTransferred );
156
+ }
157
+
158
+ private static void waitUntilFirstByteBufferDelivered (FileDownload download ) {
159
+ Waiter <TransferProgressSnapshot > waiter = Waiter .builder (TransferProgressSnapshot .class )
160
+ .addAcceptor (WaiterAcceptor .successOnResponseAcceptor (r -> r .bytesTransferred () > 0 ))
161
+ .addAcceptor (WaiterAcceptor .retryOnResponseAcceptor (r -> true ))
162
+ .overrideConfiguration (o -> o .waitTimeout (Duration .ofMinutes (1 ))
163
+ .maxAttempts (Integer .MAX_VALUE )
164
+ .backoffStrategy (FixedDelayBackoffStrategy .create (Duration .ofMillis (100 ))))
165
+ .build ();
166
+ waiter .run (() -> download .progress ().snapshot ());
88
167
}
89
168
90
169
private static final class TestDownloadListener implements TransferListener {
91
- private final CountDownLatch countDownLatch ;
92
170
private GetObjectResponse getObjectResponse ;
93
171
94
- private TestDownloadListener (CountDownLatch countDownLatch ) {
95
- this .countDownLatch = countDownLatch ;
96
- }
97
-
98
172
@ Override
99
173
public void bytesTransferred (Context .BytesTransferred context ) {
100
174
Optional <SdkResponse > sdkResponse = context .progressSnapshot ().sdkResponse ();
101
175
if (sdkResponse .isPresent () && sdkResponse .get () instanceof GetObjectResponse ) {
102
176
getObjectResponse = (GetObjectResponse ) sdkResponse .get ();
103
177
}
104
- countDownLatch .countDown ();
105
178
}
106
179
}
107
180
0 commit comments