Skip to content

Commit e88962d

Browse files
committed
Topology recovery retry fixes
1 parent e935a34 commit e88962d

File tree

3 files changed

+164
-72
lines changed

3 files changed

+164
-72
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 99 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -759,49 +759,68 @@ public void recoverExchange(RecordedExchange x, boolean retry) {
759759
}
760760
}
761761

762-
762+
/**
763+
* Recover the queue. Any exceptions during recovery will be delivered to the connection's {@link ExceptionHandler}.
764+
* @param oldName queue name
765+
* @param q recorded queue
766+
* @param retry whether to retry the recovery if an error occurs and a RetryHandler was configured on the connection
767+
*/
763768
public void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
764769
try {
765-
if (topologyRecoveryFilter.filterQueue(q)) {
766-
LOGGER.debug("Recovering {}", q);
767-
if (retry) {
768-
final RecordedQueue entity = q;
769-
q = (RecordedQueue) wrapRetryIfNecessary(q, () -> {
770-
entity.recover();
771-
return null;
772-
}).getRecordedEntity();
773-
} else {
774-
q.recover();
775-
}
776-
String newName = q.getName();
777-
if (!oldName.equals(newName)) {
778-
// make sure server-named queues are re-added with
779-
// their new names. MK.
780-
synchronized (this.recordedQueues) {
781-
this.propagateQueueNameChangeToBindings(oldName, newName);
782-
this.propagateQueueNameChangeToConsumers(oldName, newName);
783-
// bug26552:
784-
// remove old name after we've updated the bindings and consumers,
785-
// plus only for server-named queues, both to make sure we don't lose
786-
// anything to recover. MK.
787-
if(q.isServerNamed()) {
788-
deleteRecordedQueue(oldName);
789-
}
790-
this.recordedQueues.put(newName, q);
791-
}
792-
}
793-
for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
794-
qrl.queueRecovered(oldName, newName);
795-
}
796-
LOGGER.debug("{} has recovered", q);
797-
}
770+
internalRecoverQueue(oldName, q, retry);
798771
} catch (Exception cause) {
799772
final String message = "Caught an exception while recovering queue " + oldName +
800773
": " + cause.getMessage();
801774
TopologyRecoveryException e = new TopologyRecoveryException(message, cause, q);
802775
this.getExceptionHandler().handleTopologyRecoveryException(delegate, q.getDelegateChannel(), e);
803776
}
804777
}
778+
779+
/**
780+
* Recover the queue. Errors are not retried and not delivered to the connection's {@link ExceptionHandler}
781+
* @param oldName queue name
782+
* @param q recorded queue
783+
* @throws Exception if an error occurs recovering the queue
784+
*/
785+
void recoverQueue(final String oldName, RecordedQueue q) throws Exception {
786+
internalRecoverQueue(oldName, q, false);
787+
}
788+
789+
private void internalRecoverQueue(final String oldName, RecordedQueue q, boolean retry) throws Exception {
790+
if (topologyRecoveryFilter.filterQueue(q)) {
791+
LOGGER.debug("Recovering {}", q);
792+
if (retry) {
793+
final RecordedQueue entity = q;
794+
q = (RecordedQueue) wrapRetryIfNecessary(q, () -> {
795+
entity.recover();
796+
return null;
797+
}).getRecordedEntity();
798+
} else {
799+
q.recover();
800+
}
801+
String newName = q.getName();
802+
if (!oldName.equals(newName)) {
803+
// make sure server-named queues are re-added with
804+
// their new names. MK.
805+
synchronized (this.recordedQueues) {
806+
this.propagateQueueNameChangeToBindings(oldName, newName);
807+
this.propagateQueueNameChangeToConsumers(oldName, newName);
808+
// bug26552:
809+
// remove old name after we've updated the bindings and consumers,
810+
// plus only for server-named queues, both to make sure we don't lose
811+
// anything to recover. MK.
812+
if(q.isServerNamed()) {
813+
deleteRecordedQueue(oldName);
814+
}
815+
this.recordedQueues.put(newName, q);
816+
}
817+
}
818+
for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
819+
qrl.queueRecovered(oldName, newName);
820+
}
821+
LOGGER.debug("{} has recovered", q);
822+
}
823+
}
805824

806825
public void recoverBinding(RecordedBinding b, boolean retry) {
807826
try {
@@ -825,41 +844,61 @@ public void recoverBinding(RecordedBinding b, boolean retry) {
825844
}
826845
}
827846

847+
/**
848+
* Recover the consumer. Any exceptions during recovery will be delivered to the connection's {@link ExceptionHandler}.
849+
* @param tag consumer tag
850+
* @param consumer recorded consumer
851+
* @param retry whether to retry the recovery if an error occurs and a RetryHandler was configured on the connection
852+
*/
828853
public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) {
829854
try {
830-
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
831-
LOGGER.debug("Recovering {}", consumer);
832-
String newTag = null;
833-
if (retry) {
834-
final RecordedConsumer entity = consumer;
835-
RetryResult retryResult = wrapRetryIfNecessary(consumer, entity::recover);
836-
consumer = (RecordedConsumer) retryResult.getRecordedEntity();
837-
newTag = (String) retryResult.getResult();
838-
} else {
839-
newTag = consumer.recover();
840-
}
841-
842-
// make sure server-generated tags are re-added. MK.
843-
if(tag != null && !tag.equals(newTag)) {
844-
synchronized (this.consumers) {
845-
this.consumers.remove(tag);
846-
this.consumers.put(newTag, consumer);
847-
}
848-
consumer.getChannel().updateConsumerTag(tag, newTag);
849-
}
850-
851-
for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
852-
crl.consumerRecovered(tag, newTag);
853-
}
854-
LOGGER.debug("{} has recovered", consumer);
855-
}
855+
internalRecoverConsumer(tag, consumer, retry);
856856
} catch (Exception cause) {
857857
final String message = "Caught an exception while recovering consumer " + tag +
858858
": " + cause.getMessage();
859859
TopologyRecoveryException e = new TopologyRecoveryException(message, cause, consumer);
860860
this.getExceptionHandler().handleTopologyRecoveryException(delegate, consumer.getDelegateChannel(), e);
861861
}
862862
}
863+
864+
/**
865+
* Recover the consumer. Errors are not retried and not delivered to the connection's {@link ExceptionHandler}
866+
* @param tag consumer tag
867+
* @param consumer recorded consumer
868+
* @throws Exception if an error occurs recovering the consumer
869+
*/
870+
void recoverConsumer(final String tag, RecordedConsumer consumer) throws Exception {
871+
internalRecoverConsumer(tag, consumer, false);
872+
}
873+
874+
private void internalRecoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) throws Exception {
875+
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
876+
LOGGER.debug("Recovering {}", consumer);
877+
String newTag = null;
878+
if (retry) {
879+
final RecordedConsumer entity = consumer;
880+
RetryResult retryResult = wrapRetryIfNecessary(consumer, entity::recover);
881+
consumer = (RecordedConsumer) retryResult.getRecordedEntity();
882+
newTag = (String) retryResult.getResult();
883+
} else {
884+
newTag = consumer.recover();
885+
}
886+
887+
// make sure server-generated tags are re-added. MK.
888+
if(tag != null && !tag.equals(newTag)) {
889+
synchronized (this.consumers) {
890+
this.consumers.remove(tag);
891+
this.consumers.put(newTag, consumer);
892+
}
893+
consumer.getChannel().updateConsumerTag(tag, newTag);
894+
}
895+
896+
for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
897+
crl.consumerRecovered(tag, newTag);
898+
}
899+
LOGGER.debug("{} has recovered", consumer);
900+
}
901+
}
863902

864903
private <T> RetryResult wrapRetryIfNecessary(RecordedEntity entity, Callable<T> recoveryAction) throws Exception {
865904
if (this.retryHandler == null) {

src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ public RecordedQueue exclusive(boolean value) {
3737
this.exclusive = value;
3838
return this;
3939
}
40+
41+
public boolean isExclusive() {
42+
return this.exclusive;
43+
}
4044

4145
public RecordedQueue serverNamed(boolean value) {
4246
this.serverNamed = value;
@@ -47,8 +51,6 @@ public boolean isServerNamed() {
4751
return this.serverNamed;
4852
}
4953

50-
public boolean isAutoDelete() { return this.autoDelete; }
51-
5254
public void recover() throws IOException {
5355
this.name = this.channel.getDelegate().queueDeclare(this.getNameToUseForRecovery(),
5456
this.durable,
@@ -69,17 +71,29 @@ public RecordedQueue durable(boolean value) {
6971
this.durable = value;
7072
return this;
7173
}
74+
75+
public boolean isDurable() {
76+
return this.durable;
77+
}
7278

7379
public RecordedQueue autoDelete(boolean value) {
7480
this.autoDelete = value;
7581
return this;
7682
}
83+
84+
public boolean isAutoDelete() {
85+
return this.autoDelete;
86+
}
7787

7888
public RecordedQueue arguments(Map<String, Object> value) {
7989
this.arguments = value;
8090
return this;
8191
}
8292

93+
public Map<String, Object> getArguments() {
94+
return arguments;
95+
}
96+
8397
@Override
8498
public String toString() {
8599
return "RecordedQueue[name=" + name + ", durable=" + durable + ", autoDelete=" + autoDelete + ", exclusive=" + exclusive + ", arguments=" + arguments + "serverNamed=" + serverNamed + ", channel=" + channel + "]";

src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import com.rabbitmq.client.AMQP;
1919
import com.rabbitmq.client.ShutdownSignalException;
2020
import com.rabbitmq.utility.Utility;
21+
import java.util.LinkedHashSet;
22+
import java.util.Set;
23+
import java.util.Map.Entry;
2124
import java.util.function.BiPredicate;
2225
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder;
2326

@@ -62,7 +65,7 @@ public abstract class TopologyRecoveryRetryLogic {
6265
if (context.entity() instanceof RecordedQueue) {
6366
final RecordedQueue recordedQueue = context.queue();
6467
AutorecoveringConnection connection = context.connection();
65-
connection.recoverQueue(recordedQueue.getName(), recordedQueue, false);
68+
connection.recoverQueue(recordedQueue.getName(), recordedQueue);
6669
}
6770
return null;
6871
};
@@ -76,9 +79,7 @@ public abstract class TopologyRecoveryRetryLogic {
7679
AutorecoveringConnection connection = context.connection();
7780
RecordedQueue recordedQueue = connection.getRecordedQueues().get(binding.getDestination());
7881
if (recordedQueue != null) {
79-
connection.recoverQueue(
80-
recordedQueue.getName(), recordedQueue, false
81-
);
82+
connection.recoverQueue(recordedQueue.getName(), recordedQueue);
8283
}
8384
}
8485
return null;
@@ -122,9 +123,7 @@ public abstract class TopologyRecoveryRetryLogic {
122123
AutorecoveringConnection connection = context.connection();
123124
RecordedQueue recordedQueue = connection.getRecordedQueues().get(consumer.getQueue());
124125
if (recordedQueue != null) {
125-
connection.recoverQueue(
126-
recordedQueue.getName(), recordedQueue, false
127-
);
126+
connection.recoverQueue(recordedQueue.getName(), recordedQueue);
128127
}
129128
}
130129
return null;
@@ -165,14 +164,52 @@ public abstract class TopologyRecoveryRetryLogic {
165164
} else if (consumer.getChannel() == channel) {
166165
final RetryContext retryContext = new RetryContext(consumer, context.exception(), context.connection());
167166
RECOVER_CONSUMER_QUEUE.call(retryContext);
168-
context.connection().recoverConsumer(consumer.getConsumerTag(), consumer, false);
167+
context.connection().recoverConsumer(consumer.getConsumerTag(), consumer);
169168
RECOVER_CONSUMER_QUEUE_BINDINGS.call(retryContext);
170169
}
171170
}
172171
return context.consumer().getConsumerTag();
173172
}
174173
return null;
175174
};
175+
176+
/**
177+
* Recover earlier auto-delete or exclusive queues that share the same channel as this retry context
178+
*/
179+
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_PREVIOUS_AUTO_DELETE_QUEUES = context -> {
180+
if (context.entity() instanceof RecordedQueue) {
181+
AutorecoveringConnection connection = context.connection();
182+
RecordedQueue queue = context.queue();
183+
// recover all queues for the same channel that had already been recovered successfully before this queue failed.
184+
// If the previous ones were auto-delete or exclusive, they need recovered again
185+
for (Entry<String, RecordedQueue> entry : Utility.copy(connection.getRecordedQueues()).entrySet()) {
186+
if (entry.getValue() == queue) {
187+
// we have gotten to the queue in this context. Since this is an ordered map we can now break
188+
// as we know we have recovered all the earlier queues on this channel
189+
break;
190+
} else if (queue.getChannel() == entry.getValue().getChannel()
191+
&& (entry.getValue().isAutoDelete() || entry.getValue().isExclusive())) {
192+
connection.recoverQueue(entry.getKey(), entry.getValue());
193+
}
194+
}
195+
} else if (context.entity() instanceof RecordedQueueBinding) {
196+
AutorecoveringConnection connection = context.connection();
197+
Set<String> queues = new LinkedHashSet<>();
198+
for (Entry<String, RecordedQueue> entry : Utility.copy(connection.getRecordedQueues()).entrySet()) {
199+
if (context.entity().getChannel() == entry.getValue().getChannel()
200+
&& (entry.getValue().isAutoDelete() || entry.getValue().isExclusive())) {
201+
connection.recoverQueue(entry.getKey(), entry.getValue());
202+
queues.add(entry.getValue().getName());
203+
}
204+
}
205+
for (final RecordedBinding binding : Utility.copy(connection.getRecordedBindings())) {
206+
if (binding instanceof RecordedQueueBinding && queues.contains(binding.getDestination())) {
207+
binding.recover();
208+
}
209+
}
210+
}
211+
return null;
212+
};
176213

177214
/**
178215
* Pre-configured {@link TopologyRecoveryRetryHandlerBuilder} that retries recovery of bindings and consumers
@@ -188,11 +225,13 @@ public abstract class TopologyRecoveryRetryLogic {
188225
.bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
189226
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
190227
.queueRecoveryRetryOperation(RECOVER_CHANNEL
191-
.andThen(RECOVER_QUEUE))
228+
.andThen(RECOVER_QUEUE)
229+
.andThen(RECOVER_PREVIOUS_AUTO_DELETE_QUEUES))
192230
.bindingRecoveryRetryOperation(RECOVER_CHANNEL
193231
.andThen(RECOVER_BINDING_QUEUE)
194232
.andThen(RECOVER_BINDING)
195-
.andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS))
233+
.andThen(RECOVER_PREVIOUS_QUEUE_BINDINGS)
234+
.andThen(RECOVER_PREVIOUS_AUTO_DELETE_QUEUES))
196235
.consumerRecoveryRetryOperation(RECOVER_CHANNEL
197236
.andThen(RECOVER_CONSUMER_QUEUE)
198237
.andThen(RECOVER_CONSUMER)

0 commit comments

Comments
 (0)