Skip to content

Commit c8663b3

Browse files
authored
GH-1365: Recover Manual Declarations
Resolves #1365 * Verify bean definitions are not added to `manualDeclarables`.
1 parent c9f1b4c commit c8663b3

File tree

4 files changed

+198
-4
lines changed

4 files changed

+198
-4
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitAdmin.java

Lines changed: 117 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,14 @@
1919
import java.io.IOException;
2020
import java.util.ArrayList;
2121
import java.util.Collection;
22+
import java.util.Collections;
2223
import java.util.HashMap;
24+
import java.util.Iterator;
25+
import java.util.LinkedHashMap;
2326
import java.util.LinkedList;
2427
import java.util.List;
2528
import java.util.Map;
29+
import java.util.Map.Entry;
2630
import java.util.Properties;
2731
import java.util.concurrent.TimeoutException;
2832
import java.util.concurrent.atomic.AtomicBoolean;
@@ -63,6 +67,7 @@
6367
import org.springframework.util.StringUtils;
6468

6569
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
70+
import com.rabbitmq.client.AMQP.Queue.DeleteOk;
6671
import com.rabbitmq.client.AMQP.Queue.PurgeOk;
6772
import com.rabbitmq.client.Channel;
6873

@@ -124,6 +129,8 @@ public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, Applicat
124129

125130
private final ConnectionFactory connectionFactory;
126131

132+
private final Map<String, Declarable> manualDeclarables = Collections.synchronizedMap(new LinkedHashMap<>());
133+
127134
private String beanName;
128135

129136
private RetryTemplate retryTemplate;
@@ -142,6 +149,8 @@ public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, Applicat
142149

143150
private boolean explicitDeclarationsOnly;
144151

152+
private boolean redeclareManualDeclarations;
153+
145154
private volatile boolean running = false;
146155

147156
private volatile DeclarationExceptionEvent lastDeclarationExceptionEvent;
@@ -220,6 +229,9 @@ public void declareExchange(final Exchange exchange) {
220229
try {
221230
this.rabbitTemplate.execute(channel -> {
222231
declareExchanges(channel, exchange);
232+
if (this.redeclareManualDeclarations) {
233+
this.manualDeclarables.put(exchange.getName(), exchange);
234+
}
223235
return null;
224236
});
225237
}
@@ -238,6 +250,7 @@ public boolean deleteExchange(final String exchangeName) {
238250

239251
try {
240252
channel.exchangeDelete(exchangeName);
253+
removeExchangeBindings(exchangeName);
241254
}
242255
catch (@SuppressWarnings(UNUSED) IOException e) {
243256
return false;
@@ -246,6 +259,24 @@ public boolean deleteExchange(final String exchangeName) {
246259
});
247260
}
248261

262+
private void removeExchangeBindings(final String exchangeName) {
263+
this.manualDeclarables.remove(exchangeName);
264+
synchronized (this.manualDeclarables) {
265+
Iterator<Entry<String, Declarable>> iterator = this.manualDeclarables.entrySet().iterator();
266+
while (iterator.hasNext()) {
267+
Entry<String, Declarable> next = iterator.next();
268+
if (next.getValue() instanceof Binding) {
269+
Binding binding = (Binding) next.getValue();
270+
if ((!binding.isDestinationQueue() && binding.getDestination().equals(exchangeName))
271+
|| binding.getExchange().equals(exchangeName)) {
272+
iterator.remove();
273+
}
274+
}
275+
}
276+
}
277+
}
278+
279+
249280
// Queue operations
250281

251282
/**
@@ -266,7 +297,11 @@ public String declareQueue(final Queue queue) {
266297
try {
267298
return this.rabbitTemplate.execute(channel -> {
268299
DeclareOk[] declared = declareQueues(channel, queue);
269-
return declared.length > 0 ? declared[0].getQueue() : null;
300+
String result = declared.length > 0 ? declared[0].getQueue() : null;
301+
if (this.redeclareManualDeclarations) {
302+
this.manualDeclarables.put(result, queue);
303+
}
304+
return result;
270305
});
271306
}
272307
catch (AmqpException e) {
@@ -303,6 +338,7 @@ public boolean deleteQueue(final String queueName) {
303338
return this.rabbitTemplate.execute(channel -> { // NOSONAR never returns null
304339
try {
305340
channel.queueDelete(queueName);
341+
removeQueueBindings(queueName);
306342
}
307343
catch (@SuppressWarnings(UNUSED) IOException e) {
308344
return false;
@@ -316,11 +352,28 @@ public boolean deleteQueue(final String queueName) {
316352
"Delete a queue from the broker if unused and empty (when corresponding arguments are true")
317353
public void deleteQueue(final String queueName, final boolean unused, final boolean empty) {
318354
this.rabbitTemplate.execute(channel -> {
319-
channel.queueDelete(queueName, unused, empty);
355+
DeleteOk queueDelete = channel.queueDelete(queueName, unused, empty);
356+
removeQueueBindings(queueName);
320357
return null;
321358
});
322359
}
323360

361+
private void removeQueueBindings(final String queueName) {
362+
this.manualDeclarables.remove(queueName);
363+
synchronized (this.manualDeclarables) {
364+
Iterator<Entry<String, Declarable>> iterator = this.manualDeclarables.entrySet().iterator();
365+
while (iterator.hasNext()) {
366+
Entry<String, Declarable> next = iterator.next();
367+
if (next.getValue() instanceof Binding) {
368+
Binding binding = (Binding) next.getValue();
369+
if (binding.isDestinationQueue() && binding.getDestination().equals(queueName)) {
370+
iterator.remove();
371+
}
372+
}
373+
}
374+
}
375+
}
376+
324377
@Override
325378
@ManagedOperation(description = "Purge a queue and optionally don't wait for the purge to occur")
326379
public void purgeQueue(final String queueName, final boolean noWait) {
@@ -352,6 +405,9 @@ public void declareBinding(final Binding binding) {
352405
try {
353406
this.rabbitTemplate.execute(channel -> {
354407
declareBindings(channel, binding);
408+
if (this.redeclareManualDeclarations) {
409+
this.manualDeclarables.put(binding.toString(), binding);
410+
}
355411
return null;
356412
});
357413
}
@@ -377,6 +433,7 @@ public void removeBinding(final Binding binding) {
377433
channel.exchangeUnbind(binding.getDestination(), binding.getExchange(), binding.getRoutingKey(),
378434
binding.getArguments());
379435
}
436+
this.manualDeclarables.remove(binding.toString());
380437
return null;
381438
});
382439
}
@@ -444,6 +501,37 @@ public void setExplicitDeclarationsOnly(boolean explicitDeclarationsOnly) {
444501
this.explicitDeclarationsOnly = explicitDeclarationsOnly;
445502
}
446503

504+
/**
505+
* Normally, when a connection is recovered, the admin only recovers auto-delete queues,
506+
* etc, that are declared as beans in the application context. When this is true, it
507+
* will also redeclare any manually declared {@link Declarable}s via admin methods.
508+
* @return true to redeclare.
509+
* @since 2.4
510+
*/
511+
public boolean isRedeclareManualDeclarations() {
512+
return this.redeclareManualDeclarations;
513+
}
514+
515+
/**
516+
* Normally, when a connection is recovered, the admin only recovers auto-delete
517+
* queues, etc, that are declared as beans in the application context. When this is
518+
* true, it will also redeclare any manually declared {@link Declarable}s via admin
519+
* methods. When a queue or exhange is deleted, it will not longer be recovered, nor
520+
* will any corresponding bindings.
521+
* @param redeclareManualDeclarations true to redeclare.
522+
* @since 2.4
523+
* @see #declareQueue(Queue)
524+
* @see #declareExchange(Exchange)
525+
* @see #declareBinding(Binding)
526+
* @see #deleteQueue(String)
527+
* @see #deleteExchange(String)
528+
* @see #removeBinding(Binding)
529+
* @see #resetAllManualDeclarations()
530+
*/
531+
public void setRedeclareManualDeclarations(boolean redeclareManualDeclarations) {
532+
this.redeclareManualDeclarations = redeclareManualDeclarations;
533+
}
534+
447535
/**
448536
* Set a retry template for auto declarations. There is a race condition with
449537
* auto-delete, exclusive queues in that the queue might still exist for a short time,
@@ -597,7 +685,7 @@ public void initialize() {
597685
}
598686
}
599687

600-
if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
688+
if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0 && this.manualDeclarables.size() == 0) {
601689
this.logger.debug("Nothing to declare");
602690
return;
603691
}
@@ -607,10 +695,36 @@ public void initialize() {
607695
declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
608696
return null;
609697
});
698+
if (this.manualDeclarables.size() > 0) {
699+
synchronized (this.manualDeclarables) {
700+
this.logger.debug("Redeclaring manually declared Declarables");
701+
for (Declarable dec : this.manualDeclarables.values()) {
702+
if (dec instanceof Queue) {
703+
declareQueue((Queue) dec);
704+
}
705+
else if (dec instanceof Exchange) {
706+
declareExchange((Exchange) dec);
707+
}
708+
else {
709+
declareBinding((Binding) dec);
710+
}
711+
}
712+
}
713+
}
610714
this.logger.debug("Declarations finished");
611715

612716
}
613717

718+
/**
719+
* Invoke this method to prevent the admin from recovering any declarations made
720+
* by calls to {@code declare*()} methods.
721+
* @since 2.4
722+
* @see #setRedeclareManualDeclarations(boolean)
723+
*/
724+
public void resetAllManualDeclarations() {
725+
this.manualDeclarables.clear();
726+
}
727+
614728
private void processDeclarables(Collection<Exchange> contextExchanges, Collection<Queue> contextQueues,
615729
Collection<Binding> contextBindings) {
616730

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,67 @@ public void testLeaderLocator() throws Exception {
387387
cf.destroy();
388388
}
389389

390+
@Test
391+
void manualDeclarations() {
392+
CachingConnectionFactory cf = new CachingConnectionFactory(
393+
RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
394+
RabbitAdmin admin = new RabbitAdmin(cf);
395+
GenericApplicationContext applicationContext = new GenericApplicationContext();
396+
admin.setApplicationContext(applicationContext);
397+
admin.setRedeclareManualDeclarations(true);
398+
applicationContext.registerBean("admin", RabbitAdmin.class, () -> admin);
399+
applicationContext.registerBean("beanQueue", Queue.class,
400+
() -> new Queue("thisOneShouldntBeInTheManualDecs", false, true, true));
401+
applicationContext.registerBean("beanEx", DirectExchange.class,
402+
() -> new DirectExchange("thisOneShouldntBeInTheManualDecs", false, true));
403+
applicationContext.registerBean("beanBinding", Binding.class,
404+
() -> new Binding("thisOneShouldntBeInTheManualDecs", DestinationType.QUEUE,
405+
"thisOneShouldntBeInTheManualDecs", "test", null));
406+
applicationContext.refresh();
407+
Map<?, ?> declarables = TestUtils.getPropertyValue(admin, "manualDeclarables", Map.class);
408+
assertThat(declarables).hasSize(0);
409+
// check the auto-configured Declarables
410+
RabbitTemplate template = new RabbitTemplate(cf);
411+
template.convertAndSend("thisOneShouldntBeInTheManualDecs", "test", "foo");
412+
Object received = template.receiveAndConvert("thisOneShouldntBeInTheManualDecs", 5000);
413+
assertThat(received).isEqualTo("foo");
414+
// manual declarations
415+
admin.declareQueue(new Queue("test1", false, true, true));
416+
admin.declareQueue(new Queue("test2", false, true, true));
417+
admin.declareExchange(new DirectExchange("ex1", false, true));
418+
admin.declareBinding(new Binding("test1", DestinationType.QUEUE, "ex1", "test", null));
419+
admin.deleteQueue("test2");
420+
template.execute(chan -> chan.queueDelete("test1"));
421+
cf.resetConnection();
422+
admin.initialize();
423+
assertThat(admin.getQueueProperties("test1")).isNotNull();
424+
assertThat(admin.getQueueProperties("test2")).isNull();
425+
assertThat(declarables).hasSize(3);
426+
// verify the exchange and binding were recovered too
427+
template.convertAndSend("ex1", "test", "foo");
428+
received = template.receiveAndConvert("test1", 5000);
429+
assertThat(received).isEqualTo("foo");
430+
admin.resetAllManualDeclarations();
431+
assertThat(declarables).hasSize(0);
432+
cf.resetConnection();
433+
admin.initialize();
434+
assertThat(admin.getQueueProperties("test1")).isNull();
435+
admin.declareQueue(new Queue("test1", false, true, true));
436+
admin.declareExchange(new DirectExchange("ex1", false, true));
437+
admin.declareBinding(new Binding("test1", DestinationType.QUEUE, "ex1", "test", null));
438+
admin.declareExchange(new DirectExchange("ex2", false, true));
439+
admin.declareBinding(new Binding("test1", DestinationType.QUEUE, "ex2", "test", null));
440+
admin.declareBinding(new Binding("ex1", DestinationType.EXCHANGE, "ex2", "ex1", null));
441+
assertThat(declarables).hasSize(6);
442+
admin.deleteExchange("ex2");
443+
assertThat(declarables).hasSize(3);
444+
admin.deleteQueue("test1");
445+
assertThat(declarables).hasSize(1);
446+
admin.deleteExchange("ex1");
447+
assertThat(declarables).hasSize(0);
448+
cf.destroy();
449+
}
450+
390451
@Configuration
391452
public static class Config {
392453

src/reference/asciidoc/amqp.adoc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5371,6 +5371,20 @@ Starting with version 2.1, anonymous queues are declared with argument `Queue.X_
53715371
This ensures that the queue is declared on the node to which the application is connected.
53725372
You can revert to the previous behavior by calling `queue.setLeaderLocator(null)` after constructing the instance.
53735373

5374+
[[declarable-recovery]]
5375+
===== Recovering Auto-Delete Declarations
5376+
5377+
Normally, the `RabbitAdmin` (s) only recover queues/exchanges/bindings that are declared as beans in the application context; if any such declarations are auto-delete, they will be removed by the broker if the connection is lost.
5378+
When the connection is re-established, the admin will redeclare the entities.
5379+
Normally, entities created by calling `admin.declareQueue(...)`, `admin.declareExchange(...)` and `admin.declareBinding(...)` will not be recovered.
5380+
5381+
Starting with version 2.4, the admin has a new property `redeclareManualDeclarations`; when true, the admin will recover these entities in addition to the beans in the application context.
5382+
5383+
Recovery of individual declarations will not be performed if `deleteQueue(...)`, `deleteExchange(...)` or `removeBinding(...)` is called.
5384+
Associated bindings are removed from the recoverable entities when queues and exchanges are deleted.
5385+
5386+
Finally, calling `resetAllManualDeclarations()` will prevent the recovery of any previously declared entities.
5387+
53745388
[[broker-events]]
53755389
==== Broker Event Listener
53765390

src/reference/asciidoc/whats-new.adoc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@
66
This section describes the changes between version 2.4 and version 2.4.
77
See <<change-history>> for changes in previous versions.
88

9-
==== @RabbitListener Changes
9+
==== `@RabbitListener` Changes
1010

1111
`MessageProperties` is now available for argument matching.
1212
See <<async-annotation-driven-enable-signature>> for more information.
13+
14+
==== `RabbitAdmin` Changes
15+
16+
A new property `recoverManualDeclarations` allows recovery of manually declared queues/exchanges/bindings.
17+
See <<declarable-recovery>> for more information.

0 commit comments

Comments
 (0)