Skip to content

Fix for threads hanging with forwardTransformedResultTo #5117

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@

import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
Expand Down Expand Up @@ -217,6 +219,25 @@ void download_cancel_shouldForwardCancellation() {
assertThat(s3CrtFuture).isCancelled();
}

@Test
@Timeout(value = 5, unit = TimeUnit.SECONDS)
void download_futureReturnsNull_doesNotHang() {
AsyncResponseTransformer<GetObjectResponse, Void> mockTr = mock(AsyncResponseTransformer.class);
CompletableFuture<Void> returnMockFuture = new CompletableFuture<>();
when(mockS3Crt.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class)))
.thenReturn(returnMockFuture);
DownloadRequest<Void> downloadRequest =
DownloadRequest.builder()
.getObjectRequest(g -> g.bucket("bucket").key("key"))
.responseTransformer(mockTr).build();

CompletableFuture<CompletedDownload<Void>> future = tm.download(downloadRequest).completionFuture();
returnMockFuture.complete(null);
assertThatThrownBy(future::join)
.hasCauseInstanceOf(NullPointerException.class)
.hasMessageContaining("result must not be null");
}

@Test
void objectLambdaArnBucketProvided_shouldThrowException() {
String objectLambdaArn = "arn:xxx:s3-object-lambda";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static CompletionException errorAsCompletionException(Throwable t) {

/**
* Forward the {@code Throwable} from {@code src} to {@code dst}.

*
* @param src The source of the {@code Throwable}.
* @param dst The destination where the {@code Throwable} will be forwarded to.
*
Expand Down Expand Up @@ -148,8 +148,9 @@ public static <T> CompletableFuture<T> forwardResultTo(CompletableFuture<T> src,
}

/**
* Completes the {@code dst} future based on the result of the {@code src} future, synchronously,
* after applying the provided transformation {@link Function} if successful.
* Completes the {@code dst} future based on the result of the {@code src} future, synchronously, after applying the provided
* transformation {@link Function} if successful. If the function threw an exception, the destination
* future will be completed exceptionally with that exception.
*
* @param src The source {@link CompletableFuture}
* @param dst The destination where the {@code Throwable} or transformed result will be forwarded to.
Expand All @@ -161,9 +162,16 @@ public static <SourceT, DestT> CompletableFuture<SourceT> forwardTransformedResu
src.whenComplete((r, e) -> {
if (e != null) {
dst.completeExceptionally(e);
} else {
dst.complete(function.apply(r));
return;
}
DestT result = null;
try {
result = function.apply(r);
} catch (Throwable functionException) {
dst.completeExceptionally(functionException);
return;
}
dst.complete(result);
});

return src;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ public void forwardTransformedResultTo_srcCompletesExceptionally_shouldCompleteD
assertThatThrownBy(dst::join).hasCause(exception);
}

@Test(timeout = 1000)
public void forwardTransformedResultTo_functionThrowsException_shouldCompleteExceptionally() {
CompletableFuture<Integer> src = new CompletableFuture<>();
CompletableFuture<String> dst = new CompletableFuture<>();

CompletableFutureUtils.forwardTransformedResultTo(src, dst, x -> { throw new RuntimeException("foobar"); });
src.complete(0);
assertThatThrownBy(dst::join)
.hasMessageContaining("foobar")
.hasCauseInstanceOf(RuntimeException.class);
}

@Test(timeout = 1000)
public void anyFail_shouldCompleteWhenAnyFutureFails() {
RuntimeException exception = new RuntimeException("blah");
Expand Down Expand Up @@ -206,4 +218,6 @@ public void joinLikeSync_canceled_throwsCancellationException() {
.hasNoSuppressedExceptions()
.hasNoCause()
.isInstanceOf(CancellationException.class);
}}
}

}