Skip to content

Commit 1fdfe65

Browse files
authored
GH-2482: Option for Containers to Stop Immediately
Resolves #2482 `forceStop` means stop after the current record and requeue all prefetched. Just close the channel - canceling the consumer first causes a race condition which could allow another exclusive or single-active consumer to start processing while this container is still running. Also support async stop on DMLC (previously only available on the SMLC). **cherry-pick to 2.4.x**
1 parent 722fad8 commit 1fdfe65

File tree

7 files changed

+229
-56
lines changed

7 files changed

+229
-56
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,12 @@ public abstract class AbstractMessageListenerContainer extends ObservableListene
136136

137137
private final Map<String, Object> consumerArgs = new HashMap<>();
138138

139-
private ContainerDelegate proxy = this.delegate;
140-
141139
private final AtomicBoolean logDeclarationException = new AtomicBoolean(true);
142140

141+
protected final AtomicBoolean stopNow = new AtomicBoolean(); // NOSONAR
142+
143+
private ContainerDelegate proxy = this.delegate;
144+
143145
private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
144146

145147
private ApplicationEventPublisher applicationEventPublisher;
@@ -245,6 +247,8 @@ public abstract class AbstractMessageListenerContainer extends ObservableListene
245247
@Nullable
246248
private RabbitListenerObservationConvention observationConvention;
247249

250+
private boolean forceStop;
251+
248252
@Override
249253
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
250254
this.applicationEventPublisher = applicationEventPublisher;
@@ -1153,6 +1157,25 @@ protected MessageAckListener getMessageAckListener() {
11531157
return this.messageAckListener;
11541158
}
11551159

1160+
/**
1161+
* Stop container after current message(s) are processed and requeue any prefetched.
1162+
* @return true to stop when current message(s) are processed.
1163+
* @since 2.4.14
1164+
*/
1165+
protected boolean isForceStop() {
1166+
return this.forceStop;
1167+
}
1168+
1169+
/**
1170+
* Set to true to stop the container after the current message(s) are processed and
1171+
* requeue any prefetched. Useful when using exclusive or single-active consumers.
1172+
* @param forceStop true to stop when current messsage(s) are processed.
1173+
* @since 2.4.14
1174+
*/
1175+
public void setForceStop(boolean forceStop) {
1176+
this.forceStop = forceStop;
1177+
}
1178+
11561179
/**
11571180
* Delegates to {@link #validateConfiguration()} and {@link #initialize()}.
11581181
*/
@@ -1302,7 +1325,21 @@ protected void setNotRunning() {
13021325
* A shared Rabbit Connection, if any, will automatically be closed <i>afterwards</i>.
13031326
* @see #shutdown()
13041327
*/
1305-
protected abstract void doShutdown();
1328+
protected void doShutdown() {
1329+
shutdownAndWaitOrCallback(null);
1330+
}
1331+
1332+
@Override
1333+
public void stop(Runnable callback) {
1334+
shutdownAndWaitOrCallback(() -> {
1335+
setNotRunning();
1336+
callback.run();
1337+
});
1338+
}
1339+
1340+
protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
1341+
}
1342+
13061343

13071344
/**
13081345
* @return Whether this container is currently active, that is, whether it has been set up but not shut down yet.

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -784,11 +784,17 @@ public synchronized void stop() {
784784
if (logger.isDebugEnabled()) {
785785
logger.debug("Closing Rabbit Channel: " + this.channel);
786786
}
787-
RabbitUtils.setPhysicalCloseRequired(this.channel, true);
788-
ConnectionFactoryUtils.releaseResources(this.resourceHolder);
789-
this.deliveryTags.clear();
790-
this.consumers.clear();
791-
this.queue.clear(); // in case we still have a client thread blocked
787+
forceCloseAndClearQueue();
788+
}
789+
790+
public void forceCloseAndClearQueue() {
791+
if (this.channel != null && this.channel.isOpen()) {
792+
RabbitUtils.setPhysicalCloseRequired(this.channel, true);
793+
ConnectionFactoryUtils.releaseResources(this.resourceHolder);
794+
this.deliveryTags.clear();
795+
this.consumers.clear();
796+
this.queue.clear(); // in case we still have a client thread blocked
797+
}
792798
}
793799

794800
/**

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 61 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -813,7 +813,7 @@ else if (this.logger.isWarnEnabled()) {
813813
}
814814

815815
@Override
816-
protected void doShutdown() {
816+
protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
817817
LinkedList<SimpleConsumer> canceledConsumers = null;
818818
boolean waitForConsumers = false;
819819
synchronized (this.consumersMonitor) {
@@ -826,44 +826,66 @@ protected void doShutdown() {
826826
}
827827
}
828828
if (waitForConsumers) {
829-
try {
830-
if (this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS)) {
831-
this.logger.info("Successfully waited for consumers to finish.");
832-
}
833-
else {
834-
this.logger.info("Consumers not finished.");
835-
if (isForceCloseChannel()) {
836-
canceledConsumers.forEach(consumer -> {
837-
String eventMessage = "Closing channel for unresponsive consumer: " + consumer;
838-
if (logger.isWarnEnabled()) {
839-
logger.warn(eventMessage);
840-
}
841-
consumer.cancelConsumer(eventMessage);
842-
});
829+
LinkedList<SimpleConsumer> consumersToWait = canceledConsumers;
830+
Runnable awaitShutdown = () -> {
831+
try {
832+
if (this.cancellationLock.await(getShutdownTimeout(), TimeUnit.MILLISECONDS)) {
833+
this.logger.info("Successfully waited for consumers to finish.");
834+
}
835+
else {
836+
this.logger.info("Consumers not finished.");
837+
if (isForceCloseChannel() || this.stopNow.get()) {
838+
consumersToWait.forEach(consumer -> {
839+
String eventMessage = "Closing channel for unresponsive consumer: " + consumer;
840+
if (logger.isWarnEnabled()) {
841+
logger.warn(eventMessage);
842+
}
843+
consumer.cancelConsumer(eventMessage);
844+
});
845+
}
843846
}
844847
}
848+
catch (InterruptedException e) {
849+
Thread.currentThread().interrupt();
850+
this.logger.warn("Interrupted waiting for consumers. Continuing with shutdown.");
851+
}
852+
finally {
853+
this.startedLatch = new CountDownLatch(1);
854+
this.started = false;
855+
this.aborted = false;
856+
this.hasStopped = true;
857+
}
858+
this.stopNow.set(false);
859+
runCallbackIfNotNull(callback);
860+
};
861+
if (callback == null) {
862+
awaitShutdown.run();
845863
}
846-
catch (InterruptedException e) {
847-
Thread.currentThread().interrupt();
848-
this.logger.warn("Interrupted waiting for consumers. Continuing with shutdown.");
849-
}
850-
finally {
851-
this.startedLatch = new CountDownLatch(1);
852-
this.started = false;
853-
this.aborted = false;
854-
this.hasStopped = true;
864+
else {
865+
getTaskExecutor().execute(awaitShutdown);
855866
}
856867
}
857868
}
858869

870+
private void runCallbackIfNotNull(@Nullable Runnable callback) {
871+
if (callback != null) {
872+
callback.run();
873+
}
874+
}
875+
859876
/**
860877
* Must hold this.consumersMonitor.
861878
* @param consumers a copy of this.consumers.
862879
*/
863880
private void actualShutDown(List<SimpleConsumer> consumers) {
864881
Assert.state(getTaskExecutor() != null, "Cannot shut down if not initialized");
865882
this.logger.debug("Shutting down");
866-
consumers.forEach(this::cancelConsumer);
883+
if (isForceStop()) {
884+
this.stopNow.set(true);
885+
}
886+
else {
887+
consumers.forEach(this::cancelConsumer);
888+
}
867889
this.consumers.clear();
868890
this.consumersByQueue.clear();
869891
this.logger.debug("All consumers canceled");
@@ -1031,6 +1053,10 @@ int incrementAndGetEpoch() {
10311053
public void handleDelivery(String consumerTag, Envelope envelope,
10321054
BasicProperties properties, byte[] body) {
10331055

1056+
if (!getChannel().isOpen()) {
1057+
this.logger.debug("Discarding prefetch, channel closed");
1058+
return;
1059+
}
10341060
MessageProperties messageProperties =
10351061
getMessagePropertiesConverter().toMessageProperties(properties, envelope, "UTF-8");
10361062
messageProperties.setConsumerTag(consumerTag);
@@ -1072,6 +1098,9 @@ public void handleDelivery(String consumerTag, Envelope envelope,
10721098
// NOSONAR
10731099
}
10741100
}
1101+
if (DirectMessageListenerContainer.this.stopNow.get()) {
1102+
closeChannel();
1103+
}
10751104
}
10761105

10771106
private void executeListenerInTransaction(Object data, long deliveryTag) {
@@ -1308,11 +1337,15 @@ void cancelConsumer(final String eventMessage) {
13081337
}
13091338

13101339
private void finalizeConsumer() {
1340+
closeChannel();
1341+
DirectMessageListenerContainer.this.cancellationLock.release(this);
1342+
consumerRemoved(this);
1343+
}
1344+
1345+
private void closeChannel() {
13111346
RabbitUtils.setPhysicalCloseRequired(getChannel(), true);
13121347
RabbitUtils.closeChannel(getChannel());
13131348
RabbitUtils.closeConnection(this.connection);
1314-
DirectMessageListenerContainer.this.cancellationLock.release(this);
1315-
consumerRemoved(this);
13161349
}
13171350

13181351
@Override

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -607,19 +607,7 @@ private void waitForConsumersToStart(Set<AsyncMessageProcessingConsumer> process
607607
}
608608

609609
@Override
610-
protected void doShutdown() {
611-
shutdownAndWaitOrCallback(null);
612-
}
613-
614-
@Override
615-
public void stop(Runnable callback) {
616-
shutdownAndWaitOrCallback(() -> {
617-
setNotRunning();
618-
callback.run();
619-
});
620-
}
621-
622-
private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
610+
protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
623611
Thread thread = this.containerStoppingForAbort.get();
624612
if (thread != null && !thread.equals(Thread.currentThread())) {
625613
logger.info("Shutdown ignored - container is stopping due to an aborted consumer");
@@ -631,9 +619,14 @@ private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
631619
synchronized (this.consumersMonitor) {
632620
if (this.consumers != null) {
633621
Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator();
622+
if (isForceStop()) {
623+
this.stopNow.set(true);
624+
}
634625
while (consumerIterator.hasNext()) {
635626
BlockingQueueConsumer consumer = consumerIterator.next();
636-
consumer.basicCancel(true);
627+
if (!isForceStop()) {
628+
consumer.basicCancel(true);
629+
}
637630
canceledConsumers.add(consumer);
638631
consumerIterator.remove();
639632
if (consumer.declaring) {
@@ -657,7 +650,7 @@ private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
657650
}
658651
else {
659652
logger.info("Workers not finished.");
660-
if (isForceCloseChannel()) {
653+
if (isForceCloseChannel() || this.stopNow.get()) {
661654
canceledConsumers.forEach(consumer -> {
662655
if (logger.isWarnEnabled()) {
663656
logger.warn("Closing channel for unresponsive consumer: " + consumer);
@@ -676,7 +669,7 @@ private void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
676669
this.consumers = null;
677670
this.cancellationLock.deactivate();
678671
}
679-
672+
this.stopNow.set(false);
680673
runCallbackIfNotNull(callback);
681674
};
682675
if (callback == null) {
@@ -1323,6 +1316,10 @@ public void run() { // NOSONAR - line count
13231316

13241317
private void mainLoop() throws Exception { // NOSONAR Exception
13251318
try {
1319+
if (SimpleMessageListenerContainer.this.stopNow.get()) {
1320+
this.consumer.forceCloseAndClearQueue();
1321+
return;
1322+
}
13261323
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
13271324
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
13281325
checkAdjust(receivedOk);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerIntegrationTests.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.springframework.amqp.core.Message;
5252
import org.springframework.amqp.core.MessageListener;
5353
import org.springframework.amqp.core.Queue;
54+
import org.springframework.amqp.core.QueueInformation;
5455
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
5556
import org.springframework.amqp.rabbit.connection.Connection;
5657
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@@ -90,6 +91,7 @@
9091
*/
9192
@RabbitAvailable(queues = { DirectMessageListenerContainerIntegrationTests.Q1,
9293
DirectMessageListenerContainerIntegrationTests.Q2,
94+
DirectMessageListenerContainerIntegrationTests.Q3,
9395
DirectMessageListenerContainerIntegrationTests.EQ1,
9496
DirectMessageListenerContainerIntegrationTests.EQ2,
9597
DirectMessageListenerContainerIntegrationTests.DLQ1 })
@@ -102,6 +104,8 @@ public class DirectMessageListenerContainerIntegrationTests {
102104

103105
public static final String Q2 = "testQ2.DirectMessageListenerContainerIntegrationTests";
104106

107+
public static final String Q3 = "testQ3.DirectMessageListenerContainerIntegrationTests";
108+
105109
public static final String EQ1 = "eventTestQ1.DirectMessageListenerContainerIntegrationTests";
106110

107111
public static final String EQ2 = "eventTestQ2.DirectMessageListenerContainerIntegrationTests";
@@ -792,6 +796,48 @@ public void onMessage(Message message) {
792796
assertThat(ackDeliveryTag.get()).isEqualTo(1);
793797
}
794798

799+
@Test
800+
void forceStop() {
801+
CountDownLatch latch1 = new CountDownLatch(1);
802+
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
803+
DirectMessageListenerContainer container = new DirectMessageListenerContainer(cf);
804+
container.setMessageListener((ChannelAwareMessageListener) (msg, chan) -> {
805+
latch1.await(10, TimeUnit.SECONDS);
806+
});
807+
RabbitTemplate template = new RabbitTemplate(cf);
808+
try {
809+
container.setQueueNames(Q3);
810+
container.setForceStop(true);
811+
template.convertAndSend(Q3, "one");
812+
template.convertAndSend(Q3, "two");
813+
template.convertAndSend(Q3, "three");
814+
template.convertAndSend(Q3, "four");
815+
template.convertAndSend(Q3, "five");
816+
await().untilAsserted(() -> {
817+
QueueInformation queueInfo = admin.getQueueInfo(Q3);
818+
assertThat(queueInfo).isNotNull();
819+
assertThat(queueInfo.getMessageCount()).isEqualTo(5);
820+
});
821+
container.start();
822+
await().untilAsserted(() -> {
823+
QueueInformation queueInfo = admin.getQueueInfo(Q3);
824+
assertThat(queueInfo).isNotNull();
825+
assertThat(queueInfo.getMessageCount()).isEqualTo(0);
826+
});
827+
container.stop(() -> {
828+
});
829+
latch1.countDown();
830+
await().untilAsserted(() -> {
831+
QueueInformation queueInfo = admin.getQueueInfo(Q3);
832+
assertThat(queueInfo).isNotNull();
833+
assertThat(queueInfo.getMessageCount()).isEqualTo(4);
834+
});
835+
}
836+
finally {
837+
container.stop();
838+
}
839+
}
840+
795841
@Test
796842
public void testMessageAckListenerWithBatchAck() throws Exception {
797843
final AtomicInteger calledTimes = new AtomicInteger();

0 commit comments

Comments
 (0)