Skip to content

GH-891: Docs for Multi RabbitMQ Support #1262

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 196 additions & 43 deletions src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -636,12 +636,12 @@ The following examples shows how to configure a `SimpleRoutingConnectionFactory`
----
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>

<rabbit:template id="template" connection-factory="connectionFactory" />
Expand Down Expand Up @@ -1476,7 +1476,7 @@ public static MessageBuilder fromClonedMessage(Message message) <5>

<1> The message created by the builder has a body that is a direct reference to the argument.
<2> The message created by the builder has a body that is a new array containing a copy of bytes in the argument.
<3> The message created by the builder has a body that is a new array containing the range of bytes from the argument.
<3> The message created by the builder has a body that is a new array containing the range of bytes from the argument.
See https://docs.oracle.com/javase/7/docs/api/java/util/Arrays.html[`Arrays.copyOfRange()`] for more details.
<4> The message created by the builder has a body that is a direct reference to the body of the argument.
The argument's properties are copied to a new `MessageProperties` object.
Expand Down Expand Up @@ -1554,11 +1554,11 @@ The following listing shows the `BatchingStrategy` interface definition:
----
public interface BatchingStrategy {

MessageBatch addToBatch(String exchange, String routingKey, Message message);
MessageBatch addToBatch(String exchange, String routingKey, Message message);

Date nextRelease();
Date nextRelease();

Collection<MessageBatch> releaseBatches();
Collection<MessageBatch> releaseBatches();

}
----
Expand Down Expand Up @@ -1652,22 +1652,22 @@ The following listing shows those method definitions:
[source,java]
----
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
----
====

Expand Down Expand Up @@ -1801,7 +1801,7 @@ The following listing shows the definition of `FunctionalInterface`:
@FunctionalInterface
public interface ReplyingMessageListener<T, R> {

R handleMessage(T t);
R handleMessage(T t);

}
----
Expand Down Expand Up @@ -4284,34 +4284,34 @@ The following listing shows sample client and server configurations:
[source,xml]
----
<bean id="client"
class="org.springframework.amqp.remoting.client.AmqpProxyFactoryBean">
<property name="amqpTemplate" ref="template" />
<property name="serviceInterface" value="foo.ServiceInterface" />
class="org.springframework.amqp.remoting.client.AmqpProxyFactoryBean">
<property name="amqpTemplate" ref="template" />
<property name="serviceInterface" value="foo.ServiceInterface" />
</bean>

<rabbit:connection-factory id="connectionFactory" />

<rabbit:template id="template" connection-factory="connectionFactory" reply-timeout="2000"
routing-key="remoting.binding" exchange="remoting.exchange" />
routing-key="remoting.binding" exchange="remoting.exchange" />

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:queue name="remoting.queue" />

<rabbit:direct-exchange name="remoting.exchange">
<rabbit:bindings>
<rabbit:binding queue="remoting.queue" key="remoting.binding" />
</rabbit:bindings>
<rabbit:bindings>
<rabbit:binding queue="remoting.queue" key="remoting.binding" />
</rabbit:bindings>
</rabbit:direct-exchange>
----

[source,xml]
----
<bean id="listener"
class="org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter">
<property name="serviceInterface" value="foo.ServiceInterface" />
<property name="service" ref="service" />
<property name="amqpTemplate" ref="template" />
class="org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter">
<property name="serviceInterface" value="foo.ServiceInterface" />
<property name="service" ref="service" />
<property name="amqpTemplate" ref="template" />
</bean>

<bean id="service" class="foo.ServiceImpl" />
Expand All @@ -4323,7 +4323,7 @@ The following listing shows sample client and server configurations:
<rabbit:queue name="remoting.queue" />

<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="listener" queue-names="remoting.queue" />
<rabbit:listener ref="listener" queue-names="remoting.queue" />
</rabbit:listener-container>
----
====
Expand Down Expand Up @@ -4790,17 +4790,17 @@ public static class Config {

@Bean
public DirectExchange e1() {
return new DirectExchange("e1", false, true);
return new DirectExchange("e1", false, true);
}

@Bean
public Queue q1() {
return new Queue("q1", false, false, true);
return new Queue("q1", false, false, true);
}

@Bean
public Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}

@Bean
Expand Down Expand Up @@ -4896,9 +4896,9 @@ The properties are available as attributes in the namespace, as shown in the fol
<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />

<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
<rabbit:bindings>
<rabbit:binding key="foo" queue="bar"/>
</rabbit:bindings>
<rabbit:bindings>
<rabbit:binding key="foo" queue="bar"/>
</rabbit:bindings>
</rabbit:direct-exchange>
----
====
Expand Down Expand Up @@ -6149,10 +6149,10 @@ The following example shows how to do so:
----
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
----
====
Expand Down Expand Up @@ -6220,10 +6220,10 @@ The following example shows how to set a `RepublishMessageRecoverer` as the reco
----
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
.build();
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
.build();
}
----
====
Expand Down Expand Up @@ -6271,6 +6271,159 @@ When `true`, it travers exception causes until it finds a match or there is no c

To use this classifier for retry, you can use a `SimpleRetryPolicy` created with the constructor that takes the max attempts, the `Map` of `Exception` instances, and the boolean (`traverseCauses`) and inject this policy into the `RetryTemplate`.

[[multi-rabbit]]
==== Multiple Broker (or Cluster) Support

Version 2.3 added more convenience when communicating between a single application and multiple brokers or broker clusters.
The main benefit, on the consumer side, is that the infrastructure can automatically associate auto-declared queues with the appropriate broker.

This is best illustrated with an example:

====
[source, java]
----
@SpringBootApplication(exclude = RabbitAutoConfiguration.class)
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Bean
CachingConnectionFactory cf1() {
return new CachingConnectionFactory("localhost");
}

@Bean
CachingConnectionFactory cf2() {
return new CachingConnectionFactory("otherHost");
}

@Bean
CachingConnectionFactory cf3() {
return new CachingConnectionFactory("thirdHost");
}

@Bean
SimpleRoutingConnectionFactory rcf(CachingConnectionFactory cf1,
CachingConnectionFactory cf2, CachingConnectionFactory cf3) {

SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
rcf.setDefaultTargetConnectionFactory(cf1);
rcf.setTargetConnectionFactories(Map.of("one", cf1, "two", cf2, "three", cf3));
return rcf;
}

@Bean("factory1-admin")
RabbitAdmin admin1(CachingConnectionFactory cf1) {
return new RabbitAdmin(cf1);
}

@Bean("factory2-admin")
RabbitAdmin admin2(CachingConnectionFactory cf2) {
return new RabbitAdmin(cf2);
}

@Bean("factory3-admin")
RabbitAdmin admin3(CachingConnectionFactory cf3) {
return new RabbitAdmin(cf3);
}

@Bean
public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}

@Bean
public RabbitListenerAnnotationBeanPostProcessor postProcessor(RabbitListenerEndpointRegistry registry) {
MultiRabbitListenerAnnotationBeanPostProcessor postProcessor
= new MultiRabbitListenerAnnotationBeanPostProcessor();
postProcessor.setEndpointRegistry(registry);
postProcessor.setContainerFactoryBeanName("defaultContainerFactory");
return postProcessor;
}

@Bean
public SimpleRabbitListenerContainerFactory factory1(CachingConnectionFactory cf1) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf1);
return factory;
}

@Bean
public SimpleRabbitListenerContainerFactory factory2(CachingConnectionFactory cf2) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf2);
return factory;
}

@Bean
public SimpleRabbitListenerContainerFactory factory3(CachingConnectionFactory cf3) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf3);
return factory;
}

@Bean
RabbitTemplate template(RoutingConnectionFactory rcf) {
return new RabbitTemplate(rcf);
}

@Bean
ConnectionFactoryContextWrapper wrapper(SimpleRoutingConnectionFactory rcf) {
return new ConnectionFactoryContextWrapper(rcf);
}

}

@Component
class Listeners {

@RabbitListener(queuesToDeclare = @Queue("q1"), containerFactory = "factory1")
public void listen1(String in) {

}

@RabbitListener(queuesToDeclare = @Queue("q2"), containerFactory = "factory2")
public void listen2(String in) {

}

@RabbitListener(queuesToDeclare = @Queue("q3"), containerFactory = "factory3")
public void listen3(String in) {

}

}
----
====

As you can see, we have declared 3 sets of infrastructure (connection factories, admins, container factories).
As discussed earlier, `@RabbitListener` can define which container factory to use; in this case, they also use `queuesToDeclare` which causes the queue(s) to be declared on the broker, if it doesn't exist.
By naming the `RabbitAdmin` beans with the convention `<container-factory-name>-admin`, the infrastructure is able to determine which admin should declare the queue.
This will also work with `bindings = @QueueBinding(...)` whereby the exchange and binding will also be declared.
It will NOT work with `queues`, since that expects the queue(s) to already exist.

On the producer side, a convenient `ConnectionFactoryContextWrapper` class is provided, to make using the `RoutingConnectionFactory` (see <<routing-connection-factory>>) simpler.

As you can see above, a `SimpleRoutingConnectionFactory` bean has been added with routing keys `one`, `two` and `three`.
There is also a `RabbitTemplate` that uses that factory.
Here is an example of using that template with the wrapper to route to one of the broker clusters.

====
[source, java]
----
@Bean
public ApplicationRunner runner(RabbitTemplate template, ConnectionFactoryContextWrapper wrapper) {
return args -> {
wrapper.run("one", () -> template.convertAndSend("q1", "toCluster1"));
wrapper.run("two", () -> template.convertAndSend("q2", "toCluster2"));
wrapper.run("three", () -> template.convertAndSend("q3", "toCluster3"));
};
}
----
====

==== Debugging

Spring AMQP provides extensive logging, especially at the `DEBUG` level.
Expand Down
4 changes: 4 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,7 @@ See <<containerAttributes>> for more information.
The compressing `MessagePostProcessor` s now use a comma to separate multiple content encodings instead of a colon.
The decompressors can handle both formats but, if you produce messages with this version that are consumed by versions earlier than 2.2.12, you should configure the compressor to use the old delimiter.
See the IMPORTANT note in <<post-processing>> for more information.

==== Multiple Broker Support Improvements

See <<multi-rabbit>> for more information.