Skip to content

Commit fa94439

Browse files
authored
GH-2490: Add forceStop to Container Factories
Resolves #2490 **cherry-pick to 2.4.x**
1 parent 58448ad commit fa94439

File tree

2 files changed

+19
-2
lines changed

2 files changed

+19
-2
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ public abstract class AbstractRabbitListenerContainerFactory<C extends AbstractM
117117

118118
private RabbitListenerObservationConvention observationConvention;
119119

120+
private Boolean forceStop;
121+
120122
/**
121123
* @param connectionFactory The connection factory.
122124
* @see AbstractMessageListenerContainer#setConnectionFactory(ConnectionFactory)
@@ -339,6 +341,16 @@ public void setObservationConvention(RabbitListenerObservationConvention observa
339341
this.observationConvention = observationConvention;
340342
}
341343

344+
/**
345+
* Set to true to stop the container after the current message(s) are processed and
346+
* requeue any prefetched. Useful when using exclusive or single-active consumers.
347+
* @param forceStop true to stop when current messsage(s) are processed.
348+
* @since 2.4.15
349+
*/
350+
public void setForceStop(boolean forceStop) {
351+
this.forceStop = forceStop;
352+
}
353+
342354
@Override
343355
public C createListenerContainer(RabbitListenerEndpoint endpoint) {
344356
C instance = createContainerInstance();
@@ -376,7 +388,8 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {
376388
.acceptIfNotNull(this.batchingStrategy, instance::setBatchingStrategy)
377389
.acceptIfNotNull(getMicrometerEnabled(), instance::setMicrometerEnabled)
378390
.acceptIfNotNull(getObservationEnabled(), instance::setObservationEnabled)
379-
.acceptIfNotNull(this.observationConvention, instance::setObservationConvention);
391+
.acceptIfNotNull(this.observationConvention, instance::setObservationConvention)
392+
.acceptIfNotNull(this.forceStop, instance::setForceStop);
380393
if (this.batchListener && this.deBatchingEnabled == null) {
381394
// turn off container debatching by default for batch listeners
382395
instance.setDeBatchingEnabled(false);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/RabbitListenerContainerFactoryTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -114,6 +114,7 @@ public void createContainerFullConfig() {
114114
this.factory.setAfterReceivePostProcessors(afterReceivePostProcessor);
115115
this.factory.setGlobalQos(true);
116116
this.factory.setContainerCustomizer(c -> c.setShutdownTimeout(10_000));
117+
this.factory.setForceStop(true);
117118

118119
assertThat(this.factory.getAdviceChain()).isEqualTo(new Advice[]{advice});
119120

@@ -150,6 +151,7 @@ public void createContainerFullConfig() {
150151
assertThat(actualAfterReceivePostProcessors.size()).as("Wrong number of afterReceivePostProcessors").isEqualTo(1);
151152
assertThat(actualAfterReceivePostProcessors.get(0)).as("Wrong advice").isSameAs(afterReceivePostProcessor);
152153
assertThat(fieldAccessor.getPropertyValue("globalQos")).isEqualTo(true);
154+
assertThat(TestUtils.getPropertyValue(container, "forceStop", Boolean.class)).isTrue();
153155
}
154156

155157
@Test
@@ -176,6 +178,7 @@ public void createDirectContainerFullConfig() {
176178
this.direct.setMessagesPerAck(5);
177179
this.direct.setAckTimeout(3L);
178180
this.direct.setAfterReceivePostProcessors(afterReceivePostProcessor);
181+
this.direct.setForceStop(true);
179182

180183
assertThat(this.direct.getAdviceChain()).isEqualTo(new Advice[]{advice});
181184

@@ -207,6 +210,7 @@ public void createDirectContainerFullConfig() {
207210
List<?> actualAfterReceivePostProcessors = (List<?>) fieldAccessor.getPropertyValue("afterReceivePostProcessors");
208211
assertThat(actualAfterReceivePostProcessors.size()).as("Wrong number of afterReceivePostProcessors").isEqualTo(1);
209212
assertThat(actualAfterReceivePostProcessors.get(0)).as("Wrong afterReceivePostProcessor").isSameAs(afterReceivePostProcessor);
213+
assertThat(TestUtils.getPropertyValue(container, "forceStop", Boolean.class)).isTrue();
210214
}
211215

212216
private void setBasicConfig(AbstractRabbitListenerContainerFactory<?> factory) {

0 commit comments

Comments
 (0)