Skip to content

Fix for multipart integ test failure #5176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As
private final AtomicBoolean onStreamCalled = new AtomicBoolean(false);

/**
* Set to true once {@code .concel()} is called in the subscription of the downstream subscriber
* Set to true once {@code .cancel()} is called in the subscription of the downstream subscriber, or if the
* {@code resultFuture} is cancelled.
*/
private final AtomicBoolean isCancelled = new AtomicBoolean(false);

Expand Down Expand Up @@ -108,6 +109,8 @@ public class SplittingTransformer<ResponseT, ResultT> implements SdkPublisher<As
*/
private final AtomicBoolean emitting = new AtomicBoolean(false);

private final Object cancelLock = new Object();

private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstreamResponseTransformer,
Long maximumBufferSizeInBytes,
CompletableFuture<ResultT> resultFuture) {
Expand All @@ -118,6 +121,15 @@ private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstre
Validate.notNull(maximumBufferSizeInBytes, "maximumBufferSizeInBytes");
this.maximumBufferInBytes = Validate.isPositive(
maximumBufferSizeInBytes, "maximumBufferSizeInBytes");

this.resultFuture.whenComplete((r, e) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a test case for this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding a unit test for this. Can we add a end-to-end functional test (wiremock test) to verify the cancellation behavior?

if (e == null) {
return;
}
if (isCancelled.compareAndSet(false, true)) {
handleFutureCancel(e);
}
});
}

/**
Expand Down Expand Up @@ -160,7 +172,7 @@ public void request(long n) {
public void cancel() {
if (isCancelled.compareAndSet(false, true)) {
log.trace(() -> "Cancelling splitting transformer");
handleCancelState();
handleSubscriptionCancel();
}
}
}
Expand Down Expand Up @@ -195,8 +207,14 @@ private boolean doEmit() {
return false;
}

private void handleCancelState() {
synchronized (this) {
/**
* Handle the {@code .cancel()} signal received from the downstream subscription. Data that is being sent to the upstream
* transformer need to finish processing before we complete. One typical use case for this is completing the multipart
* download, the subscriber having reached the final part will signal that it doesn't need more parts by calling {@code
* .cancel()} on the subscription.
*/
private void handleSubscriptionCancel() {
synchronized (cancelLock) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if !onStreanCalled().get() is no longer true after we make the check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onStreanCalled will only go from false -> true once and never back to false.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant more that just because onStreamCalled.get() == false when we evaluate the condition, it doesn't mean it's still false when we execute the body of the if. Is that going to be an issue?

if (!onStreamCalled.get()) { // value is false
     // value could have changed to true after we made the check
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aaah I see what you mean. Looking at it, yep, it might cause a NPE at line 293. Adding synchronization around the if (onStreamCalled.compareAndSet(false, true)) on cancelLock there at line 293 should prevent it.

if (downstreamSubscriber == null) {
return;
}
Expand All @@ -219,6 +237,23 @@ private void handleCancelState() {
}
}

/**
* Handle when the {@link SplittingTransformer#resultFuture} is cancelled or completed exceptionally from the outside. Data
* need to stop being sent to the upstream transformer immediately. One typical use case for this is transfer manager
* needing to pause download by calling {@code .cancel(true)} on the future.
*
* @param e The exception the future was complete exceptionally with.
*/
private void handleFutureCancel(Throwable e) {
synchronized (cancelLock) {
publisherToUpstream.error(e);
if (downstreamSubscriber != null) {
downstreamSubscriber.onError(e);
downstreamSubscriber = null;
}
}
}

/**
* The AsyncResponseTransformer for each of the individual requests that is sent back to the downstreamSubscriber when
* requested. A future is created per request that is completed when onComplete is called on the subscriber for that request
Expand All @@ -232,14 +267,23 @@ private class IndividualTransformer implements AsyncResponseTransformer<Response
public CompletableFuture<ResponseT> prepare() {
this.individualFuture = new CompletableFuture<>();
if (preparedCalled.compareAndSet(false, true)) {
if (isCancelled.get()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this logic here?

return individualFuture;
}
CompletableFuture<ResultT> upstreamFuture = upstreamResponseTransformer.prepare();
if (!resultFuture.isDone()) {
CompletableFutureUtils.forwardResultTo(upstreamFuture, resultFuture);
}
}
resultFuture.whenComplete((r, e) -> {
if (e == null) {
return;
}
individualFuture.completeExceptionally(e);
});
individualFuture.whenComplete((r, e) -> {
if (isCancelled.get()) {
handleCancelState();
handleSubscriptionCancel();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remind me why we need to check isCancelled here for every individual future?

}
});
return this.individualFuture;
Expand All @@ -259,14 +303,16 @@ public void onStream(SdkPublisher<ByteBuffer> publisher) {
if (downstreamSubscriber == null) {
return;
}
if (onStreamCalled.compareAndSet(false, true)) {
log.trace(() -> "calling onStream on the upstream transformer");
upstreamResponseTransformer.onStream(upstreamSubscriber -> publisherToUpstream.subscribe(
DelegatingBufferingSubscriber.builder()
.maximumBufferInBytes(maximumBufferInBytes)
.delegate(upstreamSubscriber)
.build()
));
synchronized (cancelLock) {
if (onStreamCalled.compareAndSet(false, true)) {
log.trace(() -> "calling onStream on the upstream transformer");
upstreamResponseTransformer.onStream(upstreamSubscriber -> publisherToUpstream.subscribe(
DelegatingBufferingSubscriber.builder()
.maximumBufferInBytes(maximumBufferInBytes)
.delegate(upstreamSubscriber)
.build()
));
}
}
publisher.subscribe(new IndividualPartSubscriber<>(this.individualFuture, response));
}
Expand Down Expand Up @@ -312,7 +358,9 @@ public void onNext(ByteBuffer byteBuffer) {
handleError(t);
return;
}
subscription.request(1);
if (!isCancelled.get()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar question as above, is it possible this is no longer true after we check it? If so, is it an issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above, onStreanCalled will only go from false -> true once and never back to false.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the check here assuming subscription is cancelled properly or will be cancelled at this point?
https://github.com/reactive-streams/reactive-streams-jvm

If a Subscription is cancelled its Subscriber MUST eventually stop being signaled.

Note "eventually" not "right away"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do need to stop right away in the case of the return future being cancelled, for example when pausing with transfer-manager

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen if we don't check it here and continue to send request demand? It should be no-op right?

subscription.request(1);
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.junit.jupiter.api.Assertions.fail;

import java.nio.ByteBuffer;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -160,6 +161,58 @@ void nullFuture_shouldThrowNullPointerException() {
.hasMessageContaining("resultFuture");
}

@Test
void resultFutureCancelled_shouldSignalErrorToSubscriberAndCancelTransformerFuture() {
CompletableFuture<Object> future = new CompletableFuture<>();
UpstreamTestTransformer transformer = new UpstreamTestTransformer();
SplittingTransformer<TestResultObject, Object> split =
SplittingTransformer.<TestResultObject, Object>builder()
.upstreamResponseTransformer(transformer)
.maximumBufferSizeInBytes(1024L)
.resultFuture(future)
.build();

ErrorCapturingSubscriber subscriber = new ErrorCapturingSubscriber();
split.subscribe(subscriber);

future.cancel(true);

assertThat(subscriber.error).isNotNull();
assertThat(subscriber.error).isInstanceOf(CancellationException.class);

CompletableFuture<Object> transformerFuture = transformer.future;
assertThat(transformerFuture).isCancelled();
}

private static class ErrorCapturingSubscriber
implements Subscriber<AsyncResponseTransformer<TestResultObject, TestResultObject>> {

private Subscription subscription;
private Throwable error;

@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
s.request(1);
}

@Override
public void onNext(AsyncResponseTransformer<TestResultObject, TestResultObject> transformer) {
transformer.prepare();
transformer.onResponse(new TestResultObject("test"));
transformer.onStream(AsyncRequestBody.fromString("test"));
}

@Override
public void onError(Throwable t) {
this.error = t;
}

@Override
public void onComplete() {
/* do nothing, test only */
}
}

private static class CancelAfterNTestSubscriber
implements Subscriber<AsyncResponseTransformer<TestResultObject, TestResultObject>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,12 @@ enum TmType{
JAVA, CRT
}

private static Stream<Arguments> transferManagerTypes() {
return Stream.of(Arguments.of(TmType.JAVA), Arguments.of(TmType.CRT));
}

@ParameterizedTest
@MethodSource("transferManagers")
@MethodSource("transferManagerTypes")
void copy_copiedObject_hasSameContent(TmType tmType) throws Exception {
CaptureTransferListener transferListener = new CaptureTransferListener();
byte[] originalContent = randomBytes(OBJ_SIZE);
Expand All @@ -68,7 +72,7 @@ void copy_copiedObject_hasSameContent(TmType tmType) throws Exception {
}

@ParameterizedTest
@MethodSource("transferManagers")
@MethodSource("transferManagerTypes")
void copy_specialCharacters_hasSameContent(TmType tmType) throws Exception {
CaptureTransferListener transferListener = new CaptureTransferListener();
byte[] originalContent = randomBytes(OBJ_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public static void teardown() {
* }
* </pre>
*/
@ParameterizedTest
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("transferManagers")
public void downloadDirectory(S3TransferManager tm) throws Exception {
DirectoryDownload downloadDirectory = tm.downloadDirectory(u -> u.destination(directory)
Expand All @@ -127,7 +127,7 @@ public void downloadDirectory(S3TransferManager tm) throws Exception {
assertTwoDirectoriesHaveSameStructure(sourceDirectory, directory);
}

@ParameterizedTest
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("prefixTestArguments")
void downloadDirectory_withPrefix(S3TransferManager tm, String prefix) throws Exception {
DirectoryDownload downloadDirectory =
Expand Down Expand Up @@ -155,7 +155,7 @@ void downloadDirectory_withPrefix(S3TransferManager tm, String prefix) throws Ex
* }
* </pre>
*/
@ParameterizedTest
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("transferManagers")
void downloadDirectory_containsObjectWithPrefixInTheKey_shouldResolveCorrectly(S3TransferManager tm)
throws Exception {
Expand Down Expand Up @@ -187,7 +187,7 @@ void downloadDirectory_containsObjectWithPrefixInTheKey_shouldResolveCorrectly(S
* }
* </pre>
*/
@ParameterizedTest
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("transferManagers")
public void downloadDirectory_withPrefixAndDelimiter(S3TransferManager tm) throws Exception {
String prefix = "notes-2021";
Expand All @@ -212,7 +212,7 @@ public void downloadDirectory_withPrefixAndDelimiter(S3TransferManager tm) throw
* }
* </pre>
*/
@ParameterizedTest
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("transferManagers")
public void downloadDirectory_withFilter(S3TransferManager tm) throws Exception {
DirectoryDownload downloadDirectory = tm.downloadDirectory(u -> u
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static void teardown() {
}
}

@ParameterizedTest
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("transferManagers")
void uploadDirectory_filesSentCorrectly(S3TransferManager tm) {
String prefix = "yolo";
Expand All @@ -95,7 +95,7 @@ void uploadDirectory_filesSentCorrectly(S3TransferManager tm) {
keys.forEach(k -> verifyContent(k, k.substring(prefix.length() + 1) + randomString));
}

@ParameterizedTest
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("transferManagers")
void uploadDirectory_nonExistsBucket_shouldAddFailedRequest(S3TransferManager tm) {
String prefix = "yolo";
Expand All @@ -107,7 +107,7 @@ void uploadDirectory_nonExistsBucket_shouldAddFailedRequest(S3TransferManager tm
assertThat(f.exception()).isInstanceOf(NoSuchBucketException.class));
}

@ParameterizedTest
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("transferManagers")
void uploadDirectory_withDelimiter_filesSentCorrectly(S3TransferManager tm) {
String prefix = "hello";
Expand All @@ -130,7 +130,7 @@ void uploadDirectory_withDelimiter_filesSentCorrectly(S3TransferManager tm) {
});
}

@ParameterizedTest
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("transferManagers")
void uploadDirectory_withRequestTransformer_usesRequestTransformer(S3TransferManager tm) throws Exception {
String prefix = "requestTransformerTest";
Expand Down Expand Up @@ -169,7 +169,7 @@ public static Stream<Arguments> prefix() {
* Tests the behavior of traversing local directories with special Unicode characters in their path name. These characters
* have known to be problematic when using Java's old File API or with Windows (which uses UTF-16 for file-name encoding).
*/
@ParameterizedTest
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("prefix")
void uploadDirectory_fileNameWithUnicode_traversedCorrectly(String directoryPrefix, S3TransferManager tm)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ public final <ResultT> Download<ResultT> download(DownloadRequest<ResultT> downl
TransferProgressUpdater progressUpdater = new TransferProgressUpdater(downloadRequest, null);
progressUpdater.transferInitiated();
responseTransformer = isS3ClientMultipartEnabled()
? progressUpdater.wrapResponseTransformerForMultipartDownload(responseTransformer)
? progressUpdater.wrapResponseTransformerForMultipartDownload(
responseTransformer, downloadRequest.getObjectRequest())
: progressUpdater.wrapResponseTransformer(responseTransformer);
progressUpdater.registerCompletion(returnFuture);

Expand Down Expand Up @@ -369,7 +370,8 @@ private TransferProgressUpdater doDownloadFile(
try {
progressUpdater.transferInitiated();
responseTransformer = isS3ClientMultipartEnabled()
? progressUpdater.wrapResponseTransformerForMultipartDownload(responseTransformer)
? progressUpdater.wrapResponseTransformerForMultipartDownload(
responseTransformer, downloadRequest.getObjectRequest())
: progressUpdater.wrapResponseTransformer(responseTransformer);
progressUpdater.registerCompletion(returnFuture);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import software.amazon.awssdk.core.async.listener.AsyncResponseTransformerListener;
import software.amazon.awssdk.core.async.listener.PublisherListener;
import software.amazon.awssdk.crt.s3.S3MetaRequestProgress;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.transfer.s3.model.CompletedObjectTransfer;
import software.amazon.awssdk.transfer.s3.model.TransferObjectRequest;
Expand Down Expand Up @@ -165,14 +166,26 @@ public void subscriberOnComplete() {
}

public <ResultT> AsyncResponseTransformer<GetObjectResponse, ResultT> wrapResponseTransformerForMultipartDownload(
AsyncResponseTransformer<GetObjectResponse, ResultT> responseTransformer) {
AsyncResponseTransformer<GetObjectResponse, ResultT> responseTransformer, GetObjectRequest request) {
return AsyncResponseTransformerListener.wrap(
responseTransformer,
new BaseAsyncResponseTransformerListener() {
@Override
public void transformerOnResponse(GetObjectResponse response) {
ContentRangeParser.totalBytes(response.contentRange())
.ifPresent(totalBytes -> progress.updateAndGet(b -> b.totalBytes(totalBytes).sdkResponse(response)));
// if the GetObjectRequest is a range-get, the Content-Length headers of the response needs to be used
// to update progress since the Content-Range would incorrectly upgrade progress with the whole object
// size.
if (request.range() != null) {
if (response.contentLength() != null) {
progress.updateAndGet(b -> b.totalBytes(response.contentLength()).sdkResponse(response));
}
} else {
// if the GetObjectRequest is not a range-get, it might be a part-get. In that case, we need to parse
// the Content-Range header to get the correct totalByte amount.
ContentRangeParser
.totalBytes(response.contentRange())
.ifPresent(totalBytes -> progress.updateAndGet(b -> b.totalBytes(totalBytes).sdkResponse(response)));
}
}
}
);
Expand Down
Loading