Skip to content

fixes LoadbalanceTest issues #983

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
Expand All @@ -26,10 +28,18 @@

public class LoadbalanceTest {

@Test
public void shouldDeliverAllTheRequestsWithRoundRobinStrategy() {
@BeforeEach
void setUp() {
Hooks.onErrorDropped((__) -> {});
}

@AfterAll
static void afterAll() {
Hooks.resetOnErrorDropped();
}

@Test
public void shouldDeliverAllTheRequestsWithRoundRobinStrategy() throws Exception {
final AtomicInteger counter = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
Expand Down Expand Up @@ -76,21 +86,28 @@ public Mono<Void> fireAndForget(Payload payload) {
});

Assertions.assertThat(counter.get()).isEqualTo(1000);

counter.set(0);
}
}

@Test
public void shouldDeliverAllTheRequestsWithWightedStrategy() {
Hooks.onErrorDropped((__) -> {});

public void shouldDeliverAllTheRequestsWithWeightedStrategy() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);

Mockito.when(rSocketConnectorMock.connect(Mockito.any(ClientTransport.class)))
.then(im -> Mono.just(new TestRSocket(new WeightedRSocket(counter))));
final ClientTransport mockTransport1 = Mockito.mock(ClientTransport.class);
final ClientTransport mockTransport2 = Mockito.mock(ClientTransport.class);

final LoadbalanceTarget target1 = LoadbalanceTarget.from("1", mockTransport1);
final LoadbalanceTarget target2 = LoadbalanceTarget.from("2", mockTransport2);

final WeightedRSocket weightedRSocket1 = new WeightedRSocket(counter);
final WeightedRSocket weightedRSocket2 = new WeightedRSocket(counter);

final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
Mockito.when(rSocketConnectorMock.connect(mockTransport1))
.then(im -> Mono.just(new TestRSocket(weightedRSocket1)));
Mockito.when(rSocketConnectorMock.connect(mockTransport2))
.then(im -> Mono.just(new TestRSocket(weightedRSocket2)));

for (int i = 0; i < 1000; i++) {
final TestPublisher<List<LoadbalanceTarget>> source = TestPublisher.create();
Expand All @@ -99,42 +116,39 @@ public void shouldDeliverAllTheRequestsWithWightedStrategy() {
rSocketConnectorMock,
source,
WeightedLoadbalanceStrategy.builder()
.weightedStatsResolver(r -> (WeightedStats) r)
.weightedStatsResolver(
rsocket ->
((PooledRSocket) rsocket).target() == target1
? weightedRSocket1
: weightedRSocket2)
.build());

RaceTestUtils.race(
() -> {
for (int j = 0; j < 1000; j++) {
Mono.defer(() -> rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE))
.retry()
.subscribe();
.subscribe(aVoid -> {}, Throwable::printStackTrace);
}
},
() -> {
for (int j = 0; j < 100; j++) {
source.next(Collections.emptyList());
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport)));
source.next(
Arrays.asList(
LoadbalanceTarget.from("1", mockTransport),
LoadbalanceTarget.from("2", mockTransport)));
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport)));
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport)));
source.next(Collections.singletonList(target1));
source.next(Arrays.asList(target1, target2)).next(Collections.singletonList(target1));
source.next(Collections.singletonList(target2));
source.next(Collections.emptyList());
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport)));
source.next(Collections.singletonList(target2));
}
});

Assertions.assertThat(counter.get()).isEqualTo(1000);

counter.set(0);
}
}

@Test
public void ensureRSocketIsCleanedFromThePoolIfSourceRSocketIsDisposed() {
Hooks.onErrorDropped((__) -> {});

final AtomicInteger counter = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
Expand Down Expand Up @@ -179,8 +193,6 @@ public Mono<Void> fireAndForget(Payload payload) {

@Test
public void ensureContextIsPropagatedCorrectlyForRequestChannel() {
Hooks.onErrorDropped((__) -> {});

final AtomicInteger counter = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.Offset;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import reactor.core.publisher.Hooks;
Expand All @@ -19,10 +21,18 @@

public class RoundRobinLoadbalanceStrategyTest {

@Test
public void shouldDeliverValuesProportionally() {
@BeforeEach
void setUp() {
Hooks.onErrorDropped((__) -> {});
}

@AfterAll
static void afterAll() {
Hooks.resetOnErrorDropped();
}

@Test
public void shouldDeliverValuesProportionally() {
final AtomicInteger counter1 = new AtomicInteger();
final AtomicInteger counter2 = new AtomicInteger();
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
Expand Down Expand Up @@ -71,8 +81,6 @@ public Mono<Void> fireAndForget(Payload payload) {

@Test
public void shouldDeliverValuesToNewlyConnectedSockets() {
Hooks.onErrorDropped((__) -> {});

final AtomicInteger counter1 = new AtomicInteger();
final AtomicInteger counter2 = new AtomicInteger();
final ClientTransport mockTransport1 = Mockito.mock(ClientTransport.class);
Expand Down Expand Up @@ -104,7 +112,7 @@ public Mono<Void> fireAndForget(Payload payload) {
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
}

source.next(Arrays.asList(LoadbalanceTarget.from("1", mockTransport1)));
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport1)));

Assertions.assertThat(counter1.get()).isCloseTo(1000, Offset.offset(1));

Expand All @@ -114,7 +122,7 @@ public Mono<Void> fireAndForget(Payload payload) {
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
}

source.next(Arrays.asList(LoadbalanceTarget.from("1", mockTransport1)));
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport1)));

Assertions.assertThat(counter1.get()).isCloseTo(2000, Offset.offset(1));

Expand All @@ -130,7 +138,7 @@ public Mono<Void> fireAndForget(Payload payload) {
Assertions.assertThat(counter1.get()).isCloseTo(2500, Offset.offset(1));
Assertions.assertThat(counter2.get()).isCloseTo(500, Offset.offset(1));

source.next(Arrays.asList(LoadbalanceTarget.from("2", mockTransport1)));
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport1)));

for (int j = 0; j < 1000; j++) {
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
Expand Down