Skip to content

Commit 417a37f

Browse files
committed
Fix for LoadbalanceTest concurrency issue
Signed-off-by: Rossen Stoyanchev <[email protected]>
1 parent 9baf974 commit 417a37f

File tree

2 files changed

+25
-17
lines changed

2 files changed

+25
-17
lines changed

rsocket-core/src/test/java/io/rsocket/loadbalance/LoadbalanceTest.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,11 @@
1010
import java.util.Arrays;
1111
import java.util.Collections;
1212
import java.util.List;
13+
import java.util.concurrent.CountDownLatch;
14+
import java.util.concurrent.TimeUnit;
1315
import java.util.concurrent.atomic.AtomicInteger;
1416
import org.assertj.core.api.Assertions;
17+
import org.junit.jupiter.api.BeforeEach;
1518
import org.junit.jupiter.api.Test;
1619
import org.mockito.Mockito;
1720
import org.reactivestreams.Publisher;
@@ -26,10 +29,13 @@
2629

2730
public class LoadbalanceTest {
2831

29-
@Test
30-
public void shouldDeliverAllTheRequestsWithRoundRobinStrategy() {
32+
@BeforeEach
33+
void setUp() {
3134
Hooks.onErrorDropped((__) -> {});
35+
}
3236

37+
@Test
38+
public void shouldDeliverAllTheRequestsWithRoundRobinStrategy() throws Exception {
3339
final AtomicInteger counter = new AtomicInteger();
3440
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
3541
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
@@ -51,12 +57,14 @@ public Mono<Void> fireAndForget(Payload payload) {
5157
final TestPublisher<List<LoadbalanceTarget>> source = TestPublisher.create();
5258
final RSocketPool rSocketPool =
5359
new RSocketPool(rSocketConnectorMock, source, new RoundRobinLoadbalanceStrategy());
60+
final CountDownLatch requestLatch = new CountDownLatch(1000);
5461

5562
RaceTestUtils.race(
5663
() -> {
5764
for (int j = 0; j < 1000; j++) {
5865
Mono.defer(() -> rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE))
5966
.retry()
67+
.doFinally(s -> requestLatch.countDown())
6068
.subscribe();
6169
}
6270
},
@@ -75,16 +83,15 @@ public Mono<Void> fireAndForget(Payload payload) {
7583
}
7684
});
7785

86+
requestLatch.await(5, TimeUnit.SECONDS);
7887
Assertions.assertThat(counter.get()).isEqualTo(1000);
7988

8089
counter.set(0);
8190
}
8291
}
8392

8493
@Test
85-
public void shouldDeliverAllTheRequestsWithWightedStrategy() {
86-
Hooks.onErrorDropped((__) -> {});
87-
94+
public void shouldDeliverAllTheRequestsWithWeightedStrategy() throws InterruptedException {
8895
final AtomicInteger counter = new AtomicInteger();
8996
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
9097
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
@@ -101,13 +108,15 @@ public void shouldDeliverAllTheRequestsWithWightedStrategy() {
101108
WeightedLoadbalanceStrategy.builder()
102109
.weightedStatsResolver(r -> (WeightedStats) r)
103110
.build());
111+
final CountDownLatch requestLatch = new CountDownLatch(1000);
104112

105113
RaceTestUtils.race(
106114
() -> {
107115
for (int j = 0; j < 1000; j++) {
108116
Mono.defer(() -> rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE))
109117
.retry()
110-
.subscribe();
118+
.doFinally(s -> requestLatch.countDown())
119+
.subscribe(aVoid -> {}, Throwable::printStackTrace);
111120
}
112121
},
113122
() -> {
@@ -125,6 +134,7 @@ public void shouldDeliverAllTheRequestsWithWightedStrategy() {
125134
}
126135
});
127136

137+
requestLatch.await(5, TimeUnit.SECONDS);
128138
Assertions.assertThat(counter.get()).isEqualTo(1000);
129139

130140
counter.set(0);
@@ -133,8 +143,6 @@ public void shouldDeliverAllTheRequestsWithWightedStrategy() {
133143

134144
@Test
135145
public void ensureRSocketIsCleanedFromThePoolIfSourceRSocketIsDisposed() {
136-
Hooks.onErrorDropped((__) -> {});
137-
138146
final AtomicInteger counter = new AtomicInteger();
139147
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
140148
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
@@ -179,8 +187,6 @@ public Mono<Void> fireAndForget(Payload payload) {
179187

180188
@Test
181189
public void ensureContextIsPropagatedCorrectlyForRequestChannel() {
182-
Hooks.onErrorDropped((__) -> {});
183-
184190
final AtomicInteger counter = new AtomicInteger();
185191
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
186192
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);

rsocket-core/src/test/java/io/rsocket/loadbalance/RoundRobinLoadbalanceStrategyTest.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.concurrent.atomic.AtomicInteger;
1212
import org.assertj.core.api.Assertions;
1313
import org.assertj.core.data.Offset;
14+
import org.junit.jupiter.api.BeforeEach;
1415
import org.junit.jupiter.api.Test;
1516
import org.mockito.Mockito;
1617
import reactor.core.publisher.Hooks;
@@ -19,10 +20,13 @@
1920

2021
public class RoundRobinLoadbalanceStrategyTest {
2122

22-
@Test
23-
public void shouldDeliverValuesProportionally() {
23+
@BeforeEach
24+
void setUp() {
2425
Hooks.onErrorDropped((__) -> {});
26+
}
2527

28+
@Test
29+
public void shouldDeliverValuesProportionally() {
2630
final AtomicInteger counter1 = new AtomicInteger();
2731
final AtomicInteger counter2 = new AtomicInteger();
2832
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
@@ -71,8 +75,6 @@ public Mono<Void> fireAndForget(Payload payload) {
7175

7276
@Test
7377
public void shouldDeliverValuesToNewlyConnectedSockets() {
74-
Hooks.onErrorDropped((__) -> {});
75-
7678
final AtomicInteger counter1 = new AtomicInteger();
7779
final AtomicInteger counter2 = new AtomicInteger();
7880
final ClientTransport mockTransport1 = Mockito.mock(ClientTransport.class);
@@ -104,7 +106,7 @@ public Mono<Void> fireAndForget(Payload payload) {
104106
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
105107
}
106108

107-
source.next(Arrays.asList(LoadbalanceTarget.from("1", mockTransport1)));
109+
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport1)));
108110

109111
Assertions.assertThat(counter1.get()).isCloseTo(1000, Offset.offset(1));
110112

@@ -114,7 +116,7 @@ public Mono<Void> fireAndForget(Payload payload) {
114116
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
115117
}
116118

117-
source.next(Arrays.asList(LoadbalanceTarget.from("1", mockTransport1)));
119+
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport1)));
118120

119121
Assertions.assertThat(counter1.get()).isCloseTo(2000, Offset.offset(1));
120122

@@ -130,7 +132,7 @@ public Mono<Void> fireAndForget(Payload payload) {
130132
Assertions.assertThat(counter1.get()).isCloseTo(2500, Offset.offset(1));
131133
Assertions.assertThat(counter2.get()).isCloseTo(500, Offset.offset(1));
132134

133-
source.next(Arrays.asList(LoadbalanceTarget.from("2", mockTransport1)));
135+
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport1)));
134136

135137
for (int j = 0; j < 1000; j++) {
136138
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();

0 commit comments

Comments
 (0)