Skip to content

Commit d77df7b

Browse files
authored
fixes LoadbalanceTest issues (#983)
Signed-off-by: Rossen Stoyanchev <[email protected]>
1 parent 9baf974 commit d77df7b

File tree

2 files changed

+52
-32
lines changed

2 files changed

+52
-32
lines changed

rsocket-core/src/test/java/io/rsocket/loadbalance/LoadbalanceTest.java

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import java.util.List;
1313
import java.util.concurrent.atomic.AtomicInteger;
1414
import org.assertj.core.api.Assertions;
15+
import org.junit.jupiter.api.AfterAll;
16+
import org.junit.jupiter.api.BeforeEach;
1517
import org.junit.jupiter.api.Test;
1618
import org.mockito.Mockito;
1719
import org.reactivestreams.Publisher;
@@ -26,10 +28,18 @@
2628

2729
public class LoadbalanceTest {
2830

29-
@Test
30-
public void shouldDeliverAllTheRequestsWithRoundRobinStrategy() {
31+
@BeforeEach
32+
void setUp() {
3133
Hooks.onErrorDropped((__) -> {});
34+
}
35+
36+
@AfterAll
37+
static void afterAll() {
38+
Hooks.resetOnErrorDropped();
39+
}
3240

41+
@Test
42+
public void shouldDeliverAllTheRequestsWithRoundRobinStrategy() throws Exception {
3343
final AtomicInteger counter = new AtomicInteger();
3444
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
3545
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
@@ -76,21 +86,28 @@ public Mono<Void> fireAndForget(Payload payload) {
7686
});
7787

7888
Assertions.assertThat(counter.get()).isEqualTo(1000);
79-
8089
counter.set(0);
8190
}
8291
}
8392

8493
@Test
85-
public void shouldDeliverAllTheRequestsWithWightedStrategy() {
86-
Hooks.onErrorDropped((__) -> {});
87-
94+
public void shouldDeliverAllTheRequestsWithWeightedStrategy() throws InterruptedException {
8895
final AtomicInteger counter = new AtomicInteger();
89-
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
90-
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
9196

92-
Mockito.when(rSocketConnectorMock.connect(Mockito.any(ClientTransport.class)))
93-
.then(im -> Mono.just(new TestRSocket(new WeightedRSocket(counter))));
97+
final ClientTransport mockTransport1 = Mockito.mock(ClientTransport.class);
98+
final ClientTransport mockTransport2 = Mockito.mock(ClientTransport.class);
99+
100+
final LoadbalanceTarget target1 = LoadbalanceTarget.from("1", mockTransport1);
101+
final LoadbalanceTarget target2 = LoadbalanceTarget.from("2", mockTransport2);
102+
103+
final WeightedRSocket weightedRSocket1 = new WeightedRSocket(counter);
104+
final WeightedRSocket weightedRSocket2 = new WeightedRSocket(counter);
105+
106+
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
107+
Mockito.when(rSocketConnectorMock.connect(mockTransport1))
108+
.then(im -> Mono.just(new TestRSocket(weightedRSocket1)));
109+
Mockito.when(rSocketConnectorMock.connect(mockTransport2))
110+
.then(im -> Mono.just(new TestRSocket(weightedRSocket2)));
94111

95112
for (int i = 0; i < 1000; i++) {
96113
final TestPublisher<List<LoadbalanceTarget>> source = TestPublisher.create();
@@ -99,42 +116,39 @@ public void shouldDeliverAllTheRequestsWithWightedStrategy() {
99116
rSocketConnectorMock,
100117
source,
101118
WeightedLoadbalanceStrategy.builder()
102-
.weightedStatsResolver(r -> (WeightedStats) r)
119+
.weightedStatsResolver(
120+
rsocket ->
121+
((PooledRSocket) rsocket).target() == target1
122+
? weightedRSocket1
123+
: weightedRSocket2)
103124
.build());
104125

105126
RaceTestUtils.race(
106127
() -> {
107128
for (int j = 0; j < 1000; j++) {
108129
Mono.defer(() -> rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE))
109130
.retry()
110-
.subscribe();
131+
.subscribe(aVoid -> {}, Throwable::printStackTrace);
111132
}
112133
},
113134
() -> {
114135
for (int j = 0; j < 100; j++) {
115136
source.next(Collections.emptyList());
116-
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport)));
117-
source.next(
118-
Arrays.asList(
119-
LoadbalanceTarget.from("1", mockTransport),
120-
LoadbalanceTarget.from("2", mockTransport)));
121-
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport)));
122-
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport)));
137+
source.next(Collections.singletonList(target1));
138+
source.next(Arrays.asList(target1, target2)).next(Collections.singletonList(target1));
139+
source.next(Collections.singletonList(target2));
123140
source.next(Collections.emptyList());
124-
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport)));
141+
source.next(Collections.singletonList(target2));
125142
}
126143
});
127144

128145
Assertions.assertThat(counter.get()).isEqualTo(1000);
129-
130146
counter.set(0);
131147
}
132148
}
133149

134150
@Test
135151
public void ensureRSocketIsCleanedFromThePoolIfSourceRSocketIsDisposed() {
136-
Hooks.onErrorDropped((__) -> {});
137-
138152
final AtomicInteger counter = new AtomicInteger();
139153
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
140154
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
@@ -179,8 +193,6 @@ public Mono<Void> fireAndForget(Payload payload) {
179193

180194
@Test
181195
public void ensureContextIsPropagatedCorrectlyForRequestChannel() {
182-
Hooks.onErrorDropped((__) -> {});
183-
184196
final AtomicInteger counter = new AtomicInteger();
185197
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
186198
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);

rsocket-core/src/test/java/io/rsocket/loadbalance/RoundRobinLoadbalanceStrategyTest.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import java.util.concurrent.atomic.AtomicInteger;
1212
import org.assertj.core.api.Assertions;
1313
import org.assertj.core.data.Offset;
14+
import org.junit.jupiter.api.AfterAll;
15+
import org.junit.jupiter.api.BeforeEach;
1416
import org.junit.jupiter.api.Test;
1517
import org.mockito.Mockito;
1618
import reactor.core.publisher.Hooks;
@@ -19,10 +21,18 @@
1921

2022
public class RoundRobinLoadbalanceStrategyTest {
2123

22-
@Test
23-
public void shouldDeliverValuesProportionally() {
24+
@BeforeEach
25+
void setUp() {
2426
Hooks.onErrorDropped((__) -> {});
27+
}
28+
29+
@AfterAll
30+
static void afterAll() {
31+
Hooks.resetOnErrorDropped();
32+
}
2533

34+
@Test
35+
public void shouldDeliverValuesProportionally() {
2636
final AtomicInteger counter1 = new AtomicInteger();
2737
final AtomicInteger counter2 = new AtomicInteger();
2838
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
@@ -71,8 +81,6 @@ public Mono<Void> fireAndForget(Payload payload) {
7181

7282
@Test
7383
public void shouldDeliverValuesToNewlyConnectedSockets() {
74-
Hooks.onErrorDropped((__) -> {});
75-
7684
final AtomicInteger counter1 = new AtomicInteger();
7785
final AtomicInteger counter2 = new AtomicInteger();
7886
final ClientTransport mockTransport1 = Mockito.mock(ClientTransport.class);
@@ -104,7 +112,7 @@ public Mono<Void> fireAndForget(Payload payload) {
104112
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
105113
}
106114

107-
source.next(Arrays.asList(LoadbalanceTarget.from("1", mockTransport1)));
115+
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport1)));
108116

109117
Assertions.assertThat(counter1.get()).isCloseTo(1000, Offset.offset(1));
110118

@@ -114,7 +122,7 @@ public Mono<Void> fireAndForget(Payload payload) {
114122
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
115123
}
116124

117-
source.next(Arrays.asList(LoadbalanceTarget.from("1", mockTransport1)));
125+
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport1)));
118126

119127
Assertions.assertThat(counter1.get()).isCloseTo(2000, Offset.offset(1));
120128

@@ -130,7 +138,7 @@ public Mono<Void> fireAndForget(Payload payload) {
130138
Assertions.assertThat(counter1.get()).isCloseTo(2500, Offset.offset(1));
131139
Assertions.assertThat(counter2.get()).isCloseTo(500, Offset.offset(1));
132140

133-
source.next(Arrays.asList(LoadbalanceTarget.from("2", mockTransport1)));
141+
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport1)));
134142

135143
for (int j = 0; j < 1000; j++) {
136144
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();

0 commit comments

Comments
 (0)