Skip to content

Commit 455c229

Browse files
committed
Fix coordinator manager creation in load balancer mode
It could go into an infinite loop if the connection was retried because it was not the expected node.
1 parent b41dd1b commit 455c229

File tree

4 files changed

+119
-3
lines changed

4 files changed

+119
-3
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -635,8 +635,10 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
635635
};
636636
ShutdownListener shutdownListener =
637637
shutdownContext -> {
638-
this.closed.set(true);
639-
managers.remove(this);
638+
if (clientInitializedInManager.get()) {
639+
this.closed.set(true);
640+
managers.remove(this);
641+
}
640642
if (shutdownContext.isShutdownUnexpected()) {
641643
LOGGER.debug(
642644
"Unexpected shutdown notification on subscription connection {}, scheduling consumers re-assignment",
@@ -906,9 +908,15 @@ synchronized void add(
906908
OffsetSpecification offsetSpecification,
907909
boolean isInitialSubscription) {
908910
if (this.isFull()) {
911+
LOGGER.debug(
912+
"Cannot add subscription tracker for stream '{}', manager is full",
913+
subscriptionTracker.stream);
909914
throw new IllegalStateException("Cannot add subscription tracker, the manager is full");
910915
}
911916
if (this.isClosed()) {
917+
LOGGER.debug(
918+
"Cannot add subscription tracker for stream '{}', manager is closed",
919+
subscriptionTracker.stream);
912920
throw new IllegalStateException("Cannot add subscription tracker, the manager is closed");
913921
}
914922

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,10 @@ private ClientProducersManager(
599599
};
600600
ShutdownListener shutdownListener =
601601
shutdownContext -> {
602-
managers.remove(this);
602+
if (clientInitializedInManager.get()) {
603+
this.closed.set(true);
604+
managers.remove(this);
605+
}
603606
if (shutdownContext.isShutdownUnexpected()) {
604607
LOGGER.debug(
605608
"Recovering {} producer(s) after unexpected connection termination",

src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.mockito.Captor;
6868
import org.mockito.Mock;
6969
import org.mockito.MockitoAnnotations;
70+
import org.mockito.stubbing.Answer;
7071

7172
public class ConsumersCoordinatorTest {
7273

@@ -1718,6 +1719,73 @@ void subscribeUnsubscribeInDifferentThreadsShouldNotDeadlock() {
17181719
}
17191720
}
17201721

1722+
@Test
1723+
void consumerShouldBeCreatedProperlyIfManagerClientIsRetried() {
1724+
scheduledExecutorService = createScheduledExecutorService();
1725+
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
1726+
Duration retryDelay = Duration.ofMillis(100);
1727+
when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
1728+
when(consumer.isOpen()).thenReturn(true);
1729+
when(locator.metadata("stream")).thenReturn(metadata(null, replica()));
1730+
1731+
when(clientFactory.client(any()))
1732+
.thenAnswer(
1733+
(Answer<Client>)
1734+
invocationOnMock -> {
1735+
// simulates the client is not the good one (e.g. because of load balancer),
1736+
// so the connection is closed (hence the call to the shutdown listener)
1737+
shutdownListener.handle(
1738+
new Client.ShutdownContext(
1739+
Client.ShutdownContext.ShutdownReason.CLIENT_CLOSE));
1740+
// and a client is returned
1741+
return client;
1742+
});
1743+
1744+
when(client.subscribe(
1745+
subscriptionIdCaptor.capture(),
1746+
anyString(),
1747+
any(OffsetSpecification.class),
1748+
anyInt(),
1749+
anyMap()))
1750+
.thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));
1751+
1752+
AtomicInteger messageHandlerCalls = new AtomicInteger();
1753+
Runnable closingRunnable =
1754+
coordinator.subscribe(
1755+
consumer,
1756+
"stream",
1757+
OffsetSpecification.first(),
1758+
null,
1759+
NO_OP_SUBSCRIPTION_LISTENER,
1760+
NO_OP_TRACKING_CLOSING_CALLBACK,
1761+
(offset, message) -> messageHandlerCalls.incrementAndGet(),
1762+
Collections.emptyMap(),
1763+
flowStrategy());
1764+
verify(clientFactory, times(1)).client(any());
1765+
verify(client, times(1))
1766+
.subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap());
1767+
1768+
assertThat(messageHandlerCalls.get()).isEqualTo(0);
1769+
messageListener.handle(
1770+
subscriptionIdCaptor.getAllValues().get(0),
1771+
1,
1772+
0,
1773+
0,
1774+
null,
1775+
new WrapperMessageBuilder().build());
1776+
assertThat(messageHandlerCalls.get()).isEqualTo(1);
1777+
1778+
when(client.unsubscribe(subscriptionIdCaptor.getValue()))
1779+
.thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));
1780+
1781+
closingRunnable.run();
1782+
verify(client, times(1)).unsubscribe(subscriptionIdCaptor.getValue());
1783+
1784+
messageListener.handle(
1785+
subscriptionIdCaptor.getValue(), 0, 0, 0, null, new WrapperMessageBuilder().build());
1786+
assertThat(messageHandlerCalls.get()).isEqualTo(1);
1787+
}
1788+
17211789
Client.Broker leader() {
17221790
return new Client.Broker("leader", -1);
17231791
}

src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed;
1617
import static com.rabbitmq.stream.impl.TestUtils.answer;
1718
import static com.rabbitmq.stream.impl.TestUtils.metadata;
1819
import static org.assertj.core.api.Assertions.assertThat;
@@ -49,6 +50,7 @@
4950
import org.junit.jupiter.api.Test;
5051
import org.mockito.Mock;
5152
import org.mockito.MockitoAnnotations;
53+
import org.mockito.stubbing.Answer;
5254

5355
public class ProducersCoordinatorTest {
5456

@@ -602,6 +604,41 @@ class TrackingConsumerInfo {
602604
assertThat(coordinator.clientCount()).isEqualTo(1);
603605
}
604606

607+
@Test
608+
void producerShouldBeCreatedProperlyIfManagerClientIsRetried() throws Exception {
609+
scheduledExecutorService = createScheduledExecutorService();
610+
when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService);
611+
Duration retryDelay = Duration.ofMillis(50);
612+
when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay));
613+
when(locator.metadata("stream")).thenReturn(metadata(leader(), replicas()));
614+
615+
when(clientFactory.client(any()))
616+
.thenAnswer(
617+
(Answer<Client>)
618+
invocationOnMock -> {
619+
shutdownListener.handle(
620+
new Client.ShutdownContext(
621+
Client.ShutdownContext.ShutdownReason.CLIENT_CLOSE));
622+
623+
return client;
624+
})
625+
.thenReturn(client);
626+
627+
when(producer.isOpen()).thenReturn(true);
628+
when(trackingConsumer.isOpen()).thenReturn(true);
629+
630+
CountDownLatch setClientLatch = new CountDownLatch(1);
631+
doAnswer(answer(() -> setClientLatch.countDown())).when(producer).setClient(client);
632+
633+
coordinator.registerProducer(producer, null, "stream");
634+
635+
verify(producer, times(1)).setClient(client);
636+
assertThat(coordinator.nodesConnected()).isEqualTo(1);
637+
assertThat(coordinator.clientCount()).isEqualTo(1);
638+
639+
assertThat(setClientLatch).is(completed());
640+
}
641+
605642
private static ScheduledExecutorService createScheduledExecutorService() {
606643
return new ScheduledExecutorServiceWrapper(Executors.newSingleThreadScheduledExecutor());
607644
}

0 commit comments

Comments
 (0)