Skip to content

Commit 27347dc

Browse files
committed
Fix for LoadbalanceTest concurrency issue
Signed-off-by: Rossen Stoyanchev <[email protected]>
1 parent 9baf974 commit 27347dc

File tree

2 files changed

+58
-37
lines changed

2 files changed

+58
-37
lines changed

rsocket-core/src/test/java/io/rsocket/loadbalance/LoadbalanceTest.java

Lines changed: 43 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,9 @@
66
import io.rsocket.transport.ClientTransport;
77
import io.rsocket.util.EmptyPayload;
88
import io.rsocket.util.RSocketProxy;
9-
import java.time.Duration;
10-
import java.util.Arrays;
11-
import java.util.Collections;
12-
import java.util.List;
13-
import java.util.concurrent.atomic.AtomicInteger;
149
import org.assertj.core.api.Assertions;
10+
import org.junit.jupiter.api.AfterAll;
11+
import org.junit.jupiter.api.BeforeEach;
1512
import org.junit.jupiter.api.Test;
1613
import org.mockito.Mockito;
1714
import org.reactivestreams.Publisher;
@@ -24,12 +21,26 @@
2421
import reactor.test.util.RaceTestUtils;
2522
import reactor.util.context.Context;
2623

24+
import java.time.Duration;
25+
import java.util.Arrays;
26+
import java.util.Collections;
27+
import java.util.List;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
2730
public class LoadbalanceTest {
2831

29-
@Test
30-
public void shouldDeliverAllTheRequestsWithRoundRobinStrategy() {
32+
@BeforeEach
33+
void setUp() {
3134
Hooks.onErrorDropped((__) -> {});
35+
}
36+
37+
@AfterAll
38+
static void afterAll() {
39+
Hooks.resetOnErrorDropped();
40+
}
3241

42+
@Test
43+
public void shouldDeliverAllTheRequestsWithRoundRobinStrategy() throws Exception {
3344
final AtomicInteger counter = new AtomicInteger();
3445
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
3546
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
@@ -76,21 +87,28 @@ public Mono<Void> fireAndForget(Payload payload) {
7687
});
7788

7889
Assertions.assertThat(counter.get()).isEqualTo(1000);
79-
8090
counter.set(0);
8191
}
8292
}
8393

8494
@Test
85-
public void shouldDeliverAllTheRequestsWithWightedStrategy() {
86-
Hooks.onErrorDropped((__) -> {});
87-
95+
public void shouldDeliverAllTheRequestsWithWeightedStrategy() throws InterruptedException {
8896
final AtomicInteger counter = new AtomicInteger();
89-
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
90-
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
9197

92-
Mockito.when(rSocketConnectorMock.connect(Mockito.any(ClientTransport.class)))
93-
.then(im -> Mono.just(new TestRSocket(new WeightedRSocket(counter))));
98+
final ClientTransport mockTransport1 = Mockito.mock(ClientTransport.class);
99+
final ClientTransport mockTransport2 = Mockito.mock(ClientTransport.class);
100+
101+
final LoadbalanceTarget target1 = LoadbalanceTarget.from("1", mockTransport1);
102+
final LoadbalanceTarget target2 = LoadbalanceTarget.from("2", mockTransport2);
103+
104+
final WeightedRSocket weightedRSocket1 = new WeightedRSocket(counter);
105+
final WeightedRSocket weightedRSocket2 = new WeightedRSocket(counter);
106+
107+
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
108+
Mockito.when(rSocketConnectorMock.connect(mockTransport1))
109+
.then(im -> Mono.just(new TestRSocket(weightedRSocket1)));
110+
Mockito.when(rSocketConnectorMock.connect(mockTransport2))
111+
.then(im -> Mono.just(new TestRSocket(weightedRSocket2)));
94112

95113
for (int i = 0; i < 1000; i++) {
96114
final TestPublisher<List<LoadbalanceTarget>> source = TestPublisher.create();
@@ -99,42 +117,39 @@ public void shouldDeliverAllTheRequestsWithWightedStrategy() {
99117
rSocketConnectorMock,
100118
source,
101119
WeightedLoadbalanceStrategy.builder()
102-
.weightedStatsResolver(r -> (WeightedStats) r)
120+
.weightedStatsResolver(
121+
rsocket ->
122+
((PooledRSocket) rsocket).target() == target1
123+
? weightedRSocket1
124+
: weightedRSocket2)
103125
.build());
104126

105127
RaceTestUtils.race(
106128
() -> {
107129
for (int j = 0; j < 1000; j++) {
108130
Mono.defer(() -> rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE))
109131
.retry()
110-
.subscribe();
132+
.subscribe(aVoid -> {}, Throwable::printStackTrace);
111133
}
112134
},
113135
() -> {
114136
for (int j = 0; j < 100; j++) {
115137
source.next(Collections.emptyList());
116-
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport)));
117-
source.next(
118-
Arrays.asList(
119-
LoadbalanceTarget.from("1", mockTransport),
120-
LoadbalanceTarget.from("2", mockTransport)));
121-
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport)));
122-
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport)));
138+
source.next(Collections.singletonList(target1));
139+
source.next(Arrays.asList(target1, target2)).next(Collections.singletonList(target1));
140+
source.next(Collections.singletonList(target2));
123141
source.next(Collections.emptyList());
124-
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport)));
142+
source.next(Collections.singletonList(target2));
125143
}
126144
});
127145

128146
Assertions.assertThat(counter.get()).isEqualTo(1000);
129-
130147
counter.set(0);
131148
}
132149
}
133150

134151
@Test
135152
public void ensureRSocketIsCleanedFromThePoolIfSourceRSocketIsDisposed() {
136-
Hooks.onErrorDropped((__) -> {});
137-
138153
final AtomicInteger counter = new AtomicInteger();
139154
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
140155
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
@@ -179,8 +194,6 @@ public Mono<Void> fireAndForget(Payload payload) {
179194

180195
@Test
181196
public void ensureContextIsPropagatedCorrectlyForRequestChannel() {
182-
Hooks.onErrorDropped((__) -> {});
183-
184197
final AtomicInteger counter = new AtomicInteger();
185198
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
186199
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);

rsocket-core/src/test/java/io/rsocket/loadbalance/RoundRobinLoadbalanceStrategyTest.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import java.util.concurrent.atomic.AtomicInteger;
1212
import org.assertj.core.api.Assertions;
1313
import org.assertj.core.data.Offset;
14+
import org.junit.jupiter.api.AfterAll;
15+
import org.junit.jupiter.api.BeforeEach;
1416
import org.junit.jupiter.api.Test;
1517
import org.mockito.Mockito;
1618
import reactor.core.publisher.Hooks;
@@ -19,10 +21,18 @@
1921

2022
public class RoundRobinLoadbalanceStrategyTest {
2123

22-
@Test
23-
public void shouldDeliverValuesProportionally() {
24+
@BeforeEach
25+
void setUp() {
2426
Hooks.onErrorDropped((__) -> {});
27+
}
28+
29+
@AfterAll
30+
static void afterAll() {
31+
Hooks.resetOnErrorDropped();
32+
}
2533

34+
@Test
35+
public void shouldDeliverValuesProportionally() {
2636
final AtomicInteger counter1 = new AtomicInteger();
2737
final AtomicInteger counter2 = new AtomicInteger();
2838
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
@@ -71,8 +81,6 @@ public Mono<Void> fireAndForget(Payload payload) {
7181

7282
@Test
7383
public void shouldDeliverValuesToNewlyConnectedSockets() {
74-
Hooks.onErrorDropped((__) -> {});
75-
7684
final AtomicInteger counter1 = new AtomicInteger();
7785
final AtomicInteger counter2 = new AtomicInteger();
7886
final ClientTransport mockTransport1 = Mockito.mock(ClientTransport.class);
@@ -104,7 +112,7 @@ public Mono<Void> fireAndForget(Payload payload) {
104112
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
105113
}
106114

107-
source.next(Arrays.asList(LoadbalanceTarget.from("1", mockTransport1)));
115+
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport1)));
108116

109117
Assertions.assertThat(counter1.get()).isCloseTo(1000, Offset.offset(1));
110118

@@ -114,7 +122,7 @@ public Mono<Void> fireAndForget(Payload payload) {
114122
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
115123
}
116124

117-
source.next(Arrays.asList(LoadbalanceTarget.from("1", mockTransport1)));
125+
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport1)));
118126

119127
Assertions.assertThat(counter1.get()).isCloseTo(2000, Offset.offset(1));
120128

@@ -130,7 +138,7 @@ public Mono<Void> fireAndForget(Payload payload) {
130138
Assertions.assertThat(counter1.get()).isCloseTo(2500, Offset.offset(1));
131139
Assertions.assertThat(counter2.get()).isCloseTo(500, Offset.offset(1));
132140

133-
source.next(Arrays.asList(LoadbalanceTarget.from("2", mockTransport1)));
141+
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport1)));
134142

135143
for (int j = 0; j < 1000; j++) {
136144
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();

0 commit comments

Comments
 (0)