Skip to content

Fix stability tests #5178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.stability.tests.s3;

import static org.assertj.core.api.Assertions.assertThat;
import static software.amazon.awssdk.stability.tests.utils.StabilityTestRunner.ALLOWED_MAX_PEAK_THREAD_COUNT;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -61,10 +62,18 @@ public abstract class S3AsyncBaseStabilityTest extends AwsTestBase {
.build();
}

private final int allowedPeakThreads;

/**
 * Creates a stability test that uses {@code ALLOWED_MAX_PEAK_THREAD_COUNT} as its peak-thread limit.
 *
 * @param testClient the S3 async client the helpers of this class issue their requests with
 */
public S3AsyncBaseStabilityTest(S3AsyncClient testClient) {
this(testClient, ALLOWED_MAX_PEAK_THREAD_COUNT);
}

/**
 * Creates a stability test with an explicit peak-thread limit.
 *
 * @param testClient     the S3 async client the helpers of this class issue their requests with
 * @param maxThreadCount the limit the helpers of this class pass to {@code StabilityTestRunner.newRunner(int)}
 */
public S3AsyncBaseStabilityTest(S3AsyncClient testClient, int maxThreadCount) {
    this.testClient = testClient;
    this.allowedPeakThreads = maxThreadCount;
}


@RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class)
public void largeObject_put_get_usingFile() {
String md5Upload = uploadLargeObjectFromFile();
Expand All @@ -87,7 +96,7 @@ protected String computeKeyName(int i) {
protected void doGetBucketAcl_lowTpsLongInterval() {
IntFunction<CompletableFuture<?>> future = i -> testClient.getBucketAcl(b -> b.bucket(getTestBucketName()));
String className = this.getClass().getSimpleName();
StabilityTestRunner.newRunner()
StabilityTestRunner.newRunner(allowedPeakThreads)
.testName(className + ".getBucketAcl_lowTpsLongInterval")
.futureFactory(future)
.requestCountPerRun(10)
Expand All @@ -99,7 +108,7 @@ protected void doGetBucketAcl_lowTpsLongInterval() {

protected String downloadLargeObjectToFile() {
File randomTempFile = RandomTempFile.randomUncreatedFile();
StabilityTestRunner.newRunner()
StabilityTestRunner.newRunner(allowedPeakThreads)
.testName("S3AsyncStabilityTest.downloadLargeObjectToFile")
.futures(testClient.getObject(b -> b.bucket(getTestBucketName()).key(LARGE_KEY_NAME),
AsyncResponseTransformer.toFile(randomTempFile)))
Expand All @@ -120,7 +129,7 @@ protected String uploadLargeObjectFromFile() {
try {
file = new RandomTempFile((long) 2e+9);
String md5 = Md5Utils.md5AsBase64(file);
StabilityTestRunner.newRunner()
StabilityTestRunner.newRunner(allowedPeakThreads)
.testName("S3AsyncStabilityTest.uploadLargeObjectFromFile")
.futures(testClient.putObject(b -> b.bucket(getTestBucketName()).key(LARGE_KEY_NAME),
AsyncRequestBody.fromFile(file)))
Expand All @@ -144,7 +153,7 @@ protected void putObject() {
AsyncRequestBody.fromBytes(bytes));
};

StabilityTestRunner.newRunner()
StabilityTestRunner.newRunner(allowedPeakThreads)
.testName("S3AsyncStabilityTest.putObject")
.futureFactory(future)
.requestCountPerRun(CONCURRENCY)
Expand All @@ -160,7 +169,7 @@ protected void getObject() {
return testClient.getObject(b -> b.bucket(getTestBucketName()).key(keyName), AsyncResponseTransformer.toFile(path));
};

StabilityTestRunner.newRunner()
StabilityTestRunner.newRunner(allowedPeakThreads)
.testName("S3AsyncStabilityTest.getObject")
.futureFactory(future)
.requestCountPerRun(CONCURRENCY)
Expand All @@ -183,7 +192,7 @@ protected static void deleteBucketAndAllContents(S3AsyncClient client, String bu

client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build()).join();
} catch (Exception e) {
log.error(() -> "Failed to delete bucket: " +bucketName);
log.error(() -> "Failed to delete bucket: " + bucketName, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@

package software.amazon.awssdk.stability.tests.s3;

import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;

Expand All @@ -27,24 +29,31 @@ public class S3MultipartJavaBasedStabilityTest extends S3AsyncBaseStabilityTest
static {
    // Client under test: a multipart-enabled S3 async client running on a Netty HTTP client.
    multipartJavaBasedClient =
        S3AsyncClient.builder()
                     .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
                     .httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(CONCURRENCY))
                     .multipartEnabled(true)
                     .overrideConfiguration(config -> {
                         config.apiCallTimeout(Duration.ofMinutes(5));
                         // Retry at test level
                         config.retryPolicy(RetryPolicy.none());
                     })
                     .build();
}

/**
 * Creates the test on top of the shared multipart-enabled client, with a peak-thread limit of 250.
 */
public S3MultipartJavaBasedStabilityTest() {
    // S3 multipart client uses more threads because, for large file uploads, it reads from different
    // positions of the file at the same time, which triggers more Java I/O threads to spin up.
    // Only one explicit constructor invocation is allowed, so the single-argument super call is gone.
    super(multipartJavaBasedClient, 250);
}

/**
 * Creates the test bucket with the client under test and blocks until the bucket is reported to exist.
 */
@BeforeAll
public static void setup() {
    // Create the bucket exactly once: a second createBucket for the same name can fail because the
    // bucket already exists, which would abort the whole test class before any test runs.
    multipartJavaBasedClient.createBucket(b -> b.bucket(BUCKET_NAME)).join();
    multipartJavaBasedClient.waiter().waitUntilBucketExists(b -> b.bucket(BUCKET_NAME)).join();
}

/**
 * Deletes the test bucket together with its contents, then closes the clients referenced by this class.
 */
@AfterAll
public static void cleanup() {
// Delete before closing: the helper issues its requests through the client passed to it.
deleteBucketAndAllContents(multipartJavaBasedClient, BUCKET_NAME);
multipartJavaBasedClient.close();
// NOTE(review): s3ApacheClient is declared outside this view — confirm this class should be the one closing it.
s3ApacheClient.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@
*/
public class StabilityTestRunner {

// Default upper bound on the peak thread count; used as the initial value of allowedPeakThreadCount below.
public static final int ALLOWED_MAX_PEAK_THREAD_COUNT = 90;
private static final Logger log = Logger.loggerFor(StabilityTestRunner.class);
// NOTE(review): presumably the highest tolerated share of failed requests (5%) — usage is outside this view; confirm.
private static final double ALLOWED_FAILURE_RATIO = 0.05;
// NOTE(review): presumably the overall time budget of a test run, in minutes — usage is outside this view; confirm.
private static final int TESTS_TIMEOUT_IN_MINUTES = 60;
// The peak thread count may differ depending on the machine the tests are running on,
// because of the internal thread pool used in AsynchronousFileChannel.
// Also, synchronous clients have their own thread pools, so this limit needs to be mutable
// so that the async and synchronous paths can both use this runner.
private int allowedPeakThreadCount = ALLOWED_MAX_PEAK_THREAD_COUNT;

private ThreadMXBean threadMXBean;
private IntFunction<CompletableFuture<?>> futureFactory;
Expand Down Expand Up @@ -343,12 +344,12 @@ private void processResult(TestResult testResult) {
}

if (testResult.peakThreadCount() > allowedPeakThreadCount) {
String errorMessage = String.format("The number of peak thread exceeds the allowed peakThread threshold %s",
allowedPeakThreadCount);
String errorMessage = String.format("The number of peak thread %s exceeds the allowed peakThread threshold %s",
testResult.peakThreadCount(), allowedPeakThreadCount);


threadDump(testResult.testName());
throw new AssertionError(errorMessage);
log.warn(() -> errorMessage);
}
}

Expand Down