Skip to content

Commit 821e42d

Browse files
authored
Merge pull request #409 from rabbitmq/fix-coordinators-with-load-balancer
Fix coordinator manager creation in load balancer mode
2 parents b41dd1b + 455c229 commit 821e42d

File tree

4 files changed

+119
-3
lines changed

4 files changed

+119
-3
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -635,8 +635,10 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
635635
};
636636
ShutdownListener shutdownListener =
637637
shutdownContext -> {
638-
this.closed.set(true);
639-
managers.remove(this);
638+
if (clientInitializedInManager.get()) {
639+
this.closed.set(true);
640+
managers.remove(this);
641+
}
640642
if (shutdownContext.isShutdownUnexpected()) {
641643
LOGGER.debug(
642644
"Unexpected shutdown notification on subscription connection {}, scheduling consumers re-assignment",
@@ -906,9 +908,15 @@ synchronized void add(
906908
OffsetSpecification offsetSpecification,
907909
boolean isInitialSubscription) {
908910
if (this.isFull()) {
911+
LOGGER.debug(
912+
"Cannot add subscription tracker for stream '{}', manager is full",
913+
subscriptionTracker.stream);
909914
throw new IllegalStateException("Cannot add subscription tracker, the manager is full");
910915
}
911916
if (this.isClosed()) {
917+
LOGGER.debug(
918+
"Cannot add subscription tracker for stream '{}', manager is closed",
919+
subscriptionTracker.stream);
912920
throw new IllegalStateException("Cannot add subscription tracker, the manager is closed");
913921
}
914922

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,10 @@ private ClientProducersManager(
599599
};
600600
ShutdownListener shutdownListener =
601601
shutdownContext -> {
602-
managers.remove(this);
602+
if (clientInitializedInManager.get()) {
603+
this.closed.set(true);
604+
managers.remove(this);
605+
}
603606
if (shutdownContext.isShutdownUnexpected()) {
604607
LOGGER.debug(
605608
"Recovering {} producer(s) after unexpected connection termination",

src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.mockito.Captor;
6868
import org.mockito.Mock;
6969
import org.mockito.MockitoAnnotations;
70+
import org.mockito.stubbing.Answer;
7071

7172
public class ConsumersCoordinatorTest {
7273

@@ -1718,6 +1719,73 @@ void subscribeUnsubscribeInDifferentThreadsShouldNotDeadlock() {
17181719
}
17191720
}
17201721

1722+
@Test
1723+
void consumerShouldBeCreatedProperlyIfManagerClientIsRetried() {
1724+
scheduledExecutorService = createScheduledExecutorService();
1725+
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
1726+
Duration retryDelay = Duration.ofMillis(100);
1727+
when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
1728+
when(consumer.isOpen()).thenReturn(true);
1729+
when(locator.metadata("stream")).thenReturn(metadata(null, replica()));
1730+
1731+
when(clientFactory.client(any()))
1732+
.thenAnswer(
1733+
(Answer<Client>)
1734+
invocationOnMock -> {
1735+
// simulates the client is not the good one (e.g. because of load balancer),
1736+
// so the connection is closed (hence the call to the shutdown listener)
1737+
shutdownListener.handle(
1738+
new Client.ShutdownContext(
1739+
Client.ShutdownContext.ShutdownReason.CLIENT_CLOSE));
1740+
// and a client is returned
1741+
return client;
1742+
});
1743+
1744+
when(client.subscribe(
1745+
subscriptionIdCaptor.capture(),
1746+
anyString(),
1747+
any(OffsetSpecification.class),
1748+
anyInt(),
1749+
anyMap()))
1750+
.thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));
1751+
1752+
AtomicInteger messageHandlerCalls = new AtomicInteger();
1753+
Runnable closingRunnable =
1754+
coordinator.subscribe(
1755+
consumer,
1756+
"stream",
1757+
OffsetSpecification.first(),
1758+
null,
1759+
NO_OP_SUBSCRIPTION_LISTENER,
1760+
NO_OP_TRACKING_CLOSING_CALLBACK,
1761+
(offset, message) -> messageHandlerCalls.incrementAndGet(),
1762+
Collections.emptyMap(),
1763+
flowStrategy());
1764+
verify(clientFactory, times(1)).client(any());
1765+
verify(client, times(1))
1766+
.subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
1767+
1768+
assertThat(messageHandlerCalls.get()).isEqualTo(0);
1769+
messageListener.handle(
1770+
subscriptionIdCaptor.getAllValues().get(0),
1771+
1,
1772+
0,
1773+
0,
1774+
null,
1775+
new WrapperMessageBuilder().build());
1776+
assertThat(messageHandlerCalls.get()).isEqualTo(1);
1777+
1778+
when(client.unsubscribe(subscriptionIdCaptor.getValue()))
1779+
.thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));
1780+
1781+
closingRunnable.run();
1782+
verify(client, times(1)).unsubscribe(subscriptionIdCaptor.getValue());
1783+
1784+
messageListener.handle(
1785+
subscriptionIdCaptor.getValue(), 0, 0, 0, null, new WrapperMessageBuilder().build());
1786+
assertThat(messageHandlerCalls.get()).isEqualTo(1);
1787+
}
1788+
17211789
Client.Broker leader() {
17221790
return new Client.Broker("leader", -1);
17231791
}

src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed;
1617
import static com.rabbitmq.stream.impl.TestUtils.answer;
1718
import static com.rabbitmq.stream.impl.TestUtils.metadata;
1819
import static org.assertj.core.api.Assertions.assertThat;
@@ -49,6 +50,7 @@
4950
import org.junit.jupiter.api.Test;
5051
import org.mockito.Mock;
5152
import org.mockito.MockitoAnnotations;
53+
import org.mockito.stubbing.Answer;
5254

5355
public class ProducersCoordinatorTest {
5456

@@ -602,6 +604,41 @@ class TrackingConsumerInfo {
602604
assertThat(coordinator.clientCount()).isEqualTo(1);
603605
}
604606

607+
@Test
608+
void producerShouldBeCreatedProperlyIfManagerClientIsRetried() throws Exception {
609+
scheduledExecutorService = createScheduledExecutorService();
610+
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
611+
Duration retryDelay = Duration.ofMillis(50);
612+
when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
613+
when(locator.metadata("stream")).thenReturn(metadata(leader(), replicas()));
614+
615+
when(clientFactory.client(any()))
616+
.thenAnswer(
617+
(Answer<Client>)
618+
invocationOnMock -> {
619+
shutdownListener.handle(
620+
new Client.ShutdownContext(
621+
Client.ShutdownContext.ShutdownReason.CLIENT_CLOSE));
622+
623+
return client;
624+
})
625+
.thenReturn(client);
626+
627+
when(producer.isOpen()).thenReturn(true);
628+
when(trackingConsumer.isOpen()).thenReturn(true);
629+
630+
CountDownLatch setClientLatch = new CountDownLatch(1);
631+
doAnswer(answer(() -> setClientLatch.countDown())).when(producer).setClient(client);
632+
633+
coordinator.registerProducer(producer, null, "stream");
634+
635+
verify(producer, times(1)).setClient(client);
636+
assertThat(coordinator.nodesConnected()).isEqualTo(1);
637+
assertThat(coordinator.clientCount()).isEqualTo(1);
638+
639+
assertThat(setClientLatch).is(completed());
640+
}
641+
605642
private static ScheduledExecutorService createScheduledExecutorService() {
606643
return new ScheduledExecutorServiceWrapper(Executors.newSingleThreadScheduledExecutor());
607644
}

0 commit comments

Comments
 (0)