Skip to content

Commit 95631d8

Browse files
garyrussellartembilan
authored andcommitted
GH-2482: Fix Async Container Stop
- Stop with callback did not reset `active` flag (both containers) - DMLC did not release the `cancellationLock` - Also restore `@LongRunning` (cherry picked from commit d44a8d0)
1 parent a922a94 commit 95631d8

File tree

5 files changed

+27
-9
lines changed

5 files changed

+27
-9
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1350,9 +1350,19 @@ public void initialize() {
13501350
}
13511351

13521352
/**
1353-
* Stop the shared Connection, call {@link #doShutdown()}, and close this container.
1353+
* Stop the shared Connection, call {@link #shutdown(Runnable)}, and close this
1354+
* container.
13541355
*/
13551356
public void shutdown() {
1357+
shutdown(null);
1358+
}
1359+
1360+
/**
1361+
* Stop the shared Connection, call {@link #shutdownAndWaitOrCallback(Runnable)}, and
1362+
* close this container.
1363+
* @param callback an optional {@link Runnable} to call when the stop is complete.
1364+
*/
1365+
public void shutdown(@Nullable Runnable callback) {
13561366
synchronized (this.lifecycleMonitor) {
13571367
if (!isActive()) {
13581368
logger.debug("Shutdown ignored - container is not active already");
@@ -1367,7 +1377,7 @@ public void shutdown() {
13671377

13681378
// Shut down the invokers.
13691379
try {
1370-
doShutdown();
1380+
shutdownAndWaitOrCallback(callback);
13711381
}
13721382
catch (Exception ex) {
13731383
throw convertRabbitAccessException(ex);
@@ -1405,10 +1415,7 @@ protected void doShutdown() {
14051415

14061416
@Override
14071417
public void stop(Runnable callback) {
1408-
shutdownAndWaitOrCallback(() -> {
1409-
setNotRunning();
1410-
callback.run();
1411-
});
1418+
shutdown(callback);
14121419
}
14131420

14141421
protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1337,14 +1337,14 @@ void cancelConsumer(final String eventMessage) {
13371337

13381338
private void finalizeConsumer() {
13391339
closeChannel();
1340-
DirectMessageListenerContainer.this.cancellationLock.release(this);
13411340
consumerRemoved(this);
13421341
}
13431342

13441343
private void closeChannel() {
13451344
RabbitUtils.setPhysicalCloseRequired(getChannel(), true);
13461345
RabbitUtils.closeChannel(getChannel());
13471346
RabbitUtils.closeConnection(this.connection);
1347+
DirectMessageListenerContainer.this.cancellationLock.release(this);
13481348
}
13491349

13501350
@Override

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerIntegrationTests.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -801,7 +801,7 @@ public void onMessage(Message message) {
801801
}
802802

803803
@Test
804-
void forceStop() {
804+
void forceStop() throws InterruptedException {
805805
CountDownLatch latch1 = new CountDownLatch(1);
806806
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
807807
DirectMessageListenerContainer container = new DirectMessageListenerContainer(cf);
@@ -812,6 +812,7 @@ void forceStop() {
812812
try {
813813
container.setQueueNames(Q3);
814814
container.setForceStop(true);
815+
container.setShutdownTimeout(20_000L);
815816
template.convertAndSend(Q3, "one");
816817
template.convertAndSend(Q3, "two");
817818
template.convertAndSend(Q3, "three");
@@ -828,14 +829,21 @@ void forceStop() {
828829
assertThat(queueInfo).isNotNull();
829830
assertThat(queueInfo.getMessageCount()).isEqualTo(0);
830831
});
832+
CountDownLatch latch2 = new CountDownLatch(1);
833+
long t1 = System.currentTimeMillis();
831834
container.stop(() -> {
835+
latch2.countDown();
832836
});
833837
latch1.countDown();
838+
assertThat(System.currentTimeMillis() - t1).isLessThan(5_000L);
834839
await().untilAsserted(() -> {
835840
QueueInformation queueInfo = admin.getQueueInfo(Q3);
836841
assertThat(queueInfo).isNotNull();
837842
assertThat(queueInfo.getMessageCount()).isEqualTo(4);
838843
});
844+
assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue();
845+
assertThat(container.isActive()).isFalse();
846+
assertThat(container.isRunning()).isFalse();
839847
}
840848
finally {
841849
container.stop();

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegration2Tests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.springframework.amqp.rabbit.core.RabbitAdmin;
6868
import org.springframework.amqp.rabbit.core.RabbitTemplate;
6969
import org.springframework.amqp.rabbit.junit.BrokerTestUtils;
70+
import org.springframework.amqp.rabbit.junit.LongRunning;
7071
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
7172
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
7273
import org.springframework.amqp.rabbit.listener.adapter.ReplyingMessageListener;
@@ -95,7 +96,7 @@
9596
*/
9697
@RabbitAvailable(queues = { SimpleMessageListenerContainerIntegration2Tests.TEST_QUEUE,
9798
SimpleMessageListenerContainerIntegration2Tests.TEST_QUEUE_1 })
98-
//@LongRunning
99+
@LongRunning
99100
public class SimpleMessageListenerContainerIntegration2Tests {
100101

101102
public static final String TEST_QUEUE = "test.queue.SimpleMessageListenerContainerIntegration2Tests";

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,7 @@ public void testCallbackIsRunOnStopAlsoWhenNoConsumerIsActive() throws Interrupt
437437
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
438438

439439
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
440+
ReflectionTestUtils.setField(container, "active", Boolean.TRUE);
440441

441442
final CountDownLatch countDownLatch = new CountDownLatch(1);
442443
container.stop(countDownLatch::countDown);
@@ -449,6 +450,7 @@ public void testCallbackIsRunOnStopAlsoWhenContainerIsStoppingForAbort() throws
449450

450451
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
451452
ReflectionTestUtils.setField(container, "containerStoppingForAbort", new AtomicReference<>(new Thread()));
453+
ReflectionTestUtils.setField(container, "active", Boolean.TRUE);
452454

453455
final CountDownLatch countDownLatch = new CountDownLatch(1);
454456
container.stop(countDownLatch::countDown);

0 commit comments

Comments
 (0)