Skip to content

Attempt to fix the flakiness of the codegen-generated-classes-test tests. #2769

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,18 +193,15 @@ private CompletableFuture<Response<OutputT>> executeHttpRequest(SdkHttpFullReque
TimeoutTracker timeoutTracker = setupAttemptTimer(responseFuture, context);
context.apiCallAttemptTimeoutTracker(timeoutTracker);

// Forward the cancellation
responseFuture.whenComplete((r, t) -> {
if (t != null) {
httpClientFuture.completeExceptionally(t);
}
});
// Forward potential cancellations to the upstream futures that our result future depends on.
CompletableFutureUtils.forwardExceptionTo(responseFuture, httpClientFuture);
CompletableFutureUtils.forwardExceptionTo(responseFuture, responseHandlerFuture);

// Offload the completion of the future returned from this stage onto
// the future completion executor
responseHandlerFuture.whenCompleteAsync((r, t) -> {
// When the response handler and HTTP client are done processing the request, use the future completion executor
// to complete the future returned by this function.
CompletableFuture.allOf(responseHandlerFuture, httpClientFuture).whenCompleteAsync((r, t) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully there's no edge case where httpClientFuture never gets completed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

if (t == null) {
responseFuture.complete(r);
responseFuture.complete(responseHandlerFuture.join());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

responseFuture.complete(r)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

r is Void, since it's from CompletableFuture.allOf

} else {
responseFuture.completeExceptionally(t);
}
Expand All @@ -218,7 +215,6 @@ private CompletableFuture<Void> doExecuteHttpRequest(RequestExecutionContext con
long callStart = System.nanoTime();
CompletableFuture<Void> httpClientFuture = sdkAsyncHttpClient.execute(executeRequest);

// Offload the metrics reporting from this stage onto the future completion executor
CompletableFuture<Void> result = httpClientFuture.whenComplete((r, t) -> {
long duration = System.nanoTime() - callStart;
metricCollector.reportMetric(CoreMetric.SERVICE_CALL_DURATION, Duration.ofNanos(duration));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,24 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.core.Response;
import software.amazon.awssdk.core.async.EmptyPublisher;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
import software.amazon.awssdk.core.http.ExecutionContext;
import software.amazon.awssdk.core.http.NoopTestRequest;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
import software.amazon.awssdk.core.internal.http.TransformingAsyncResponseHandler;
import software.amazon.awssdk.core.internal.http.timers.ClientExecutionAndRequestTimerTestUtils;
import software.amazon.awssdk.core.internal.util.AsyncResponseHandlerTestUtils;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.metrics.MetricCollector;
import software.amazon.awssdk.metrics.NoOpMetricCollector;
import utils.ValidSdkObjects;

@RunWith(MockitoJUnitRunner.class)
Expand All @@ -65,7 +69,7 @@ public class MakeAsyncHttpRequestStageTest {
@Mock
private ScheduledExecutorService timeoutExecutor;

private CompletableFuture<Void> clientExecuteFuture = CompletableFuture.completedFuture(null);
private CompletableFuture<Void> clientExecuteFuture = new CompletableFuture<>();

@Mock
private ScheduledFuture future;
Expand Down Expand Up @@ -107,8 +111,86 @@ public void apiCallAttemptTimeoutNotEnabled_shouldNotInvokeExecutor() throws Exc
verify(timeoutExecutor, never()).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
}

@Test
public void success_stageShouldNotCompleteBeforeHttpClientFutureIsCompleted() throws Exception {
TransformingAsyncResponseHandler<Response<Object>> handler =
AsyncResponseHandlerTestUtils.noOpResponseHandler();

stage = new MakeAsyncHttpRequestStage<>(handler, clientDependencies(null));
CompletableFuture<SdkHttpFullRequest> requestFuture =
CompletableFuture.completedFuture(ValidSdkObjects.sdkHttpFullRequest().build());

CompletableFuture<?> result = stage.execute(requestFuture, requestContext());

assertThat(result.isDone()).isFalse();
handler.onStream(new EmptyPublisher<>());
assertThat(result.isDone()).isFalse();
clientExecuteFuture.complete(null);
assertThat(result.isDone()).isTrue();
assertThat(result.isCompletedExceptionally()).isFalse();
}

@Test
public void success_stageShouldNotCompleteBeforeResponseHandlerFutureIsCompleted() throws Exception {
TransformingAsyncResponseHandler<Response<Object>> handler =
AsyncResponseHandlerTestUtils.noOpResponseHandler();

stage = new MakeAsyncHttpRequestStage<>(handler, clientDependencies(null));
CompletableFuture<SdkHttpFullRequest> requestFuture =
CompletableFuture.completedFuture(ValidSdkObjects.sdkHttpFullRequest().build());

CompletableFuture<?> result = stage.execute(requestFuture, requestContext());

assertThat(result.isDone()).isFalse();
clientExecuteFuture.complete(null);
assertThat(result.isDone()).isFalse();
handler.onStream(new EmptyPublisher<>());
assertThat(result.isDone()).isTrue();
assertThat(result.isCompletedExceptionally()).isFalse();
}

@Test
public void failure_stageShouldNotCompleteBeforeHttpClientFutureIsCompleted() throws Exception {
TransformingAsyncResponseHandler<Response<Object>> handler =
AsyncResponseHandlerTestUtils.noOpResponseHandler();

stage = new MakeAsyncHttpRequestStage<>(handler, clientDependencies(null));
CompletableFuture<SdkHttpFullRequest> requestFuture =
CompletableFuture.completedFuture(ValidSdkObjects.sdkHttpFullRequest().build());

CompletableFuture<?> result = stage.execute(requestFuture, requestContext());

assertThat(result.isDone()).isFalse();
handler.onError(new Throwable());
assertThat(result.isDone()).isFalse();
clientExecuteFuture.complete(null);
assertThat(result.isDone()).isTrue();
assertThat(result.isCompletedExceptionally()).isTrue();
}

@Test
public void failure_stageShouldNotCompleteBeforeResponseHandlerFutureIsCompleted() throws Exception {
TransformingAsyncResponseHandler<Response<Object>> handler =
AsyncResponseHandlerTestUtils.noOpResponseHandler();

stage = new MakeAsyncHttpRequestStage<>(handler, clientDependencies(null));
CompletableFuture<SdkHttpFullRequest> requestFuture =
CompletableFuture.completedFuture(ValidSdkObjects.sdkHttpFullRequest().build());

CompletableFuture<?> result = stage.execute(requestFuture, requestContext());

assertThat(result.isDone()).isFalse();
clientExecuteFuture.completeExceptionally(new Throwable());
assertThat(result.isDone()).isFalse();
handler.onStream(new EmptyPublisher<>());
assertThat(result.isDone()).isTrue();
assertThat(result.isCompletedExceptionally()).isTrue();
}

@Test
public void testExecute_contextContainsMetricCollector_addsChildToExecuteRequest() {
clientExecuteFuture.complete(null);

stage = new MakeAsyncHttpRequestStage<>(
combinedAsyncResponseHandler(AsyncResponseHandlerTestUtils.noOpResponseHandler(),
AsyncResponseHandlerTestUtils.noOpResponseHandler()),
Expand Down Expand Up @@ -168,9 +250,11 @@ private HttpClientDependencies clientDependencies(Duration timeout) {

private RequestExecutionContext requestContext() {
ExecutionContext executionContext = ClientExecutionAndRequestTimerTestUtils.executionContext(ValidSdkObjects.sdkHttpFullRequest().build());
return RequestExecutionContext.builder()
.executionContext(executionContext)
.originalRequest(NoopTestRequest.builder().build())
.build();
RequestExecutionContext requestContext = RequestExecutionContext.builder()
.executionContext(executionContext)
.originalRequest(NoopTestRequest.builder().build())
.build();
requestContext.attemptMetricCollector(NoOpMetricCollector.create());
return requestContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.HttpStatusFamily;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
Expand Down Expand Up @@ -239,10 +238,8 @@ private void onCancel() {
return;
}
try {
SdkCancellationException e = new SdkCancellationException(
"Subscriber cancelled before all events were published");
log.warn("Subscriber cancelled before all events were published");
executeFuture.completeExceptionally(e);
log.warn("Subscriber cancelled before all events were published.");
executeFuture.complete(null);
} finally {
runAndLogError("Could not release channel back to the pool",
() -> closeAndRelease(channelContext));
Expand Down