-
Notifications
You must be signed in to change notification settings - Fork 582
Fix for QUEUE_NOT_FOUND exceptions during recovery. #199
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,9 +18,9 @@ | |
import com.rabbitmq.client.*; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.CopyOnWriteArrayList; | ||
import java.util.concurrent.TimeoutException; | ||
|
||
/** | ||
|
@@ -30,13 +30,13 @@ | |
* @since 3.3.0 | ||
*/ | ||
public class AutorecoveringChannel implements Channel, Recoverable { | ||
private RecoveryAwareChannelN delegate; | ||
private AutorecoveringConnection connection; | ||
private final List<ShutdownListener> shutdownHooks = new ArrayList<ShutdownListener>(); | ||
private final List<RecoveryListener> recoveryListeners = new ArrayList<RecoveryListener>(); | ||
private final List<ReturnListener> returnListeners = new ArrayList<ReturnListener>(); | ||
private final List<ConfirmListener> confirmListeners = new ArrayList<ConfirmListener>(); | ||
private final List<FlowListener> flowListeners = new ArrayList<FlowListener>(); | ||
private volatile RecoveryAwareChannelN delegate; | ||
private volatile AutorecoveringConnection connection; | ||
private final List<ShutdownListener> shutdownHooks = new CopyOnWriteArrayList<ShutdownListener>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ChannelN was already using CopyOnWriteArrayList for these so followed same pattern here. |
||
private final List<RecoveryListener> recoveryListeners = new CopyOnWriteArrayList<RecoveryListener>(); | ||
private final List<ReturnListener> returnListeners = new CopyOnWriteArrayList<ReturnListener>(); | ||
private final List<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>(); | ||
private final List<FlowListener> flowListeners = new CopyOnWriteArrayList<FlowListener>(); | ||
private int prefetchCountConsumer; | ||
private int prefetchCountGlobal; | ||
private boolean usesPublisherConfirms; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,14 +20,16 @@ | |
import com.rabbitmq.client.impl.ConnectionParams; | ||
import com.rabbitmq.client.impl.FrameHandlerFactory; | ||
import com.rabbitmq.client.impl.NetworkConnection; | ||
import com.rabbitmq.utility.Utility; | ||
|
||
import java.io.IOException; | ||
import java.net.InetAddress; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.Collections; | ||
import java.util.HashSet; | ||
import java.util.Iterator; | ||
import java.util.LinkedHashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
@@ -60,19 +62,19 @@ public class AutorecoveringConnection implements Connection, Recoverable, Networ | |
private final RecoveryAwareAMQConnectionFactory cf; | ||
private final Map<Integer, AutorecoveringChannel> channels; | ||
private final ConnectionParams params; | ||
private RecoveryAwareAMQConnection delegate; | ||
private volatile RecoveryAwareAMQConnection delegate; | ||
|
||
private final List<ShutdownListener> shutdownHooks = new ArrayList<ShutdownListener>(); | ||
private final List<RecoveryListener> recoveryListeners = new ArrayList<RecoveryListener>(); | ||
private final List<BlockedListener> blockedListeners = new ArrayList<BlockedListener>(); | ||
private final List<ShutdownListener> shutdownHooks = Collections.synchronizedList(new ArrayList<ShutdownListener>()); | ||
private final List<RecoveryListener> recoveryListeners = Collections.synchronizedList(new ArrayList<RecoveryListener>()); | ||
private final List<BlockedListener> blockedListeners = Collections.synchronizedList(new ArrayList<BlockedListener>()); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ConcurrentHashMap is perfectly thread-safe, but there were a couple places below where the code was synchronizing on the concurenthashmap to prevent access to it. That won't work unless the entire map itself is synchronized so switching to synchronized hash maps here. A copy is created of the map anytime it is iterated over now to make iteration thread safe. |
||
// Records topology changes | ||
private final Map<String, RecordedQueue> recordedQueues = new ConcurrentHashMap<String, RecordedQueue>(); | ||
private final List<RecordedBinding> recordedBindings = new ArrayList<RecordedBinding>(); | ||
private final Map<String, RecordedExchange> recordedExchanges = new ConcurrentHashMap<String, RecordedExchange>(); | ||
private final Map<String, RecordedConsumer> consumers = new ConcurrentHashMap<String, RecordedConsumer>(); | ||
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = new ArrayList<ConsumerRecoveryListener>(); | ||
private final List<QueueRecoveryListener> queueRecoveryListeners = new ArrayList<QueueRecoveryListener>(); | ||
private final Map<String, RecordedQueue> recordedQueues = Collections.synchronizedMap(new LinkedHashMap<String, RecordedQueue>()); | ||
private final List<RecordedBinding> recordedBindings = Collections.synchronizedList(new ArrayList<RecordedBinding>()); | ||
private final Map<String, RecordedExchange> recordedExchanges = Collections.synchronizedMap(new LinkedHashMap<String, RecordedExchange>()); | ||
private final Map<String, RecordedConsumer> consumers = Collections.synchronizedMap(new LinkedHashMap<String, RecordedConsumer>()); | ||
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = Collections.synchronizedList(new ArrayList<ConsumerRecoveryListener>()); | ||
private final List<QueueRecoveryListener> queueRecoveryListeners = Collections.synchronizedList(new ArrayList<QueueRecoveryListener>()); | ||
|
||
// Used to block connection recovery attempts after close() is invoked. | ||
private volatile boolean manuallyClosed = false; | ||
|
@@ -136,10 +138,10 @@ public Channel createChannel(int channelNumber) throws IOException { | |
* @return Recovering channel. | ||
*/ | ||
private Channel wrapChannel(RecoveryAwareChannelN delegateChannel) { | ||
final AutorecoveringChannel channel = new AutorecoveringChannel(this, delegateChannel); | ||
if (delegateChannel == null) { | ||
return null; | ||
} else { | ||
final AutorecoveringChannel channel = new AutorecoveringChannel(this, delegateChannel); | ||
this.registerChannel(channel); | ||
return channel; | ||
} | ||
|
@@ -514,13 +516,13 @@ synchronized private void beginAutomaticRecovery() throws InterruptedException { | |
} | ||
|
||
private void recoverShutdownListeners(final RecoveryAwareAMQConnection newConn) { | ||
for (ShutdownListener sh : this.shutdownHooks) { | ||
for (ShutdownListener sh : Utility.copy(this.shutdownHooks)) { | ||
newConn.addShutdownListener(sh); | ||
} | ||
} | ||
|
||
private void recoverBlockedListeners(final RecoveryAwareAMQConnection newConn) { | ||
for (BlockedListener bl : this.blockedListeners) { | ||
for (BlockedListener bl : Utility.copy(this.blockedListeners)) { | ||
newConn.addBlockedListener(bl); | ||
} | ||
} | ||
|
@@ -564,7 +566,7 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) { | |
} | ||
|
||
private void notifyRecoveryListeners() { | ||
for (RecoveryListener f : this.recoveryListeners) { | ||
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) { | ||
f.handleRecovery(this); | ||
} | ||
} | ||
|
@@ -585,7 +587,7 @@ private void recoverExchanges() { | |
// recorded exchanges are guaranteed to be | ||
// non-predefined (we filter out predefined ones | ||
// in exchangeDeclare). MK. | ||
for (RecordedExchange x : this.recordedExchanges.values()) { | ||
for (RecordedExchange x : Utility.copy(this.recordedExchanges).values()) { | ||
try { | ||
x.recover(); | ||
} catch (Exception cause) { | ||
|
@@ -598,8 +600,7 @@ private void recoverExchanges() { | |
} | ||
|
||
private void recoverQueues() { | ||
Map<String, RecordedQueue> copy = new HashMap<String, RecordedQueue>(this.recordedQueues); | ||
for (Map.Entry<String, RecordedQueue> entry : copy.entrySet()) { | ||
for (Map.Entry<String, RecordedQueue> entry : Utility.copy(this.recordedQueues).entrySet()) { | ||
String oldName = entry.getKey(); | ||
RecordedQueue q = entry.getValue(); | ||
try { | ||
|
@@ -621,7 +622,7 @@ private void recoverQueues() { | |
this.recordedQueues.put(newName, q); | ||
} | ||
} | ||
for(QueueRecoveryListener qrl : this.queueRecoveryListeners) { | ||
for(QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) { | ||
qrl.queueRecovered(oldName, newName); | ||
} | ||
} catch (Exception cause) { | ||
|
@@ -634,7 +635,7 @@ private void recoverQueues() { | |
} | ||
|
||
private void recoverBindings() { | ||
for (RecordedBinding b : this.recordedBindings) { | ||
for (RecordedBinding b : Utility.copy(this.recordedBindings)) { | ||
try { | ||
b.recover(); | ||
} catch (Exception cause) { | ||
|
@@ -647,8 +648,7 @@ private void recoverBindings() { | |
} | ||
|
||
private void recoverConsumers() { | ||
Map<String, RecordedConsumer> copy = new HashMap<String, RecordedConsumer>(this.consumers); | ||
for (Map.Entry<String, RecordedConsumer> entry : copy.entrySet()) { | ||
for (Map.Entry<String, RecordedConsumer> entry : Utility.copy(this.consumers).entrySet()) { | ||
String tag = entry.getKey(); | ||
RecordedConsumer consumer = entry.getValue(); | ||
|
||
|
@@ -659,7 +659,7 @@ private void recoverConsumers() { | |
this.consumers.remove(tag); | ||
this.consumers.put(newTag, consumer); | ||
} | ||
for(ConsumerRecoveryListener crl : this.consumerRecoveryListeners) { | ||
for(ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) { | ||
crl.consumerRecovered(tag, newTag); | ||
} | ||
} catch (Exception cause) { | ||
|
@@ -672,22 +672,22 @@ private void recoverConsumers() { | |
} | ||
|
||
private void propagateQueueNameChangeToBindings(String oldName, String newName) { | ||
for (RecordedBinding b : this.recordedBindings) { | ||
for (RecordedBinding b : Utility.copy(this.recordedBindings)) { | ||
if (b.getDestination().equals(oldName)) { | ||
b.setDestination(newName); | ||
} | ||
} | ||
} | ||
|
||
private void propagateQueueNameChangeToConsumers(String oldName, String newName) { | ||
for (RecordedConsumer c : this.consumers.values()) { | ||
for (RecordedConsumer c : Utility.copy(this.consumers).values()) { | ||
if (c.getQueue().equals(oldName)) { | ||
c.setQueue(newName); | ||
} | ||
} | ||
} | ||
|
||
synchronized void recordQueueBinding(AutorecoveringChannel ch, | ||
void recordQueueBinding(AutorecoveringChannel ch, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. synchronized was on these next 4 methods in what appears to be an effort to protect recordedBindings. This is now handled by wrapping it in Collections.synchronized so removing the class level synchronizations here. |
||
String queue, | ||
String exchange, | ||
String routingKey, | ||
|
@@ -701,7 +701,7 @@ synchronized void recordQueueBinding(AutorecoveringChannel ch, | |
this.recordedBindings.add(binding); | ||
} | ||
|
||
synchronized boolean deleteRecordedQueueBinding(AutorecoveringChannel ch, | ||
boolean deleteRecordedQueueBinding(AutorecoveringChannel ch, | ||
String queue, | ||
String exchange, | ||
String routingKey, | ||
|
@@ -714,7 +714,7 @@ synchronized boolean deleteRecordedQueueBinding(AutorecoveringChannel ch, | |
return this.recordedBindings.remove(b); | ||
} | ||
|
||
synchronized void recordExchangeBinding(AutorecoveringChannel ch, | ||
void recordExchangeBinding(AutorecoveringChannel ch, | ||
String destination, | ||
String source, | ||
String routingKey, | ||
|
@@ -728,7 +728,7 @@ synchronized void recordExchangeBinding(AutorecoveringChannel ch, | |
this.recordedBindings.add(binding); | ||
} | ||
|
||
synchronized boolean deleteRecordedExchangeBinding(AutorecoveringChannel ch, | ||
boolean deleteRecordedExchangeBinding(AutorecoveringChannel ch, | ||
String destination, | ||
String source, | ||
String routingKey, | ||
|
@@ -784,7 +784,9 @@ void maybeDeleteRecordedAutoDeleteQueue(String queue) { | |
RecordedQueue q = this.recordedQueues.get(queue); | ||
// last consumer on this connection is gone, remove recorded queue | ||
// if it is auto-deleted. See bug 26364. | ||
if((q != null) && q.isAutoDelete()) { this.recordedQueues.remove(queue); } | ||
if((q != null) && q.isAutoDelete()) { | ||
deleteRecordedQueue(queue); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here is the fix. This method already handles the logic of removing the recorded queue and it's bindings. |
||
} | ||
} | ||
} | ||
} | ||
|
@@ -793,11 +795,13 @@ void maybeDeleteRecordedAutoDeleteQueue(String queue) { | |
void maybeDeleteRecordedAutoDeleteExchange(String exchange) { | ||
synchronized (this.recordedExchanges) { | ||
synchronized (this.consumers) { | ||
if(!hasMoreDestinationsBoundToExchange(this.recordedBindings, exchange)) { | ||
if(!hasMoreDestinationsBoundToExchange(Utility.copy(this.recordedBindings), exchange)) { | ||
RecordedExchange x = this.recordedExchanges.get(exchange); | ||
// last binding where this exchange is the source is gone, remove recorded exchange | ||
// if it is auto-deleted. See bug 26364. | ||
if((x != null) && x.isAutoDelete()) { this.recordedExchanges.remove(exchange); } | ||
if((x != null) && x.isAutoDelete()) { | ||
this.recordedExchanges.remove(exchange); | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -825,13 +829,15 @@ boolean hasMoreConsumersOnQueue(Collection<RecordedConsumer> consumers, String q | |
return result; | ||
} | ||
|
||
synchronized Set<RecordedBinding> removeBindingsWithDestination(String s) { | ||
Set<RecordedBinding> result = new HashSet<RecordedBinding>(); | ||
for (Iterator<RecordedBinding> it = this.recordedBindings.iterator(); it.hasNext(); ) { | ||
RecordedBinding b = it.next(); | ||
if(b.getDestination().equals(s)) { | ||
it.remove(); | ||
result.add(b); | ||
Set<RecordedBinding> removeBindingsWithDestination(String s) { | ||
final Set<RecordedBinding> result = new HashSet<RecordedBinding>(); | ||
synchronized (this.recordedBindings) { | ||
for (Iterator<RecordedBinding> it = this.recordedBindings.iterator(); it.hasNext(); ) { | ||
RecordedBinding b = it.next(); | ||
if(b.getDestination().equals(s)) { | ||
it.remove(); | ||
result.add(b); | ||
} | ||
} | ||
} | ||
return result; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,7 +57,7 @@ public RecordedConsumer autoAck(boolean value) { | |
} | ||
|
||
public String recover() throws IOException { | ||
this.consumerTag = this.channel.basicConsume(this.queue, this.autoAck, this.consumerTag, false, this.exclusive, this.arguments, this.consumer); | ||
this.consumerTag = this.channel.getDelegate().basicConsume(this.queue, this.autoAck, this.consumerTag, false, this.exclusive, this.arguments, this.consumer); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. all the recorded entities besides these 2 use getDelegate(). I think these 2 should as well otherwise it ends up duplicating the RecorcedConsumer/Queue and readding it to the recorded entity collections. |
||
return this.consumerTag; | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the super mark/reset methods are synchronized, this is addressing a findbug warning