Skip to content

Commit 664f470

Browse files
garyrussellartembilan
authored andcommitted
GH-1198: Support AddressResolver
Resolves #1198 **cherry-pick to 2.2.x, 2.1.x** # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ConnectionFactoryParser.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryTests.java
1 parent bb3b8c8 commit 664f470

File tree

7 files changed

+113
-22
lines changed

7 files changed

+113
-22
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ConnectionFactoryParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ class ConnectionFactoryParser extends AbstractSingleBeanDefinitionParser {
4242

4343
private static final String SHUFFLE_ADDRESSES = "shuffle-addresses";
4444

45+
private static final String ADDRESS_RESOLVER = "address-resolver";
46+
4547
private static final String VIRTUAL_HOST_ATTRIBUTE = "virtual-host";
4648

4749
private static final String USER_ATTRIBUTE = "username";
@@ -101,6 +103,7 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit
101103
NamespaceUtils.setReferenceIfAttributeDefined(builder, element, EXECUTOR_ATTRIBUTE);
102104
NamespaceUtils.setValueIfAttributeDefined(builder, element, ADDRESSES);
103105
NamespaceUtils.setValueIfAttributeDefined(builder, element, SHUFFLE_ADDRESSES);
106+
NamespaceUtils.setReferenceIfAttributeDefined(builder, element, ADDRESS_RESOLVER);
104107
NamespaceUtils.setValueIfAttributeDefined(builder, element, PUBLISHER_CONFIRMS);
105108
NamespaceUtils.setValueIfAttributeDefined(builder, element, PUBLISHER_RETURNS);
106109
NamespaceUtils.setValueIfAttributeDefined(builder, element, REQUESTED_HEARTBEAT, "requestedHeartBeat");

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,6 +51,7 @@
5151
import org.springframework.util.StringUtils;
5252

5353
import com.rabbitmq.client.Address;
54+
import com.rabbitmq.client.AddressResolver;
5455
import com.rabbitmq.client.BlockedListener;
5556
import com.rabbitmq.client.Recoverable;
5657
import com.rabbitmq.client.RecoveryListener;
@@ -122,6 +123,8 @@ public void handleRecovery(Recoverable recoverable) {
122123

123124
private ApplicationEventPublisher applicationEventPublisher;
124125

126+
private AddressResolver addressResolver;
127+
125128
private volatile boolean contextStopped;
126129

127130
/**
@@ -217,6 +220,16 @@ public void setConnectionThreadFactory(ThreadFactory threadFactory) {
217220
this.rabbitConnectionFactory.setThreadFactory(threadFactory);
218221
}
219222

223+
/**
224+
* Set an {@link AddressResolver} to use when creating connections; overrides
225+
* {@link #setAddresses(String)}, {@link #setHost(String)}, and {@link #setPort(int)}.
226+
* @param addressResolver the resolver.
227+
* @since 2.1.15
228+
*/
229+
public void setAddressResolver(AddressResolver addressResolver) {
230+
this.addressResolver = addressResolver;
231+
}
232+
220233
/**
221234
* @param uri the URI
222235
* @since 1.5
@@ -292,7 +305,8 @@ public void setAddresses(String addresses) {
292305
return;
293306
}
294307
}
295-
this.logger.info("setAddresses() called with an empty value, will be using the host+port properties for connections");
308+
this.logger.info("setAddresses() called with an empty value, will be using the host+port "
309+
+ " or addressResolver properties for connections");
296310
this.addresses = null;
297311
}
298312

@@ -512,28 +526,47 @@ public void handleRecovery(Recoverable recoverable) {
512526
}
513527

514528
private com.rabbitmq.client.Connection connect(String connectionName) throws IOException, TimeoutException {
515-
com.rabbitmq.client.Connection rabbitConnection;
529+
if (this.addressResolver != null) {
530+
return connectResolver(connectionName);
531+
}
516532
if (this.addresses != null) {
517-
List<Address> addressesToConnect = this.addresses;
518-
if (this.shuffleAddresses && addressesToConnect.size() > 1) {
519-
List<Address> list = new ArrayList<>(addressesToConnect);
520-
Collections.shuffle(list);
521-
addressesToConnect = list;
522-
}
523-
if (this.logger.isInfoEnabled()) {
524-
this.logger.info("Attempting to connect to: " + addressesToConnect);
525-
}
526-
rabbitConnection = this.rabbitConnectionFactory.newConnection(this.executorService, addressesToConnect,
527-
connectionName);
533+
return connectAddresses(connectionName);
528534
}
529535
else {
530-
if (this.logger.isInfoEnabled()) {
531-
this.logger.info("Attempting to connect to: " + this.rabbitConnectionFactory.getHost()
532-
+ ":" + this.rabbitConnectionFactory.getPort());
533-
}
534-
rabbitConnection = this.rabbitConnectionFactory.newConnection(this.executorService, connectionName);
536+
return connectHostPort(connectionName);
537+
}
538+
}
539+
540+
private com.rabbitmq.client.Connection connectResolver(String connectionName) throws IOException, TimeoutException {
541+
if (this.logger.isInfoEnabled()) {
542+
this.logger.info("Attempting to connect with: " + this.addressResolver);
543+
}
544+
return this.rabbitConnectionFactory.newConnection(this.executorService, this.addressResolver,
545+
connectionName);
546+
}
547+
548+
private com.rabbitmq.client.Connection connectAddresses(String connectionName)
549+
throws IOException, TimeoutException {
550+
551+
List<Address> addressesToConnect = this.addresses;
552+
if (this.shuffleAddresses && addressesToConnect.size() > 1) {
553+
List<Address> list = new ArrayList<>(addressesToConnect);
554+
Collections.shuffle(list);
555+
addressesToConnect = list;
556+
}
557+
if (this.logger.isInfoEnabled()) {
558+
this.logger.info("Attempting to connect to: " + addressesToConnect);
559+
}
560+
return this.rabbitConnectionFactory.newConnection(this.executorService, addressesToConnect,
561+
connectionName);
562+
}
563+
564+
private com.rabbitmq.client.Connection connectHostPort(String connectionName) throws IOException, TimeoutException {
565+
if (this.logger.isInfoEnabled()) {
566+
this.logger.info("Attempting to connect to: " + this.rabbitConnectionFactory.getHost()
567+
+ ":" + this.rabbitConnectionFactory.getPort());
535568
}
536-
return rabbitConnection;
569+
return this.rabbitConnectionFactory.newConnection(this.executorService, connectionName);
537570
}
538571

539572
protected final String getDefaultHostName() {

spring-rabbit/src/main/resources/org/springframework/amqp/rabbit/config/spring-rabbit-2.2.xsd

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1408,6 +1408,18 @@
14081408
]]></xsd:documentation>
14091409
</xsd:annotation>
14101410
</xsd:attribute>
1411+
<xsd:attribute name="address-resolver" type="xsd:string" use="optional">
1412+
<xsd:annotation>
1413+
<xsd:documentation><![CDATA[
1414+
An address resolver bean; overrides 'addresses' and 'host/port'.
1415+
]]></xsd:documentation>
1416+
<xsd:appinfo>
1417+
<tool:annotation kind="ref">
1418+
<tool:expected-type type="com.rabbitmq.client.AddressResolver" />
1419+
</tool:annotation>
1420+
</xsd:appinfo>
1421+
</xsd:annotation>
1422+
</xsd:attribute>
14111423
<xsd:attribute name="username" type="xsd:string" use="optional">
14121424
<xsd:annotation>
14131425
<xsd:documentation><![CDATA[

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ConnectionFactoryParserTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -128,4 +128,10 @@ public void testMultiHost() throws Exception {
128128
"rabbitConnectionFactory.threadFactory")).isSameAs(beanFactory.getBean("tf"));
129129
}
130130

131+
@Test
132+
void testResolver() {
133+
CachingConnectionFactory connectionFactory = beanFactory.getBean("resolved", CachingConnectionFactory.class);
134+
assertThat(TestUtils.getPropertyValue(connectionFactory, "addressResolver"))
135+
.isSameAs(this.beanFactory.getBean("resolver"));
136+
}
131137
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactoryTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.springframework.test.util.ReflectionTestUtils;
8282

8383
import com.rabbitmq.client.Address;
84+
import com.rabbitmq.client.AddressResolver;
8485
import com.rabbitmq.client.Channel;
8586
import com.rabbitmq.client.ConfirmListener;
8687
import com.rabbitmq.client.ConnectionFactory;
@@ -1876,4 +1877,27 @@ public void confirmsCorrelated() {
18761877
assertThat(cf.getPublisherConnectionFactory().isPublisherConfirms()).isFalse();
18771878
}
18781879

1880+
@Test
1881+
void testResolver() throws Exception {
1882+
com.rabbitmq.client.ConnectionFactory mockConnectionFactory = mock(com.rabbitmq.client.ConnectionFactory.class);
1883+
com.rabbitmq.client.Connection mockConnection = mock(com.rabbitmq.client.Connection.class);
1884+
Channel mockChannel = mock(Channel.class);
1885+
1886+
AddressResolver resolver = () -> Collections.singletonList(Address.parseAddress("foo:5672"));
1887+
when(mockConnectionFactory.newConnection(any(ExecutorService.class), eq(resolver), anyString()))
1888+
.thenReturn(mockConnection);
1889+
when(mockConnection.createChannel()).thenReturn(mockChannel);
1890+
when(mockChannel.isOpen()).thenReturn(true);
1891+
when(mockConnection.isOpen()).thenReturn(true);
1892+
1893+
CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
1894+
ccf.setExecutor(mock(ExecutorService.class));
1895+
ccf.setAddressResolver(resolver);
1896+
Connection con = ccf.createConnection();
1897+
assertThat(con).isNotNull();
1898+
assertThat(TestUtils.getPropertyValue(con, "target", SimpleConnection.class).getDelegate())
1899+
.isEqualTo(mockConnection);
1900+
verify(mockConnectionFactory).newConnection(any(ExecutorService.class), eq(resolver), anyString());
1901+
}
1902+
18791903
}

spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/ConnectionFactoryParserTests-context.xml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,15 @@
2020

2121
<rabbit:connection-factory id="native" connection-factory="connectionFactory" channel-cache-size="10" />
2222

23-
<bean id="connectionFactory" class="com.rabbitmq.client.ConnectionFactory"/>
23+
24+
<rabbit:connection-factory id="resolved" connection-factory="connectionFactory"
25+
address-resolver="resolver"/>
26+
27+
<bean id="resolver" class="com.rabbitmq.client.ListAddressResolver">
28+
<constructor-arg value="null"/>
29+
</bean>
30+
31+
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean"/>
2432

2533
<rabbit:connection-factory id="withExecutor" host="foo" virtual-host="/bar"
2634
connection-cache-size="10" port="6888" username="user" password="password"

src/reference/asciidoc/amqp.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,11 @@ The following example with a custom thread factory that prefixes thread names wi
378378
----
379379
====
380380

381+
===== AddressResolver
382+
383+
Starting with version 2.1.15, you can now use an `AddressResover` to resolve the connection address(es).
384+
This will override any settings of the `addresses` and `host/port` properties.
385+
381386
===== Naming Connections
382387

383388
Starting with version 1.7, a `ConnectionNameStrategy` is provided for the injection into the `AbstractionConnectionFactory`.

0 commit comments

Comments
 (0)