31
31
import org .apache .commons .lang3 .RandomUtils ;
32
32
import org .junit .jupiter .api .AfterEach ;
33
33
import org .junit .jupiter .api .BeforeEach ;
34
+ import org .junit .jupiter .api .RepeatedTest ;
34
35
import software .amazon .awssdk .core .SdkBytes ;
35
36
import software .amazon .awssdk .core .async .SdkPublisher ;
36
37
import software .amazon .awssdk .http .nio .netty .NettyNioAsyncHttpClient ;
@@ -115,16 +116,17 @@ public void tearDown() {
115
116
asyncClient .close ();
116
117
}
117
118
119
+ @ RepeatedTest (30 )
118
120
@ RetryableTest (maxRetries = 3 , retryableException = StabilityTestsRetryableException .class )
119
- public void putRecords_subscribeToShard () {
121
+ public void putRecords_subscribeToShard () throws InterruptedException {
120
122
putRecords ();
121
123
subscribeToShard ();
122
124
}
123
125
124
126
/**
125
127
* We only have one run of subscribeToShard tests because it takes 5 minutes.
126
128
*/
127
- private void subscribeToShard () {
129
+ private void subscribeToShard () throws InterruptedException {
128
130
log .info (() -> "starting to test subscribeToShard to stream: " + streamName );
129
131
List <CompletableFuture <?>> completableFutures = generateSubscribeToShardFutures ();
130
132
StabilityTestRunner .newRunner ()
@@ -170,12 +172,15 @@ private void putRecords() {
170
172
* Generate request per consumer/shard combination
171
173
* @return a lit of completablefutures
172
174
*/
173
- private List <CompletableFuture <?>> generateSubscribeToShardFutures () {
175
+ private List <CompletableFuture <?>> generateSubscribeToShardFutures () throws InterruptedException {
174
176
List <CompletableFuture <?>> completableFutures = new ArrayList <>();
177
+ int baseDelay = 150 ;
178
+ int jitterRange = 150 ;
175
179
for (int i = 0 ; i < CONSUMER_COUNT ; i ++) {
176
180
final int consumerIndex = i ;
177
181
for (int j = 0 ; j < SHARD_COUNT ; j ++) {
178
182
final int shardIndex = j ;
183
+ Thread .sleep (baseDelay + (int )(Math .random () * jitterRange ));
179
184
TestSubscribeToShardResponseHandler responseHandler =
180
185
new TestSubscribeToShardResponseHandler (consumerIndex , shardIndex );
181
186
CompletableFuture <Void > completableFuture =
@@ -184,9 +189,6 @@ private List<CompletableFuture<?>> generateSubscribeToShardFutures() {
184
189
.startingPosition (s -> s .type (ShardIteratorType .TRIM_HORIZON )),
185
190
responseHandler )
186
191
.thenAccept (b -> {
187
- // Only verify data if all events have been received and the received data is not empty.
188
- // It is possible the received data is empty because there is no record at the position
189
- // event with TRIM_HORIZON.
190
192
if (responseHandler .allEventsReceived && !responseHandler .receivedData .isEmpty ()) {
191
193
assertThat (producedData ).as (responseHandler .id + " has not received all events"
192
194
+ "." ).containsSequence (responseHandler .receivedData );
0 commit comments