Skip to content

Commit 82a7228

Browse files
committed
GH-2498: Fix Manual Redeclaration With Dup. Names
Resolves #2501 Manual redeclaration logic did not account for an exchange having the same name as a queue. **back port to 2.4.x will have conflicts and requires instanceof polishing** (I will do it after merge).
1 parent c00f588 commit 82a7228

File tree

4 files changed

+67
-32
lines changed

4 files changed

+67
-32
lines changed

spring-amqp/src/main/java/org/springframework/amqp/core/AmqpAdmin.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Collections;
2020
import java.util.Map;
2121
import java.util.Properties;
22+
import java.util.Set;
2223

2324
import org.springframework.lang.Nullable;
2425

@@ -133,11 +134,22 @@ public interface AmqpAdmin {
133134
* Return the manually declared AMQP objects.
134135
* @return the manually declared AMQP objects.
135136
* @since 2.4.13
137+
* @deprecated in favor of {@link #getManualDeclarableSet()}.
136138
*/
139+
@Deprecated
137140
default Map<String, Declarable> getManualDeclarables() {
138141
return Collections.emptyMap();
139142
}
140143

144+
/**
145+
* Return the manually declared AMQP objects.
146+
* @return the manually declared AMQP objects.
147+
* @since 2.4.15
148+
*/
149+
default Set<Declarable> getManualDeclarableSet() {
150+
return Collections.emptySet();
151+
}
152+
141153
/**
142154
* Initialize the admin.
143155
* @since 2.1

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitAdmin.java

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222
import java.util.Collections;
2323
import java.util.HashMap;
2424
import java.util.Iterator;
25-
import java.util.LinkedHashMap;
25+
import java.util.LinkedHashSet;
2626
import java.util.LinkedList;
2727
import java.util.List;
2828
import java.util.Map;
29-
import java.util.Map.Entry;
3029
import java.util.Properties;
30+
import java.util.Set;
3131
import java.util.concurrent.TimeoutException;
3232
import java.util.concurrent.atomic.AtomicBoolean;
3333
import java.util.concurrent.atomic.AtomicReference;
@@ -128,7 +128,7 @@ public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, Applicat
128128

129129
private final ConnectionFactory connectionFactory;
130130

131-
private final Map<String, Declarable> manualDeclarables = Collections.synchronizedMap(new LinkedHashMap<>());
131+
private final Set<Declarable> manualDeclarables = Collections.synchronizedSet(new LinkedHashSet<>());
132132

133133
private String beanName;
134134

@@ -229,7 +229,7 @@ public void declareExchange(final Exchange exchange) {
229229
this.rabbitTemplate.execute(channel -> {
230230
declareExchanges(channel, exchange);
231231
if (this.redeclareManualDeclarations) {
232-
this.manualDeclarables.put(exchange.getName(), exchange);
232+
this.manualDeclarables.add(exchange);
233233
}
234234
return null;
235235
});
@@ -259,13 +259,16 @@ public boolean deleteExchange(final String exchangeName) {
259259
}
260260

261261
private void removeExchangeBindings(final String exchangeName) {
262-
this.manualDeclarables.remove(exchangeName);
263262
synchronized (this.manualDeclarables) {
264-
Iterator<Entry<String, Declarable>> iterator = this.manualDeclarables.entrySet().iterator();
263+
this.manualDeclarables.stream()
264+
.filter(dec -> dec instanceof Exchange && ((Exchange) dec).getName().equals(exchangeName))
265+
.collect(Collectors.toSet())
266+
.forEach(ex -> this.manualDeclarables.remove(ex));
267+
Iterator<Declarable> iterator = this.manualDeclarables.iterator();
265268
while (iterator.hasNext()) {
266-
Entry<String, Declarable> next = iterator.next();
267-
if (next.getValue() instanceof Binding) {
268-
Binding binding = (Binding) next.getValue();
269+
Declarable next = iterator.next();
270+
if (next instanceof Binding) {
271+
Binding binding = (Binding) next;
269272
if ((!binding.isDestinationQueue() && binding.getDestination().equals(exchangeName))
270273
|| binding.getExchange().equals(exchangeName)) {
271274
iterator.remove();
@@ -298,7 +301,7 @@ public String declareQueue(final Queue queue) {
298301
DeclareOk[] declared = declareQueues(channel, queue);
299302
String result = declared.length > 0 ? declared[0].getQueue() : null;
300303
if (this.redeclareManualDeclarations) {
301-
this.manualDeclarables.put(result, queue);
304+
this.manualDeclarables.add(queue);
302305
}
303306
return result;
304307
});
@@ -358,13 +361,16 @@ public void deleteQueue(final String queueName, final boolean unused, final bool
358361
}
359362

360363
private void removeQueueBindings(final String queueName) {
361-
this.manualDeclarables.remove(queueName);
362364
synchronized (this.manualDeclarables) {
363-
Iterator<Entry<String, Declarable>> iterator = this.manualDeclarables.entrySet().iterator();
365+
this.manualDeclarables.stream()
366+
.filter(dec -> dec instanceof Queue && ((Queue) dec).getName().equals(queueName))
367+
.collect(Collectors.toSet())
368+
.forEach(q -> this.manualDeclarables.remove(q));
369+
Iterator<Declarable> iterator = this.manualDeclarables.iterator();
364370
while (iterator.hasNext()) {
365-
Entry<String, Declarable> next = iterator.next();
366-
if (next.getValue() instanceof Binding) {
367-
Binding binding = (Binding) next.getValue();
371+
Declarable next = iterator.next();
372+
if (next instanceof Binding) {
373+
Binding binding = (Binding) next;
368374
if (binding.isDestinationQueue() && binding.getDestination().equals(queueName)) {
369375
iterator.remove();
370376
}
@@ -405,7 +411,7 @@ public void declareBinding(final Binding binding) {
405411
this.rabbitTemplate.execute(channel -> {
406412
declareBindings(channel, binding);
407413
if (this.redeclareManualDeclarations) {
408-
this.manualDeclarables.put(binding.toString(), binding);
414+
this.manualDeclarables.add(binding);
409415
}
410416
return null;
411417
});
@@ -707,7 +713,7 @@ private void redeclareManualDeclarables() {
707713
if (this.manualDeclarables.size() > 0) {
708714
synchronized (this.manualDeclarables) {
709715
this.logger.debug("Redeclaring manually declared Declarables");
710-
for (Declarable dec : this.manualDeclarables.values()) {
716+
for (Declarable dec : this.manualDeclarables) {
711717
if (dec instanceof Queue) {
712718
declareQueue((Queue) dec);
713719
}
@@ -733,14 +739,27 @@ public void resetAllManualDeclarations() {
733739
this.manualDeclarables.clear();
734740
}
735741

736-
/**
737-
* Return the manually declared AMQP objects.
738-
* @return the manually declared AMQP objects.
739-
* @since 2.4.13
740-
*/
741742
@Override
743+
@Deprecated
742744
public Map<String, Declarable> getManualDeclarables() {
743-
return Collections.unmodifiableMap(this.manualDeclarables);
745+
Map<String, Declarable> declarables = new HashMap<>();
746+
this.manualDeclarables.forEach(declarable -> {
747+
if (declarable instanceof Exchange) {
748+
declarables.put(((Exchange) declarable).getName(), declarable);
749+
}
750+
else if (declarable instanceof Queue) {
751+
declarables.put(((Queue) declarable).getName(), declarable);
752+
}
753+
else if (declarable instanceof Binding) {
754+
declarables.put(declarable.toString(), declarable);
755+
}
756+
});
757+
return declarables;
758+
}
759+
760+
@Override
761+
public Set<Declarable> getManualDeclarableSet() {
762+
return Collections.unmodifiableSet(this.manualDeclarables);
744763
}
745764

746765
private void processDeclarables(Collection<Exchange> contextExchanges, Collection<Queue> contextQueues,

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1982,8 +1982,7 @@ private void attemptDeclarations(AmqpAdmin admin) {
19821982
context.getBeansOfType(Queue.class, false, false).values());
19831983
Map<String, Declarables> declarables = context.getBeansOfType(Declarables.class, false, false);
19841984
declarables.values().forEach(dec -> queues.addAll(dec.getDeclarablesByType(Queue.class)));
1985-
admin.getManualDeclarables()
1986-
.values()
1985+
admin.getManualDeclarableSet()
19871986
.stream()
19881987
.filter(Queue.class::isInstance)
19891988
.map(Queue.class::cast)

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,6 +41,7 @@
4141
import java.util.List;
4242
import java.util.Map;
4343
import java.util.Properties;
44+
import java.util.Set;
4445
import java.util.UUID;
4546
import java.util.concurrent.ExecutorService;
4647
import java.util.concurrent.TimeoutException;
@@ -401,27 +402,31 @@ void manualDeclarations() {
401402
() -> new Binding("thisOneShouldntBeInTheManualDecs", DestinationType.QUEUE,
402403
"thisOneShouldntBeInTheManualDecs", "test", null));
403404
applicationContext.refresh();
404-
Map<?, ?> declarables = TestUtils.getPropertyValue(admin, "manualDeclarables", Map.class);
405+
Set<?> declarables = TestUtils.getPropertyValue(admin, "manualDeclarables", Set.class);
405406
assertThat(declarables).hasSize(0);
406407
// check the auto-configured Declarables
407408
RabbitTemplate template = new RabbitTemplate(cf);
408409
template.convertAndSend("thisOneShouldntBeInTheManualDecs", "test", "foo");
409410
Object received = template.receiveAndConvert("thisOneShouldntBeInTheManualDecs", 5000);
410411
assertThat(received).isEqualTo("foo");
411412
// manual declarations
413+
admin.declareExchange(new DirectExchange("test1", false, true));
412414
admin.declareQueue(new Queue("test1", false, true, true));
413415
admin.declareQueue(new Queue("test2", false, true, true));
414-
admin.declareExchange(new DirectExchange("ex1", false, true));
415-
admin.declareBinding(new Binding("test1", DestinationType.QUEUE, "ex1", "test", null));
416+
admin.declareBinding(new Binding("test1", DestinationType.QUEUE, "test1", "test", null));
416417
admin.deleteQueue("test2");
417-
template.execute(chan -> chan.queueDelete("test1"));
418+
template.execute(chan -> {
419+
chan.queueDelete("test1");
420+
chan.exchangeDelete("test1");
421+
return null;
422+
});
418423
cf.resetConnection();
419424
admin.initialize();
420425
assertThat(admin.getQueueProperties("test1")).isNotNull();
421426
assertThat(admin.getQueueProperties("test2")).isNull();
422427
assertThat(declarables).hasSize(3);
423428
// verify the exchange and binding were recovered too
424-
template.convertAndSend("ex1", "test", "foo");
429+
template.convertAndSend("test1", "test", "foo");
425430
received = template.receiveAndConvert("test1", 5000);
426431
assertThat(received).isEqualTo("foo");
427432
admin.resetAllManualDeclarations();
@@ -451,7 +456,7 @@ void manualDeclarationsWithoutApplicationContext() {
451456
RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
452457
RabbitAdmin admin = new RabbitAdmin(cf);
453458
admin.setRedeclareManualDeclarations(true);
454-
Map<?, ?> declarables = TestUtils.getPropertyValue(admin, "manualDeclarables", Map.class);
459+
Set<?> declarables = TestUtils.getPropertyValue(admin, "manualDeclarables", Set.class);
455460
assertThat(declarables).hasSize(0);
456461
RabbitTemplate template = new RabbitTemplate(cf);
457462
// manual declarations

0 commit comments

Comments
 (0)