Skip to content

Commit ef03b70

Browse files
garyrussellartembilan
authored andcommitted
GH-891: Docs for Multi RabbitMQ Support
Supplement to #1111
1 parent 0a0cfe4 commit ef03b70

File tree

2 files changed

+200
-43
lines changed

2 files changed

+200
-43
lines changed

src/reference/asciidoc/amqp.adoc

Lines changed: 196 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -636,12 +636,12 @@ The following examples shows how to configure a `SimpleRoutingConnectionFactory`
636636
----
637637
<bean id="connectionFactory"
638638
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
639-
<property name="targetConnectionFactories">
640-
<map>
641-
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
642-
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
643-
</map>
644-
</property>
639+
<property name="targetConnectionFactories">
640+
<map>
641+
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
642+
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
643+
</map>
644+
</property>
645645
</bean>
646646
647647
<rabbit:template id="template" connection-factory="connectionFactory" />
@@ -1476,7 +1476,7 @@ public static MessageBuilder fromClonedMessage(Message message) <5>
14761476
14771477
<1> The message created by the builder has a body that is a direct reference to the argument.
14781478
<2> The message created by the builder has a body that is a new array containing a copy of bytes in the argument.
1479-
<3> The message created by the builder has a body that is a new array containing the range of bytes from the argument.
1479+
<3> The message created by the builder has a body that is a new array containing the range of bytes from the argument.
14801480
See https://docs.oracle.com/javase/7/docs/api/java/util/Arrays.html[`Arrays.copyOfRange()`] for more details.
14811481
<4> The message created by the builder has a body that is a direct reference to the body of the argument.
14821482
The argument's properties are copied to a new `MessageProperties` object.
@@ -1554,11 +1554,11 @@ The following listing shows the `BatchingStrategy` interface definition:
15541554
----
15551555
public interface BatchingStrategy {
15561556
1557-
MessageBatch addToBatch(String exchange, String routingKey, Message message);
1557+
MessageBatch addToBatch(String exchange, String routingKey, Message message);
15581558
1559-
Date nextRelease();
1559+
Date nextRelease();
15601560
1561-
Collection<MessageBatch> releaseBatches();
1561+
Collection<MessageBatch> releaseBatches();
15621562
15631563
}
15641564
----
@@ -1652,22 +1652,22 @@ The following listing shows those method definitions:
16521652
[source,java]
16531653
----
16541654
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
1655-
throws AmqpException;
1655+
throws AmqpException;
16561656
16571657
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
1658-
throws AmqpException;
1658+
throws AmqpException;
16591659
16601660
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
1661-
String replyExchange, String replyRoutingKey) throws AmqpException;
1661+
String replyExchange, String replyRoutingKey) throws AmqpException;
16621662
16631663
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
1664-
String replyExchange, String replyRoutingKey) throws AmqpException;
1664+
String replyExchange, String replyRoutingKey) throws AmqpException;
16651665
16661666
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
1667-
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
1667+
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
16681668
16691669
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
1670-
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
1670+
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
16711671
----
16721672
====
16731673

@@ -1801,7 +1801,7 @@ The following listing shows the definition of `FunctionalInterface`:
18011801
@FunctionalInterface
18021802
public interface ReplyingMessageListener<T, R> {
18031803
1804-
R handleMessage(T t);
1804+
R handleMessage(T t);
18051805
18061806
}
18071807
----
@@ -4284,34 +4284,34 @@ The following listing shows sample client and server configurations:
42844284
[source,xml]
42854285
----
42864286
<bean id="client"
4287-
class="org.springframework.amqp.remoting.client.AmqpProxyFactoryBean">
4288-
<property name="amqpTemplate" ref="template" />
4289-
<property name="serviceInterface" value="foo.ServiceInterface" />
4287+
class="org.springframework.amqp.remoting.client.AmqpProxyFactoryBean">
4288+
<property name="amqpTemplate" ref="template" />
4289+
<property name="serviceInterface" value="foo.ServiceInterface" />
42904290
</bean>
42914291
42924292
<rabbit:connection-factory id="connectionFactory" />
42934293
42944294
<rabbit:template id="template" connection-factory="connectionFactory" reply-timeout="2000"
4295-
routing-key="remoting.binding" exchange="remoting.exchange" />
4295+
routing-key="remoting.binding" exchange="remoting.exchange" />
42964296
42974297
<rabbit:admin connection-factory="connectionFactory" />
42984298
42994299
<rabbit:queue name="remoting.queue" />
43004300
43014301
<rabbit:direct-exchange name="remoting.exchange">
4302-
<rabbit:bindings>
4303-
<rabbit:binding queue="remoting.queue" key="remoting.binding" />
4304-
</rabbit:bindings>
4302+
<rabbit:bindings>
4303+
<rabbit:binding queue="remoting.queue" key="remoting.binding" />
4304+
</rabbit:bindings>
43054305
</rabbit:direct-exchange>
43064306
----
43074307
43084308
[source,xml]
43094309
----
43104310
<bean id="listener"
4311-
class="org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter">
4312-
<property name="serviceInterface" value="foo.ServiceInterface" />
4313-
<property name="service" ref="service" />
4314-
<property name="amqpTemplate" ref="template" />
4311+
class="org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter">
4312+
<property name="serviceInterface" value="foo.ServiceInterface" />
4313+
<property name="service" ref="service" />
4314+
<property name="amqpTemplate" ref="template" />
43154315
</bean>
43164316
43174317
<bean id="service" class="foo.ServiceImpl" />
@@ -4323,7 +4323,7 @@ The following listing shows sample client and server configurations:
43234323
<rabbit:queue name="remoting.queue" />
43244324
43254325
<rabbit:listener-container connection-factory="connectionFactory">
4326-
<rabbit:listener ref="listener" queue-names="remoting.queue" />
4326+
<rabbit:listener ref="listener" queue-names="remoting.queue" />
43274327
</rabbit:listener-container>
43284328
----
43294329
====
@@ -4790,17 +4790,17 @@ public static class Config {
47904790
47914791
@Bean
47924792
public DirectExchange e1() {
4793-
return new DirectExchange("e1", false, true);
4793+
return new DirectExchange("e1", false, true);
47944794
}
47954795
47964796
@Bean
47974797
public Queue q1() {
4798-
return new Queue("q1", false, false, true);
4798+
return new Queue("q1", false, false, true);
47994799
}
48004800
48014801
@Bean
48024802
public Binding b1() {
4803-
return BindingBuilder.bind(q1()).to(e1()).with("k1");
4803+
return BindingBuilder.bind(q1()).to(e1()).with("k1");
48044804
}
48054805
48064806
@Bean
@@ -4896,9 +4896,9 @@ The properties are available as attributes in the namespace, as shown in the fol
48964896
<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />
48974897
48984898
<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
4899-
<rabbit:bindings>
4900-
<rabbit:binding key="foo" queue="bar"/>
4901-
</rabbit:bindings>
4899+
<rabbit:bindings>
4900+
<rabbit:binding key="foo" queue="bar"/>
4901+
</rabbit:bindings>
49024902
</rabbit:direct-exchange>
49034903
----
49044904
====
@@ -6149,10 +6149,10 @@ The following example shows how to do so:
61496149
----
61506150
@Bean
61516151
public StatefulRetryOperationsInterceptor interceptor() {
6152-
return RetryInterceptorBuilder.stateful()
6153-
.maxAttempts(5)
6154-
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
6155-
.build();
6152+
return RetryInterceptorBuilder.stateful()
6153+
.maxAttempts(5)
6154+
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
6155+
.build();
61566156
}
61576157
----
61586158
====
@@ -6220,10 +6220,10 @@ The following example shows how to set a `RepublishMessageRecoverer` as the reco
62206220
----
62216221
@Bean
62226222
RetryOperationsInterceptor interceptor() {
6223-
return RetryInterceptorBuilder.stateless()
6224-
.maxAttempts(5)
6225-
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
6226-
.build();
6223+
return RetryInterceptorBuilder.stateless()
6224+
.maxAttempts(5)
6225+
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
6226+
.build();
62276227
}
62286228
----
62296229
====
@@ -6271,6 +6271,159 @@ When `true`, it travers exception causes until it finds a match or there is no c
62716271

62726272
To use this classifier for retry, you can use a `SimpleRetryPolicy` created with the constructor that takes the max attempts, the `Map` of `Exception` instances, and the boolean (`traverseCauses`) and inject this policy into the `RetryTemplate`.
62736273

6274+
[[multi-rabbit]]
6275+
==== Multiple Broker (or Cluster) Support
6276+
6277+
Version 2.3 added more convenience when communicating between a single application and multiple brokers or broker clusters.
6278+
The main benefit, on the consumer side, is that the infrastructure can automatically associate auto-declared queues with the appropriate broker.
6279+
6280+
This is best illustrated with an example:
6281+
6282+
====
6283+
[source, java]
6284+
----
6285+
@SpringBootApplication(exclude = RabbitAutoConfiguration.class)
6286+
public class Application {
6287+
6288+
public static void main(String[] args) {
6289+
SpringApplication.run(Application.class, args);
6290+
}
6291+
6292+
@Bean
6293+
CachingConnectionFactory cf1() {
6294+
return new CachingConnectionFactory("localhost");
6295+
}
6296+
6297+
@Bean
6298+
CachingConnectionFactory cf2() {
6299+
return new CachingConnectionFactory("otherHost");
6300+
}
6301+
6302+
@Bean
6303+
CachingConnectionFactory cf3() {
6304+
return new CachingConnectionFactory("thirdHost");
6305+
}
6306+
6307+
@Bean
6308+
SimpleRoutingConnectionFactory rcf(CachingConnectionFactory cf1,
6309+
CachingConnectionFactory cf2, CachingConnectionFactory cf3) {
6310+
6311+
SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
6312+
rcf.setDefaultTargetConnectionFactory(cf1);
6313+
rcf.setTargetConnectionFactories(Map.of("one", cf1, "two", cf2, "three", cf3));
6314+
return rcf;
6315+
}
6316+
6317+
@Bean("factory1-admin")
6318+
RabbitAdmin admin1(CachingConnectionFactory cf1) {
6319+
return new RabbitAdmin(cf1);
6320+
}
6321+
6322+
@Bean("factory2-admin")
6323+
RabbitAdmin admin2(CachingConnectionFactory cf2) {
6324+
return new RabbitAdmin(cf2);
6325+
}
6326+
6327+
@Bean("factory3-admin")
6328+
RabbitAdmin admin3(CachingConnectionFactory cf3) {
6329+
return new RabbitAdmin(cf3);
6330+
}
6331+
6332+
@Bean
6333+
public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
6334+
return new RabbitListenerEndpointRegistry();
6335+
}
6336+
6337+
@Bean
6338+
public RabbitListenerAnnotationBeanPostProcessor postProcessor(RabbitListenerEndpointRegistry registry) {
6339+
MultiRabbitListenerAnnotationBeanPostProcessor postProcessor
6340+
= new MultiRabbitListenerAnnotationBeanPostProcessor();
6341+
postProcessor.setEndpointRegistry(registry);
6342+
postProcessor.setContainerFactoryBeanName("defaultContainerFactory");
6343+
return postProcessor;
6344+
}
6345+
6346+
@Bean
6347+
public SimpleRabbitListenerContainerFactory factory1(CachingConnectionFactory cf1) {
6348+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
6349+
factory.setConnectionFactory(cf1);
6350+
return factory;
6351+
}
6352+
6353+
@Bean
6354+
public SimpleRabbitListenerContainerFactory factory2(CachingConnectionFactory cf2) {
6355+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
6356+
factory.setConnectionFactory(cf2);
6357+
return factory;
6358+
}
6359+
6360+
@Bean
6361+
public SimpleRabbitListenerContainerFactory factory3(CachingConnectionFactory cf3) {
6362+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
6363+
factory.setConnectionFactory(cf3);
6364+
return factory;
6365+
}
6366+
6367+
@Bean
6368+
RabbitTemplate template(RoutingConnectionFactory rcf) {
6369+
return new RabbitTemplate(rcf);
6370+
}
6371+
6372+
@Bean
6373+
ConnectionFactoryContextWrapper wrapper(SimpleRoutingConnectionFactory rcf) {
6374+
return new ConnectionFactoryContextWrapper(rcf);
6375+
}
6376+
6377+
}
6378+
6379+
@Component
6380+
class Listeners {
6381+
6382+
@RabbitListener(queuesToDeclare = @Queue("q1"), containerFactory = "factory1")
6383+
public void listen1(String in) {
6384+
6385+
}
6386+
6387+
@RabbitListener(queuesToDeclare = @Queue("q2"), containerFactory = "factory2")
6388+
public void listen2(String in) {
6389+
6390+
}
6391+
6392+
@RabbitListener(queuesToDeclare = @Queue("q3"), containerFactory = "factory3")
6393+
public void listen3(String in) {
6394+
6395+
}
6396+
6397+
}
6398+
----
6399+
====
6400+
6401+
As you can see, we have declared 3 sets of infrastructure (connection factories, admins, container factories).
6402+
As discussed earlier, `@RabbitListener` can define which container factory to use; in this case, they also use `queuesToDeclare` which causes the queue(s) to be declared on the broker, if it doesn't exist.
6403+
By naming the `RabbitAdmin` beans with the convention `<container-factory-name>-admin`, the infrastructure is able to determine which admin should declare the queue.
6404+
This will also work with `bindings = @QueueBinding(...)` whereby the exchange and binding will also be declared.
6405+
It will NOT work with `queues`, since that expects the queue(s) to already exist.
6406+
6407+
On the producer side, a convenient `ConnectionFactoryContextWrapper` class is provided, to make using the `RoutingConnectionFactory` (see <<routing-connection-factory>>) simpler.
6408+
6409+
As you can see above, a `SimpleRoutingConnectionFactory` bean has been added with routing keys `one`, `two` and `three`.
6410+
There is also a `RabbitTemplate` that uses that factory.
6411+
Here is an example of using that template with the wrapper to route to one of the broker clusters.
6412+
6413+
====
6414+
[source, java]
6415+
----
6416+
@Bean
6417+
public ApplicationRunner runner(RabbitTemplate template, ConnectionFactoryContextWrapper wrapper) {
6418+
return args -> {
6419+
wrapper.run("one", () -> template.convertAndSend("q1", "toCluster1"));
6420+
wrapper.run("two", () -> template.convertAndSend("q2", "toCluster2"));
6421+
wrapper.run("three", () -> template.convertAndSend("q3", "toCluster3"));
6422+
};
6423+
}
6424+
----
6425+
====
6426+
62746427
==== Debugging
62756428

62766429
Spring AMQP provides extensive logging, especially at the `DEBUG` level.

src/reference/asciidoc/whats-new.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,7 @@ See <<containerAttributes>> for more information.
4848
The compressing `MessagePostProcessor` s now use a comma to separate multiple content encodings instead of a colon.
4949
The decompressors can handle both formats but, if you produce messages with this version that are consumed by versions earlier than 2.2.12, you should configure the compressor to use the old delimiter.
5050
See the IMPORTANT note in <<post-processing>> for more information.
51+
52+
==== Multiple Broker Support Improvements
53+
54+
See <<multi-rabbit>> for more information.

0 commit comments

Comments
 (0)