@@ -116,15 +116,15 @@ public void tearDown() {
116
116
}
117
117
118
118
@ RetryableTest (maxRetries = 3 , retryableException = StabilityTestsRetryableException .class )
119
- public void putRecords_subscribeToShard () {
119
+ public void putRecords_subscribeToShard () throws InterruptedException {
120
120
putRecords ();
121
121
subscribeToShard ();
122
122
}
123
123
124
124
/**
125
125
* We only have one run of subscribeToShard tests because it takes 5 minutes.
126
126
*/
127
- private void subscribeToShard () {
127
+ private void subscribeToShard () throws InterruptedException {
128
128
log .info (() -> "starting to test subscribeToShard to stream: " + streamName );
129
129
List <CompletableFuture <?>> completableFutures = generateSubscribeToShardFutures ();
130
130
StabilityTestRunner .newRunner ()
@@ -170,12 +170,15 @@ private void putRecords() {
170
170
* Generate request per consumer/shard combination
171
171
* @return a lit of completablefutures
172
172
*/
173
- private List <CompletableFuture <?>> generateSubscribeToShardFutures () {
173
+ private List <CompletableFuture <?>> generateSubscribeToShardFutures () throws InterruptedException {
174
174
List <CompletableFuture <?>> completableFutures = new ArrayList <>();
175
+ int baseDelay = 150 ;
176
+ int jitterRange = 150 ;
175
177
for (int i = 0 ; i < CONSUMER_COUNT ; i ++) {
176
178
final int consumerIndex = i ;
177
179
for (int j = 0 ; j < SHARD_COUNT ; j ++) {
178
180
final int shardIndex = j ;
181
+ Thread .sleep (baseDelay + (int )(Math .random () * jitterRange ));
179
182
TestSubscribeToShardResponseHandler responseHandler =
180
183
new TestSubscribeToShardResponseHandler (consumerIndex , shardIndex );
181
184
CompletableFuture <Void > completableFuture =
0 commit comments