Skip to content

Commit 1e58c25

Browse files
committed
Add backoff with jitter to Kinesis stabiltiy test
1 parent 157f16c commit 1e58c25

File tree

1 file changed

+8
-6
lines changed

1 file changed

+8
-6
lines changed

test/stability-tests/src/it/java/software/amazon/awssdk/stability/tests/kinesis/KinesisStabilityTest.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.commons.lang3.RandomUtils;
3232
import org.junit.jupiter.api.AfterEach;
3333
import org.junit.jupiter.api.BeforeEach;
34+
import org.junit.jupiter.api.RepeatedTest;
3435
import software.amazon.awssdk.core.SdkBytes;
3536
import software.amazon.awssdk.core.async.SdkPublisher;
3637
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
@@ -115,16 +116,17 @@ public void tearDown() {
115116
asyncClient.close();
116117
}
117118

119+
@RepeatedTest(30)
118120
@RetryableTest(maxRetries = 3, retryableException = StabilityTestsRetryableException.class)
119-
public void putRecords_subscribeToShard() {
121+
public void putRecords_subscribeToShard() throws InterruptedException {
120122
putRecords();
121123
subscribeToShard();
122124
}
123125

124126
/**
125127
* We only have one run of subscribeToShard tests because it takes 5 minutes.
126128
*/
127-
private void subscribeToShard() {
129+
private void subscribeToShard() throws InterruptedException {
128130
log.info(() -> "starting to test subscribeToShard to stream: " + streamName);
129131
List<CompletableFuture<?>> completableFutures = generateSubscribeToShardFutures();
130132
StabilityTestRunner.newRunner()
@@ -170,12 +172,15 @@ private void putRecords() {
170172
* Generate request per consumer/shard combination
171173
* @return a lit of completablefutures
172174
*/
173-
private List<CompletableFuture<?>> generateSubscribeToShardFutures() {
175+
private List<CompletableFuture<?>> generateSubscribeToShardFutures() throws InterruptedException {
174176
List<CompletableFuture<?>> completableFutures = new ArrayList<>();
177+
int baseDelay = 150;
178+
int jitterRange = 150;
175179
for (int i = 0; i < CONSUMER_COUNT; i++) {
176180
final int consumerIndex = i;
177181
for (int j = 0; j < SHARD_COUNT; j++) {
178182
final int shardIndex = j;
183+
Thread.sleep(baseDelay + (int)(Math.random() * jitterRange));
179184
TestSubscribeToShardResponseHandler responseHandler =
180185
new TestSubscribeToShardResponseHandler(consumerIndex, shardIndex);
181186
CompletableFuture<Void> completableFuture =
@@ -184,9 +189,6 @@ private List<CompletableFuture<?>> generateSubscribeToShardFutures() {
184189
.startingPosition(s -> s.type(ShardIteratorType.TRIM_HORIZON)),
185190
responseHandler)
186191
.thenAccept(b -> {
187-
// Only verify data if all events have been received and the received data is not empty.
188-
// It is possible the received data is empty because there is no record at the position
189-
// event with TRIM_HORIZON.
190192
if (responseHandler.allEventsReceived && !responseHandler.receivedData.isEmpty()) {
191193
assertThat(producedData).as(responseHandler.id + " has not received all events"
192194
+ ".").containsSequence(responseHandler.receivedData);

0 commit comments

Comments
 (0)