Skip to content

Commit 1660397

Browse files
L-Applinzoewangg
andauthored
Fix for multipart integ test failure (#5176)
* - Forward cancellation from returnFuture in SplittingTransformer - Use content-length for range-get in TransferProgressUpdater, when multipart is enabled. * handleFutureCancel synchronization * fix checkstyle * fix S3TransferManagerCopyIntegrationTest parameter for TmType * Fix integ tests by setting autoCloseArguments to false for parameterized tests that take tm as argument * fix S3TransferManagerCopyIntegrationTest parameter for TmType --------- Co-authored-by: Zoe Wang <[email protected]>
1 parent 405cc74 commit 1660397

File tree

9 files changed

+201
-33
lines changed

9 files changed

+201
-33
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java

Lines changed: 62 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As
7070
private final AtomicBoolean onStreamCalled = new AtomicBoolean(false);
7171

7272
/**
73-
* Set to true once {@code .concel()} is called in the subscription of the downstream subscriber
73+
* Set to true once {@code .cancel()} is called in the subscription of the downstream subscriber, or if the
74+
* {@code resultFuture} is cancelled.
7475
*/
7576
private final AtomicBoolean isCancelled = new AtomicBoolean(false);
7677

@@ -108,6 +109,8 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As
108109
*/
109110
private final AtomicBoolean emitting = new AtomicBoolean(false);
110111

112+
private final Object cancelLock = new Object();
113+
111114
private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstreamResponseTransformer,
112115
Long maximumBufferSizeInBytes,
113116
CompletableFuture<ResultT> resultFuture) {
@@ -118,6 +121,15 @@ private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstre
118121
Validate.notNull(maximumBufferSizeInBytes, "maximumBufferSizeInBytes");
119122
this.maximumBufferInBytes = Validate.isPositive(
120123
maximumBufferSizeInBytes, "maximumBufferSizeInBytes");
124+
125+
this.resultFuture.whenComplete((r, e) -> {
126+
if (e == null) {
127+
return;
128+
}
129+
if (isCancelled.compareAndSet(false, true)) {
130+
handleFutureCancel(e);
131+
}
132+
});
121133
}
122134

123135
/**
@@ -160,7 +172,7 @@ public void request(long n) {
160172
public void cancel() {
161173
if (isCancelled.compareAndSet(false, true)) {
162174
log.trace(() -> "Cancelling splitting transformer");
163-
handleCancelState();
175+
handleSubscriptionCancel();
164176
}
165177
}
166178
}
@@ -195,8 +207,14 @@ private boolean doEmit() {
195207
return false;
196208
}
197209

198-
private void handleCancelState() {
199-
synchronized (this) {
210+
/**
211+
* Handle the {@code .cancel()} signal received from the downstream subscription. Data that is being sent to the upstream
212+
* transformer need to finish processing before we complete. One typical use case for this is completing the multipart
213+
* download, the subscriber having reached the final part will signal that it doesn't need more parts by calling {@code
214+
* .cancel()} on the subscription.
215+
*/
216+
private void handleSubscriptionCancel() {
217+
synchronized (cancelLock) {
200218
if (downstreamSubscriber == null) {
201219
return;
202220
}
@@ -219,6 +237,23 @@ private void handleCancelState() {
219237
}
220238
}
221239

240+
/**
241+
* Handle when the {@link SplittingTransformer#resultFuture} is cancelled or completed exceptionally from the outside. Data
242+
* need to stop being sent to the upstream transformer immediately. One typical use case for this is transfer manager
243+
* needing to pause download by calling {@code .cancel(true)} on the future.
244+
*
245+
* @param e The exception the future was complete exceptionally with.
246+
*/
247+
private void handleFutureCancel(Throwable e) {
248+
synchronized (cancelLock) {
249+
publisherToUpstream.error(e);
250+
if (downstreamSubscriber != null) {
251+
downstreamSubscriber.onError(e);
252+
downstreamSubscriber = null;
253+
}
254+
}
255+
}
256+
222257
/**
223258
* The AsyncResponseTransformer for each of the individual requests that is sent back to the downstreamSubscriber when
224259
* requested. A future is created per request that is completed when onComplete is called on the subscriber for that request
@@ -232,14 +267,23 @@ private class IndividualTransformer implements AsyncResponseTransformer<Response
232267
public CompletableFuture<ResponseT> prepare() {
233268
this.individualFuture = new CompletableFuture<>();
234269
if (preparedCalled.compareAndSet(false, true)) {
270+
if (isCancelled.get()) {
271+
return individualFuture;
272+
}
235273
CompletableFuture<ResultT> upstreamFuture = upstreamResponseTransformer.prepare();
236274
if (!resultFuture.isDone()) {
237275
CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture);
238276
}
239277
}
278+
resultFuture.whenComplete((r, e) -> {
279+
if (e == null) {
280+
return;
281+
}
282+
individualFuture.completeExceptionally(e);
283+
});
240284
individualFuture.whenComplete((r, e) -> {
241285
if (isCancelled.get()) {
242-
handleCancelState();
286+
handleSubscriptionCancel();
243287
}
244288
});
245289
return this.individualFuture;
@@ -259,14 +303,16 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) {
259303
if (downstreamSubscriber == null) {
260304
return;
261305
}
262-
if (onStreamCalled.compareAndSet(false, true)) {
263-
log.trace(() -> "calling onStream on the upstream transformer");
264-
upstreamResponseTransformer.onStream(upstreamSubscriber -> publisherToUpstream.subscribe(
265-
DelegatingBufferingSubscriber.builder()
266-
.maximumBufferInBytes(maximumBufferInBytes)
267-
.delegate(upstreamSubscriber)
268-
.build()
269-
));
306+
synchronized (cancelLock) {
307+
if (onStreamCalled.compareAndSet(false, true)) {
308+
log.trace(() -> "calling onStream on the upstream transformer");
309+
upstreamResponseTransformer.onStream(upstreamSubscriber -> publisherToUpstream.subscribe(
310+
DelegatingBufferingSubscriber.builder()
311+
.maximumBufferInBytes(maximumBufferInBytes)
312+
.delegate(upstreamSubscriber)
313+
.build()
314+
));
315+
}
270316
}
271317
publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response));
272318
}
@@ -312,7 +358,9 @@ public void onNext(ByteBuffer byteBuffer) {
312358
handleError(t);
313359
return;
314360
}
315-
subscription.request(1);
361+
if (!isCancelled.get()) {
362+
subscription.request(1);
363+
}
316364
});
317365
}
318366

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/SplittingTransformerTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.junit.jupiter.api.Assertions.fail;
2121

2222
import java.nio.ByteBuffer;
23+
import java.util.concurrent.CancellationException;
2324
import java.util.concurrent.CompletableFuture;
2425
import java.util.function.Function;
2526
import java.util.stream.Collectors;
@@ -160,6 +161,58 @@ void nullFuture_shouldThrowNullPointerException() {
160161
.hasMessageContaining("resultFuture");
161162
}
162163

164+
@Test
165+
void resultFutureCancelled_shouldSignalErrorToSubscriberAndCancelTransformerFuture() {
166+
CompletableFuture<Object> future = new CompletableFuture<>();
167+
UpstreamTestTransformer transformer = new UpstreamTestTransformer();
168+
SplittingTransformer<TestResultObject, Object> split =
169+
SplittingTransformer.<TestResultObject, Object>builder()
170+
.upstreamResponseTransformer(transformer)
171+
.maximumBufferSizeInBytes(1024L)
172+
.resultFuture(future)
173+
.build();
174+
175+
ErrorCapturingSubscriber subscriber = new ErrorCapturingSubscriber();
176+
split.subscribe(subscriber);
177+
178+
future.cancel(true);
179+
180+
assertThat(subscriber.error).isNotNull();
181+
assertThat(subscriber.error).isInstanceOf(CancellationException.class);
182+
183+
CompletableFuture<Object> transformerFuture = transformer.future;
184+
assertThat(transformerFuture).isCancelled();
185+
}
186+
187+
private static class ErrorCapturingSubscriber
188+
implements Subscriber<AsyncResponseTransformer<TestResultObject, TestResultObject>> {
189+
190+
private Subscription subscription;
191+
private Throwable error;
192+
193+
@Override
194+
public void onSubscribe(Subscription s) {
195+
this.subscription = s;
196+
s.request(1);
197+
}
198+
199+
@Override
200+
public void onNext(AsyncResponseTransformer<TestResultObject, TestResultObject> transformer) {
201+
transformer.prepare();
202+
transformer.onResponse(new TestResultObject("test"));
203+
transformer.onStream(AsyncRequestBody.fromString("test"));
204+
}
205+
206+
@Override
207+
public void onError(Throwable t) {
208+
this.error = t;
209+
}
210+
211+
@Override
212+
public void onComplete() {
213+
/* do nothing, test only */
214+
}
215+
}
163216

164217
private static class CancelAfterNTestSubscriber
165218
implements Subscriber<AsyncResponseTransformer<TestResultObject, TestResultObject>> {

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerCopyIntegrationTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,12 @@ enum TmType{
5757
JAVA, CRT
5858
}
5959

60+
private static Stream<Arguments> transferManagerTypes() {
61+
return Stream.of(Arguments.of(TmType.JAVA), Arguments.of(TmType.CRT));
62+
}
63+
6064
@ParameterizedTest
61-
@MethodSource("transferManagers")
65+
@MethodSource("transferManagerTypes")
6266
void copy_copiedObject_hasSameContent(TmType tmType) throws Exception {
6367
CaptureTransferListener transferListener = new CaptureTransferListener();
6468
byte[] originalContent = randomBytes(OBJ_SIZE);
@@ -68,7 +72,7 @@ void copy_copiedObject_hasSameContent(TmType tmType) throws Exception {
6872
}
6973

7074
@ParameterizedTest
71-
@MethodSource("transferManagers")
75+
@MethodSource("transferManagerTypes")
7276
void copy_specialCharacters_hasSameContent(TmType tmType) throws Exception {
7377
CaptureTransferListener transferListener = new CaptureTransferListener();
7478
byte[] originalContent = randomBytes(OBJ_SIZE);

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerDownloadDirectoryIntegrationTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public static void teardown() {
117117
* }
118118
* </pre>
119119
*/
120-
@ParameterizedTest
120+
@ParameterizedTest(autoCloseArguments = false)
121121
@MethodSource("transferManagers")
122122
public void downloadDirectory(S3TransferManager tm) throws Exception {
123123
DirectoryDownload downloadDirectory = tm.downloadDirectory(u -> u.destination(directory)
@@ -127,7 +127,7 @@ public void downloadDirectory(S3TransferManager tm) throws Exception {
127127
assertTwoDirectoriesHaveSameStructure(sourceDirectory, directory);
128128
}
129129

130-
@ParameterizedTest
130+
@ParameterizedTest(autoCloseArguments = false)
131131
@MethodSource("prefixTestArguments")
132132
void downloadDirectory_withPrefix(S3TransferManager tm, String prefix) throws Exception {
133133
DirectoryDownload downloadDirectory =
@@ -155,7 +155,7 @@ void downloadDirectory_withPrefix(S3TransferManager tm, String prefix) throws Ex
155155
* }
156156
* </pre>
157157
*/
158-
@ParameterizedTest
158+
@ParameterizedTest(autoCloseArguments = false)
159159
@MethodSource("transferManagers")
160160
void downloadDirectory_containsObjectWithPrefixInTheKey_shouldResolveCorrectly(S3TransferManager tm)
161161
throws Exception {
@@ -187,7 +187,7 @@ void downloadDirectory_containsObjectWithPrefixInTheKey_shouldResolveCorrectly(S
187187
* }
188188
* </pre>
189189
*/
190-
@ParameterizedTest
190+
@ParameterizedTest(autoCloseArguments = false)
191191
@MethodSource("transferManagers")
192192
public void downloadDirectory_withPrefixAndDelimiter(S3TransferManager tm) throws Exception {
193193
String prefix = "notes-2021";
@@ -212,7 +212,7 @@ public void downloadDirectory_withPrefixAndDelimiter(S3TransferManager tm) throw
212212
* }
213213
* </pre>
214214
*/
215-
@ParameterizedTest
215+
@ParameterizedTest(autoCloseArguments = false)
216216
@MethodSource("transferManagers")
217217
public void downloadDirectory_withFilter(S3TransferManager tm) throws Exception {
218218
DirectoryDownload downloadDirectory = tm.downloadDirectory(u -> u

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadDirectoryIntegrationTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public static void teardown() {
7676
}
7777
}
7878

79-
@ParameterizedTest
79+
@ParameterizedTest(autoCloseArguments = false)
8080
@MethodSource("transferManagers")
8181
void uploadDirectory_filesSentCorrectly(S3TransferManager tm) {
8282
String prefix = "yolo";
@@ -95,7 +95,7 @@ void uploadDirectory_filesSentCorrectly(S3TransferManager tm) {
9595
keys.forEach(k -> verifyContent(k, k.substring(prefix.length() + 1) + randomString));
9696
}
9797

98-
@ParameterizedTest
98+
@ParameterizedTest(autoCloseArguments = false)
9999
@MethodSource("transferManagers")
100100
void uploadDirectory_nonExistsBucket_shouldAddFailedRequest(S3TransferManager tm) {
101101
String prefix = "yolo";
@@ -107,7 +107,7 @@ void uploadDirectory_nonExistsBucket_shouldAddFailedRequest(S3TransferManager tm
107107
assertThat(f.exception()).isInstanceOf(NoSuchBucketException.class));
108108
}
109109

110-
@ParameterizedTest
110+
@ParameterizedTest(autoCloseArguments = false)
111111
@MethodSource("transferManagers")
112112
void uploadDirectory_withDelimiter_filesSentCorrectly(S3TransferManager tm) {
113113
String prefix = "hello";
@@ -130,7 +130,7 @@ void uploadDirectory_withDelimiter_filesSentCorrectly(S3TransferManager tm) {
130130
});
131131
}
132132

133-
@ParameterizedTest
133+
@ParameterizedTest(autoCloseArguments = false)
134134
@MethodSource("transferManagers")
135135
void uploadDirectory_withRequestTransformer_usesRequestTransformer(S3TransferManager tm) throws Exception {
136136
String prefix = "requestTransformerTest";
@@ -169,7 +169,7 @@ public static Stream<Arguments> prefix() {
169169
* Tests the behavior of traversing local directories with special Unicode characters in their path name. These characters
170170
* have known to be problematic when using Java's old File API or with Windows (which uses UTF-16 for file-name encoding).
171171
*/
172-
@ParameterizedTest
172+
@ParameterizedTest(autoCloseArguments = false)
173173
@MethodSource("prefix")
174174
void uploadDirectory_fileNameWithUnicode_traversedCorrectly(String directoryPrefix, S3TransferManager tm)
175175
throws IOException {

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,8 @@ public final <ResultT> Download<ResultT> download(DownloadRequest<ResultT> downl
324324
TransferProgressUpdater progressUpdater = new TransferProgressUpdater(downloadRequest, null);
325325
progressUpdater.transferInitiated();
326326
responseTransformer = isS3ClientMultipartEnabled()
327-
? progressUpdater.wrapResponseTransformerForMultipartDownload(responseTransformer)
327+
? progressUpdater.wrapResponseTransformerForMultipartDownload(
328+
responseTransformer, downloadRequest.getObjectRequest())
328329
: progressUpdater.wrapResponseTransformer(responseTransformer);
329330
progressUpdater.registerCompletion(returnFuture);
330331

@@ -369,7 +370,8 @@ private TransferProgressUpdater doDownloadFile(
369370
try {
370371
progressUpdater.transferInitiated();
371372
responseTransformer = isS3ClientMultipartEnabled()
372-
? progressUpdater.wrapResponseTransformerForMultipartDownload(responseTransformer)
373+
? progressUpdater.wrapResponseTransformerForMultipartDownload(
374+
responseTransformer, downloadRequest.getObjectRequest())
373375
: progressUpdater.wrapResponseTransformer(responseTransformer);
374376
progressUpdater.registerCompletion(returnFuture);
375377

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import software.amazon.awssdk.core.async.listener.AsyncResponseTransformerListener;
2929
import software.amazon.awssdk.core.async.listener.PublisherListener;
3030
import software.amazon.awssdk.crt.s3.S3MetaRequestProgress;
31+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
3132
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
3233
import software.amazon.awssdk.transfer.s3.model.CompletedObjectTransfer;
3334
import software.amazon.awssdk.transfer.s3.model.TransferObjectRequest;
@@ -165,14 +166,26 @@ public void subscriberOnComplete() {
165166
}
166167

167168
public <ResultT> AsyncResponseTransformer<GetObjectResponse, ResultT> wrapResponseTransformerForMultipartDownload(
168-
AsyncResponseTransformer<GetObjectResponse, ResultT> responseTransformer) {
169+
AsyncResponseTransformer<GetObjectResponse, ResultT> responseTransformer, GetObjectRequest request) {
169170
return AsyncResponseTransformerListener.wrap(
170171
responseTransformer,
171172
new BaseAsyncResponseTransformerListener() {
172173
@Override
173174
public void transformerOnResponse(GetObjectResponse response) {
174-
ContentRangeParser.totalBytes(response.contentRange())
175-
.ifPresent(totalBytes -> progress.updateAndGet(b -> b.totalBytes(totalBytes).sdkResponse(response)));
175+
// if the GetObjectRequest is a range-get, the Content-Length headers of the response needs to be used
176+
// to update progress since the Content-Range would incorrectly upgrade progress with the whole object
177+
// size.
178+
if (request.range() != null) {
179+
if (response.contentLength() != null) {
180+
progress.updateAndGet(b -> b.totalBytes(response.contentLength()).sdkResponse(response));
181+
}
182+
} else {
183+
// if the GetObjectRequest is not a range-get, it might be a part-get. In that case, we need to parse
184+
// the Content-Range header to get the correct totalByte amount.
185+
ContentRangeParser
186+
.totalBytes(response.contentRange())
187+
.ifPresent(totalBytes -> progress.updateAndGet(b -> b.totalBytes(totalBytes).sdkResponse(response)));
188+
}
176189
}
177190
}
178191
);

0 commit comments

Comments
 (0)