Skip to content

GH-891 Introduced MultiRabbit to handle multiple brokers #1111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 24, 2020
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,7 @@ public void setIgnoreDeclarationExceptions(boolean ignoreDeclarationExceptions)
this.ignoreDeclarationExceptions = ignoreDeclarationExceptions;
}

/**
* The {@code AmqpAdmin}s that should declare this object; default is
* all admins.
* <br><br>A null argument, or an array/varArg with a single null argument, clears the collection
* ({@code setAdminsThatShouldDeclare((AmqpAdmin) null)} or
* {@code setAdminsThatShouldDeclare((AmqpAdmin[]) null)}). Clearing the collection resets
* the behavior such that all admins will declare the object.
* @param adminArgs The admins.
*/
@Override
public void setAdminsThatShouldDeclare(Object... adminArgs) {
Collection<Object> admins = new ArrayList<Object>();
if (adminArgs != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ public interface Declarable {
*/
boolean isIgnoreDeclarationExceptions();

/**
* The {@code AmqpAdmin}s that should declare this object; default is
* all admins.
* <br><br>A null argument, or an array/varArg with a single null argument, clears the collection
* ({@code setAdminsThatShouldDeclare((AmqpAdmin) null)} or
* {@code setAdminsThatShouldDeclare((AmqpAdmin[]) null)}). Clearing the collection resets
* the behavior such that all admins will declare the object.
* @param adminArgs The admins.
*/
void setAdminsThatShouldDeclare(Object... adminArgs);

/**
* Add an argument to the declarable.
* @param name the argument name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.amqp.rabbit.test;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
Expand All @@ -30,6 +31,7 @@
import org.mockito.Mockito;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Declarable;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor;
Expand Down Expand Up @@ -73,8 +75,8 @@ public RabbitListenerTestHarness(AnnotationMetadata importMetadata) {
}

@Override
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
Object target, String beanName) {
protected Collection<Declarable> processListener(MethodRabbitListenerEndpoint endpoint,
RabbitListener rabbitListener, Object bean, Object target, String beanName) {

Object proxy = bean;
String id = rabbitListener.id();
Expand Down Expand Up @@ -102,7 +104,7 @@ protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitList
else {
logger.info("The test harness can only proxy @RabbitListeners with an 'id' attribute");
}
super.processListener(endpoint, rabbitListener, proxy, target, beanName); // NOSONAR proxy is not null
return super.processListener(endpoint, rabbitListener, proxy, target, beanName); // NOSONAR proxy is not null
}

public InvocationData getNextInvocationDataFor(String id, long wait, TimeUnit unit) throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.annotation;

import java.lang.reflect.Method;
import java.util.Collection;

import org.springframework.amqp.core.Declarable;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.util.StringUtils;

/**
* An extension of {@link RabbitListenerAnnotationBeanPostProcessor} that associates the
* proper RabbitAdmin to the beans of Exchanges, Queues, and Bindings after they are
* created.
* <p>
* This processing restricts the {@link RabbitAdmin} according to the related
* configuration, preventing the server from automatic binding non-related structures.
*
* @author Wander Costa
*/
public class MultiRabbitListenerAnnotationBeanPostProcessor extends RabbitListenerAnnotationBeanPostProcessor {

public static final String CONNECTION_FACTORY_BEAN_NAME = "multiRabbitConnectionFactory";

public static final String CONNECTION_FACTORY_CREATOR_BEAN_NAME = "rabbitConnectionFactoryCreator";

private static final String DEFAULT_RABBIT_ADMIN_BEAN_NAME = "defaultRabbitAdmin";

private static final String RABBIT_ADMIN_SUFFIX = "-admin";

@Override
protected Collection<Declarable> processAmqpListener(RabbitListener rabbitListener, Method method,
Object bean, String beanName) {
final Collection<Declarable> declarables = super.processAmqpListener(rabbitListener, method, bean, beanName);
final String rabbitAdmin = resolveMultiRabbitAdminName(rabbitListener);
for (final Declarable declarable : declarables) {
if (declarable.getDeclaringAdmins().isEmpty()) {
declarable.setAdminsThatShouldDeclare(rabbitAdmin);
}
}
return declarables;
}

/**
* Resolves the name of the RabbitAdmin bean based on the RabbitListener, or falls back to
* the default RabbitAdmin name provided by MultiRabbit.
*
* @param rabbitListener The RabbitListener to process the name from.
* @return The name of the RabbitAdmin bean.
*/
protected String resolveMultiRabbitAdminName(RabbitListener rabbitListener) {
String admin = super.resolveExpressionAsString(rabbitListener.admin(), "admin");
if (!StringUtils.hasText(admin) && StringUtils.hasText(rabbitListener.containerFactory())) {
admin = rabbitListener.containerFactory()
+ MultiRabbitListenerAnnotationBeanPostProcessor.RABBIT_ADMIN_SUFFIX;
}
if (!StringUtils.hasText(admin)) {
admin = MultiRabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_ADMIN_BEAN_NAME;
}
return admin;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.springframework.amqp.core.Base64UrlNamingStrategy;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
import org.springframework.amqp.core.Declarable;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Queue;
Expand Down Expand Up @@ -362,11 +363,12 @@ private void processMultiMethodListeners(RabbitListener[] classLevelListeners, M
}
}

protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
protected Collection<Declarable> processAmqpListener(RabbitListener rabbitListener, Method method, Object bean,
String beanName) {
Method methodToUse = checkProxy(method, bean);
MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
endpoint.setMethod(methodToUse);
processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
return processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}

private Method checkProxy(Method methodArg, Object bean) {
Expand Down Expand Up @@ -401,13 +403,14 @@ private Method checkProxy(Method methodArg, Object bean) {
return method;
}

protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
Object target, String beanName) {
protected Collection<Declarable> processListener(MethodRabbitListenerEndpoint endpoint,
RabbitListener rabbitListener, Object bean, Object target, String beanName) {

final List<Declarable> declarables = new ArrayList<>();
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setId(getEndpointId(rabbitListener));
endpoint.setQueueNames(resolveQueues(rabbitListener));
endpoint.setQueueNames(resolveQueues(rabbitListener, declarables));
endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
endpoint.setBeanFactory(this.beanFactory);
endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
Expand Down Expand Up @@ -456,6 +459,7 @@ else if (errorHandler instanceof String) {
RabbitListenerContainerFactory<?> factory = resolveContainerFactory(rabbitListener, target, beanName);

this.registrar.registerEndpoint(endpoint, factory);
return declarables;
}

private void resolveAckMode(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener) {
Expand Down Expand Up @@ -562,7 +566,7 @@ private String getEndpointId(RabbitListener rabbitListener) {
}
}

private String[] resolveQueues(RabbitListener rabbitListener) {
private String[] resolveQueues(RabbitListener rabbitListener, Collection<Declarable> declarables) {
String[] queues = rabbitListener.queues();
QueueBinding[] bindings = rabbitListener.bindings();
org.springframework.amqp.rabbit.annotation.Queue[] queuesToDeclare = rabbitListener.queuesToDeclare();
Expand All @@ -578,15 +582,15 @@ private String[] resolveQueues(RabbitListener rabbitListener) {
"@RabbitListener can have only one of 'queues', 'queuesToDeclare', or 'bindings'");
}
for (int i = 0; i < queuesToDeclare.length; i++) {
result.add(declareQueue(queuesToDeclare[i]));
result.add(declareQueue(queuesToDeclare[i], declarables));
}
}
if (bindings.length > 0) {
if (queues.length > 0 || queuesToDeclare.length > 0) {
throw new BeanInitializationException(
"@RabbitListener can have only one of 'queues', 'queuesToDeclare', or 'bindings'");
}
return registerBeansForDeclaration(rabbitListener);
return registerBeansForDeclaration(rabbitListener, declarables);
}
return result.toArray(new String[result.size()]);
}
Expand Down Expand Up @@ -618,19 +622,20 @@ else if (resolvedValueToUse instanceof Iterable) {
}
}

private String[] registerBeansForDeclaration(RabbitListener rabbitListener) {
private String[] registerBeansForDeclaration(RabbitListener rabbitListener, Collection<Declarable> declarables) {
List<String> queues = new ArrayList<String>();
if (this.beanFactory instanceof ConfigurableBeanFactory) {
for (QueueBinding binding : rabbitListener.bindings()) {
String queueName = declareQueue(binding.value());
String queueName = declareQueue(binding.value(), declarables);
queues.add(queueName);
declareExchangeAndBinding(binding, queueName);
declareExchangeAndBinding(binding, queueName, declarables);
}
}
return queues.toArray(new String[queues.size()]);
}

private String declareQueue(org.springframework.amqp.rabbit.annotation.Queue bindingQueue) {
private String declareQueue(org.springframework.amqp.rabbit.annotation.Queue bindingQueue,
Collection<Declarable> declarables) {
String queueName = (String) resolveExpression(bindingQueue.value());
boolean isAnonymous = false;
if (!StringUtils.hasText(queueName)) {
Expand All @@ -649,10 +654,11 @@ private String declareQueue(org.springframework.amqp.rabbit.annotation.Queue bin
queue.setAdminsThatShouldDeclare((Object[]) bindingQueue.admins());
}
queue.setShouldDeclare(resolveExpressionAsBoolean(bindingQueue.declare()));
declarables.add(queue);
return queueName;
}

private void declareExchangeAndBinding(QueueBinding binding, String queueName) {
private void declareExchangeAndBinding(QueueBinding binding, String queueName, Collection<Declarable> declarables) {
org.springframework.amqp.rabbit.annotation.Exchange bindingExchange = binding.exchange();
String exchangeName = resolveExpressionAsString(bindingExchange.value(), "@Exchange.exchange");
Assert.isTrue(StringUtils.hasText(exchangeName), () -> "Exchange name required; binding queue " + queueName);
Expand Down Expand Up @@ -697,10 +703,12 @@ private void declareExchangeAndBinding(QueueBinding binding, String queueName) {

((ConfigurableBeanFactory) this.beanFactory)
.registerSingleton(exchangeName + ++this.increment, exchange);
registerBindings(binding, queueName, exchangeName, exchangeType);
registerBindings(binding, queueName, exchangeName, exchangeType, declarables);
declarables.add(exchange);
}

private void registerBindings(QueueBinding binding, String queueName, String exchangeName, String exchangeType) {
private void registerBindings(QueueBinding binding, String queueName, String exchangeName, String exchangeType,
Collection<Declarable> declarables) {
final List<String> routingKeys;
if (exchangeType.equals(ExchangeTypes.FANOUT) || binding.key().length == 0) {
routingKeys = Collections.singletonList("");
Expand All @@ -725,6 +733,7 @@ private void registerBindings(QueueBinding binding, String queueName, String exc
}
((ConfigurableBeanFactory) this.beanFactory)
.registerSingleton(exchangeName + "." + queueName + ++this.increment, actualBinding);
declarables.add(actualBinding);
}
}

Expand Down Expand Up @@ -815,7 +824,7 @@ else if (resolved instanceof String) {
}
}

private String resolveExpressionAsString(String value, String attribute) {
protected String resolveExpressionAsString(String value, String attribute) {
Object resolved = resolveExpression(value);
if (resolved instanceof String) {
return (String) resolved;
Expand Down
Loading