Skip to content

Commit 9a84951

Browse files
committed
GH-1246: SMLC: Fix addQueueNames
Resolves #1246 - called `queuesChanged()` twice - didn't wait for old consumers to exit, causing problems with exclusive consumers
1 parent 3a8e95f commit 9a84951

File tree

2 files changed

+20
-6
lines changed

2 files changed

+20
-6
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -358,8 +358,7 @@ public void setQueueNames(String... queueName) {
358358
*/
359359
@Override
360360
public void addQueueNames(String... queueName) {
361-
super.addQueueNames(queueName);
362-
queuesChanged();
361+
super.addQueueNames(queueName); // calls addQueues() which will cycle consumers
363362
}
364363

365364
/**
@@ -756,7 +755,13 @@ private void queuesChanged() {
756755
consumerIterator.remove();
757756
count++;
758757
}
759-
this.addAndStartConsumers(count);
758+
try {
759+
this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS);
760+
}
761+
catch (InterruptedException e) {
762+
Thread.currentThread().interrupt();
763+
}
764+
addAndStartConsumers(count);
760765
}
761766
}
762767
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegration2Tests.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,6 +30,7 @@
3030
import static org.junit.Assert.assertTrue;
3131
import static org.mockito.ArgumentMatchers.any;
3232
import static org.mockito.ArgumentMatchers.anyBoolean;
33+
import static org.mockito.BDDMockito.willAnswer;
3334
import static org.mockito.Mockito.atLeastOnce;
3435
import static org.mockito.Mockito.doReturn;
3536
import static org.mockito.Mockito.spy;
@@ -149,14 +150,21 @@ public void clear() throws Exception {
149150
@Test
150151
public void testChangeQueues() throws Exception {
151152
CountDownLatch latch = new CountDownLatch(30);
152-
container =
153-
createContainer(new MessageListenerAdapter(new PojoListener(latch)), queue.getName(), queue1.getName());
153+
AtomicInteger restarts = new AtomicInteger();
154+
container = spy(createContainer(new MessageListenerAdapter(new PojoListener(latch)), false, queue.getName(),
155+
queue1.getName()));
156+
willAnswer(invocation -> {
157+
restarts.incrementAndGet();
158+
invocation.callRealMethod();
159+
return null;
160+
}).given(container).addAndStartConsumers(1);
154161
final CountDownLatch consumerLatch = new CountDownLatch(1);
155162
this.container.setApplicationEventPublisher(e -> {
156163
if (e instanceof AsyncConsumerStoppedEvent) {
157164
consumerLatch.countDown();
158165
}
159166
});
167+
this.container.start();
160168
for (int i = 0; i < 10; i++) {
161169
template.convertAndSend(queue.getName(), i + "foo");
162170
template.convertAndSend(queue1.getName(), i + "foo");
@@ -170,6 +178,7 @@ public void testChangeQueues() throws Exception {
170178
assertTrue("Timed out waiting for message", waited);
171179
assertNull(template.receiveAndConvert(queue.getName()));
172180
assertNull(template.receiveAndConvert(queue1.getName()));
181+
assertThat(restarts.get(), equalTo(1));
173182
}
174183

175184
@Test

0 commit comments

Comments
 (0)