Skip to content

Commit b860606

Browse files
authored
GH-2504: Add SmartLifecycle to Conn. Factories
Resolves #2504
1 parent 0817b9a commit b860606

File tree

6 files changed

+121
-17
lines changed

6 files changed

+121
-17
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.springframework.amqp.AmqpTimeoutException;
5151
import org.springframework.amqp.rabbit.support.ActiveObjectCounter;
5252
import org.springframework.beans.factory.InitializingBean;
53+
import org.springframework.context.SmartLifecycle;
5354
import org.springframework.jmx.export.annotation.ManagedAttribute;
5455
import org.springframework.jmx.export.annotation.ManagedResource;
5556
import org.springframework.lang.Nullable;
@@ -99,7 +100,7 @@
99100
*/
100101
@ManagedResource
101102
public class CachingConnectionFactory extends AbstractConnectionFactory
102-
implements InitializingBean, ShutdownListener {
103+
implements InitializingBean, ShutdownListener, SmartLifecycle {
103104

104105
private static final String UNUSED = "unused";
105106

@@ -190,6 +191,8 @@ public enum ConfirmType {
190191

191192
private final ActiveObjectCounter<Channel> inFlightAsyncCloses = new ActiveObjectCounter<>();
192193

194+
private final AtomicBoolean running = new AtomicBoolean();
195+
193196
private long channelCheckoutTimeout = 0;
194197

195198
private CacheMode cacheMode = CacheMode.CHANNEL;
@@ -446,6 +449,11 @@ public void setPublisherChannelFactory(PublisherCallbackChannelFactory publisher
446449
this.publisherChannelFactory = publisherChannelFactory;
447450
}
448451

452+
@Override
453+
public int getPhase() {
454+
return Integer.MIN_VALUE;
455+
}
456+
449457
@Override
450458
public void afterPropertiesSet() {
451459
this.initialized = true;
@@ -459,6 +467,22 @@ public void afterPropertiesSet() {
459467
}
460468
}
461469

470+
@Override
471+
public void start() {
472+
this.running.set(true);
473+
}
474+
475+
@Override
476+
public void stop() {
477+
this.running.set(false);
478+
resetConnection();
479+
}
480+
481+
@Override
482+
public boolean isRunning() {
483+
return this.running.get();
484+
}
485+
462486
private void initCacheWaterMarks() {
463487
this.channelHighWaterMarks.put(ObjectUtils.getIdentityHexString(this.cachedChannelsNonTransactional),
464488
new AtomicInteger());

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2022 the original author or authors.
2+
* Copyright 2015-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import java.util.Arrays;
2121
import java.util.HashMap;
2222
import java.util.Map;
23+
import java.util.concurrent.atomic.AtomicBoolean;
2324
import java.util.stream.Collectors;
2425
import java.util.stream.IntStream;
2526

@@ -29,6 +30,7 @@
2930
import org.springframework.amqp.AmqpException;
3031
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
3132
import org.springframework.beans.factory.DisposableBean;
33+
import org.springframework.context.SmartLifecycle;
3234
import org.springframework.core.io.Resource;
3335
import org.springframework.lang.Nullable;
3436
import org.springframework.util.Assert;
@@ -49,7 +51,8 @@
4951
* @author Gary Russell
5052
* @since 1.2
5153
*/
52-
public class LocalizedQueueConnectionFactory implements ConnectionFactory, RoutingConnectionFactory, DisposableBean {
54+
public class LocalizedQueueConnectionFactory implements ConnectionFactory, RoutingConnectionFactory, DisposableBean,
55+
SmartLifecycle {
5356

5457
private final Log logger = LogFactory.getLog(getClass());
5558

@@ -79,6 +82,8 @@ public class LocalizedQueueConnectionFactory implements ConnectionFactory, Routi
7982

8083
private final String trustStorePassPhrase;
8184

85+
private final AtomicBoolean running = new AtomicBoolean();
86+
8287
private NodeLocator<?> nodeLocator;
8388

8489
/**
@@ -233,6 +238,27 @@ public String getUsername() {
233238
return this.username;
234239
}
235240

241+
@Override
242+
public int getPhase() {
243+
return Integer.MIN_VALUE;
244+
}
245+
246+
@Override
247+
public void start() {
248+
this.running.set(true);
249+
}
250+
251+
@Override
252+
public void stop() {
253+
this.running.set(false);
254+
resetConnection();
255+
}
256+
257+
@Override
258+
public boolean isRunning() {
259+
return this.running.get();
260+
}
261+
236262
@Override
237263
public void addConnectionListener(ConnectionListener listener) {
238264
this.defaultConnectionFactory.addConnectionListener(listener);

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/PooledChannelConnectionFactory.java

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@
3636
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
3737
import org.springframework.aop.framework.ProxyFactory;
3838
import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;
39+
import org.springframework.context.SmartLifecycle;
3940
import org.springframework.lang.Nullable;
4041
import org.springframework.util.Assert;
4142

@@ -54,7 +55,10 @@
5455
* @since 2.3
5556
*
5657
*/
57-
public class PooledChannelConnectionFactory extends AbstractConnectionFactory implements ShutdownListener {
58+
public class PooledChannelConnectionFactory extends AbstractConnectionFactory
59+
implements ShutdownListener, SmartLifecycle {
60+
61+
private final AtomicBoolean running = new AtomicBoolean();
5862

5963
private volatile ConnectionWrapper connection;
6064

@@ -132,6 +136,27 @@ public void addConnectionListener(ConnectionListener listener) {
132136
}
133137
}
134138

139+
@Override
140+
public int getPhase() {
141+
return Integer.MIN_VALUE;
142+
}
143+
144+
@Override
145+
public void start() {
146+
this.running.set(true);
147+
}
148+
149+
@Override
150+
public void stop() {
151+
this.running.set(false);
152+
resetConnection();
153+
}
154+
155+
@Override
156+
public boolean isRunning() {
157+
return this.running.get();
158+
}
159+
135160
@Override
136161
public synchronized Connection createConnection() throws AmqpException {
137162
if (this.connection == null || !this.connection.isOpen()) {
@@ -180,16 +205,20 @@ private static final class ConnectionWrapper extends SimpleConnection {
180205
BiConsumer<GenericObjectPool<Channel>, Boolean> configurer, ChannelListener channelListener) {
181206

182207
super(delegate, closeTimeout);
183-
GenericObjectPool<Channel> pool = new GenericObjectPool<>(new ChannelFactory());
184-
configurer.accept(pool, false);
185-
this.channels = pool;
186-
pool = new GenericObjectPool<>(new TxChannelFactory());
187-
configurer.accept(pool, true);
188-
this.txChannels = pool;
208+
this.channels = createPool(new ChannelFactory(), configurer, false);
209+
this.txChannels = createPool(new TxChannelFactory(), configurer, true);
189210
this.simplePublisherConfirms = simplePublisherConfirms;
190211
this.channelListener = channelListener;
191212
}
192213

214+
private GenericObjectPool<Channel> createPool(ChannelFactory channelFactory,
215+
BiConsumer<GenericObjectPool<Channel>, Boolean> configurer, boolean tx) {
216+
217+
GenericObjectPool<Channel> pool = new GenericObjectPool<>(channelFactory);
218+
configurer.accept(pool, tx);
219+
return pool;
220+
}
221+
193222
@Override
194223
public Channel createChannel(boolean transactional) {
195224
try {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ThreadChannelConnectionFactory.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
3131
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
3232
import org.springframework.aop.framework.ProxyFactory;
3333
import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;
34+
import org.springframework.context.SmartLifecycle;
3435
import org.springframework.lang.Nullable;
3536
import org.springframework.util.Assert;
3637

@@ -48,12 +49,15 @@
4849
* @since 2.3
4950
*
5051
*/
51-
public class ThreadChannelConnectionFactory extends AbstractConnectionFactory implements ShutdownListener {
52+
public class ThreadChannelConnectionFactory extends AbstractConnectionFactory
53+
implements ShutdownListener, SmartLifecycle {
5254

5355
private final Map<UUID, Context> contextSwitches = new ConcurrentHashMap<>();
5456

5557
private final Map<UUID, Thread> switchesInProgress = new ConcurrentHashMap<>();
5658

59+
private final AtomicBoolean running = new AtomicBoolean();
60+
5761
private volatile ConnectionWrapper connection;
5862

5963
private boolean simplePublisherConfirms;
@@ -106,6 +110,27 @@ public void setSimplePublisherConfirms(boolean simplePublisherConfirms) {
106110
}
107111
}
108112

113+
@Override
114+
public int getPhase() {
115+
return Integer.MIN_VALUE;
116+
}
117+
118+
@Override
119+
public void start() {
120+
this.running.set(true);
121+
}
122+
123+
@Override
124+
public void stop() {
125+
this.running.set(false);
126+
resetConnection();
127+
}
128+
129+
@Override
130+
public boolean isRunning() {
131+
return this.running.get();
132+
}
133+
109134
@Override
110135
public void addConnectionListener(ConnectionListener listener) {
111136
super.addConnectionListener(listener); // handles publishing sub-factory

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryIntegrationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -291,7 +291,7 @@ public void testSendAndReceiveFromVolatileQueueAfterImplicitRemoval() throws Exc
291291
template.convertAndSend(queue.getName(), "message");
292292

293293
// Force a physical close of the channel
294-
this.connectionFactory.resetConnection();
294+
this.connectionFactory.stop();
295295

296296
// The queue was removed when the channel was closed
297297
assertThatThrownBy(() -> template.receiveAndConvert(queue.getName()))

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/BrokerDeclaredQueueNameTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -124,7 +124,7 @@ private void testBrokerNamedQueue(AbstractMessageListenerContainer container,
124124
assertThat(this.message.get().getBody()).isEqualTo("foo".getBytes());
125125
final CountDownLatch newConnectionLatch = new CountDownLatch(2);
126126
this.cf.addConnectionListener(c -> newConnectionLatch.countDown());
127-
this.cf.resetConnection();
127+
this.cf.stop();
128128
assertThat(newConnectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
129129
String secondActualName = queue.getActualName();
130130
assertThat(secondActualName).isNotEqualTo(firstActualName);

0 commit comments

Comments
 (0)