Fix for QUEUE_NOT_FOUND exceptions during recovery. #199
@@ -42,7 +42,7 @@ public int available() throws IOException {
 }

 @Override
-public void mark(int readlimit) {
+public synchronized void mark(int readlimit) {
The super mark/reset methods are synchronized; this addresses a FindBugs warning.
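For illustration, a minimal sketch of the pattern behind this change: a subclass that overrides `mark`/`reset` and keeps extra state declares the overrides `synchronized`, as the comment says the superclass methods are. `MarkDemo` and `CountingStream` are hypothetical names, not the class touched by this PR.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class MarkDemo {
    // Hypothetical stream wrapper that tracks how many bytes were read.
    static class CountingStream extends FilterInputStream {
        private long position;       // bytes read so far
        private long markedPosition; // position saved by mark()

        CountingStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() throws IOException {
            int b = super.read();
            if (b >= 0) {
                position++;
            }
            return b;
        }

        // synchronized, so the delegate call and the field update happen as one step
        @Override
        public synchronized void mark(int readlimit) {
            super.mark(readlimit);
            markedPosition = position;
        }

        @Override
        public synchronized void reset() throws IOException {
            super.reset();
            position = markedPosition;
        }

        long position() {
            return position;
        }
    }

    // Reads one byte, marks, reads two more, resets, and reports the position.
    static long positionAfterReset() {
        try {
            CountingStream s = new CountingStream(new ByteArrayInputStream(new byte[] {1, 2, 3}));
            s.read();
            s.mark(16);
            s.read();
            s.read();
            s.reset();
            return s.position();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(positionAfterReset());
    }
}
```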
private final List<FlowListener> flowListeners = new ArrayList<FlowListener>();
private volatile RecoveryAwareChannelN delegate;
private volatile AutorecoveringConnection connection;
private final List<ShutdownListener> shutdownHooks = new CopyOnWriteArrayList<ShutdownListener>();
ChannelN was already using CopyOnWriteArrayList for these, so I followed the same pattern here.
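As a rough sketch of why `CopyOnWriteArrayList` suits listener lists: its iterator walks a snapshot, so a listener that modifies the list during notification does not break the loop. The class and method names below are hypothetical, and `Runnable` stands in for the listener type.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ListenerListDemo {
    // Runs every hook once and returns how many were called.
    static int fire(List<Runnable> hooks) {
        int called = 0;
        for (Runnable hook : hooks) { // iterates over a snapshot of the list
            hook.run();
            called++;
        }
        return called;
    }

    // Returns {hooks called, hooks left in the list afterwards}.
    static int[] demo() {
        List<Runnable> hooks = new CopyOnWriteArrayList<Runnable>();
        hooks.add(() -> hooks.clear()); // a hook that modifies the list while it is being iterated
        hooks.add(() -> { });           // a second hook that must still be notified
        int called = fire(hooks);
        return new int[] {called, hooks.size()};
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println(result[0] + " called, " + result[1] + " remaining");
    }
}
```

With a plain `ArrayList`, the same loop would throw a `ConcurrentModificationException` as soon as the first hook cleared the list.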

// Records topology changes
private final Map<String, RecordedQueue> recordedQueues = new ConcurrentHashMap<String, RecordedQueue>();
ConcurrentHashMap is perfectly thread-safe, but there were a couple of places below where the code synchronized on the ConcurrentHashMap to block other access to it. That does not work, because ConcurrentHashMap does not lock on itself; other threads can still read and write it. I am switching to synchronized hash maps here, whose operations all lock on the map itself. A copy of the map is now created whenever it is iterated over, to make iteration thread-safe.
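A minimal sketch of the copy-then-iterate approach described above, assuming a `Collections.synchronizedMap` wrapper. The class, the `String` value type, and the method names are placeholders, not the code in this PR.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class RecordedTopologyDemo {
    // Every call on this wrapper locks on the wrapper itself,
    // so synchronized (recordedQueues) really does exclude other callers.
    private final Map<String, String> recordedQueues =
            Collections.synchronizedMap(new LinkedHashMap<String, String>());

    void record(String name, String details) {
        recordedQueues.put(name, details);
    }

    // Copy while holding the wrapper's lock; the copy can then be iterated freely.
    Map<String, String> snapshot() {
        synchronized (recordedQueues) {
            return new LinkedHashMap<String, String>(recordedQueues);
        }
    }

    // Renames every entry. The loop walks the snapshot, so changing the live map is safe.
    void renameAll(String suffix) {
        for (Map.Entry<String, String> e : snapshot().entrySet()) {
            synchronized (recordedQueues) { // remove + put as one compound step
                recordedQueues.remove(e.getKey());
                recordedQueues.put(e.getKey() + suffix, e.getValue());
            }
        }
    }

    public static void main(String[] args) {
        RecordedTopologyDemo demo = new RecordedTopologyDemo();
        demo.record("a", "queue a");
        demo.record("b", "queue b");
        demo.renameAll(".v2");
        System.out.println(demo.snapshot().keySet());
    }
}
```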
if (c.getQueue().equals(oldName)) {
c.setQueue(newName);
}
}
}

synchronized void recordQueueBinding(AutorecoveringChannel ch,
synchronized was on these next 4 methods in what appears to be an effort to protect recordedBindings. That is now handled by wrapping the collection in a Collections.synchronized wrapper, so I am removing the method-level synchronization here.
@@ -784,7 +784,9 @@ void maybeDeleteRecordedAutoDeleteQueue(String queue) {
 RecordedQueue q = this.recordedQueues.get(queue);
 // last consumer on this connection is gone, remove recorded queue
 // if it is auto-deleted. See bug 26364.
-if((q != null) && q.isAutoDelete()) { this.recordedQueues.remove(queue); }
+if((q != null) && q.isAutoDelete()) {
+    deleteRecordedQueue(queue);
Here is the fix. This method already handles removing the recorded queue and its bindings.
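To make the effect of the change concrete, here is a simplified sketch of the bookkeeping. It assumes a map of queue names and a list of bindings; `TopologyRecorder`, `Binding`, and the helper methods are stand-ins and not the actual AutorecoveringConnection code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopologyRecorder {
    // Hypothetical, stripped-down binding record: exchange -> queue.
    static final class Binding {
        final String source;
        final String destination;

        Binding(String source, String destination) {
            this.source = source;
            this.destination = destination;
        }
    }

    // queue name -> auto-delete flag
    private final Map<String, Boolean> recordedQueues =
            Collections.synchronizedMap(new HashMap<String, Boolean>());
    private final List<Binding> recordedBindings =
            Collections.synchronizedList(new ArrayList<Binding>());

    void recordQueue(String queue, boolean autoDelete) {
        recordedQueues.put(queue, autoDelete);
    }

    void recordBinding(String exchange, String queue) {
        recordedBindings.add(new Binding(exchange, queue));
    }

    // Removes the queue and every binding that targets it.
    void deleteRecordedQueue(String queue) {
        recordedQueues.remove(queue);
        recordedBindings.removeIf(b -> b.destination.equals(queue));
    }

    // Called when the last consumer on a queue is cancelled.
    void maybeDeleteRecordedAutoDeleteQueue(String queue) {
        Boolean autoDelete = recordedQueues.get(queue);
        if (autoDelete != null && autoDelete) {
            // Removing only the map entry here would leave orphaned bindings behind.
            deleteRecordedQueue(queue);
        }
    }

    boolean hasQueue(String queue) {
        return recordedQueues.containsKey(queue);
    }

    int bindingCount() {
        return recordedBindings.size();
    }
}
```

After `maybeDeleteRecordedAutoDeleteQueue` runs for an auto-delete queue, no recorded binding refers to it, so recovery has nothing left to rebind against a missing queue.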
@@ -57,7 +57,7 @@ public RecordedConsumer autoAck(boolean value) {
 }

 public String recover() throws IOException {
-this.consumerTag = this.channel.basicConsume(this.queue, this.autoAck, this.consumerTag, false, this.exclusive, this.arguments, this.consumer);
+this.consumerTag = this.channel.getDelegate().basicConsume(this.queue, this.autoAck, this.consumerTag, false, this.exclusive, this.arguments, this.consumer);
All the recorded entities besides these 2 use getDelegate(). I think these 2 should as well; otherwise recovery ends up duplicating the RecordedConsumer/RecordedQueue and re-adding it to the recorded entity collections.
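The duplication can be sketched with two stand-in classes: a raw channel that only talks to the broker, and a wrapper that records each consumer it starts. Both classes are hypothetical simplifications, and a list is used in place of the real recorded-entity collections.

```java
import java.util.ArrayList;
import java.util.List;

public class DelegateRecoveryDemo {
    // Stand-in for the underlying channel: performs the call, records nothing.
    static class RawChannel {
        int consumeCalls;

        String basicConsume(String queue, String tag) {
            consumeCalls++;
            return tag;
        }
    }

    // Stand-in for the recovering wrapper: records every consumer it is asked to start.
    static class RecordingChannel {
        final RawChannel delegate = new RawChannel();
        final List<String> recordedConsumers = new ArrayList<String>();

        String basicConsume(String queue, String tag) {
            String result = delegate.basicConsume(queue, tag);
            recordedConsumers.add(result); // recording side effect
            return result;
        }

        RawChannel getDelegate() {
            return delegate;
        }
    }

    // Returns how many consumer records exist after one recovery pass.
    static int recordsAfterRecovery(boolean useDelegate) {
        RecordingChannel ch = new RecordingChannel();
        ch.basicConsume("q", "tag-1"); // application call, recorded once

        // Recovery re-issues the consume.
        if (useDelegate) {
            ch.getDelegate().basicConsume("q", "tag-1"); // bypasses recording
        } else {
            ch.basicConsume("q", "tag-1");               // records the consumer a second time
        }
        return ch.recordedConsumers.size();
    }

    public static void main(String[] args) {
        System.out.println(recordsAfterRecovery(true) + " vs " + recordsAfterRecovery(false));
    }
}
```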
* The list, which may not be {@code null}
* @return ArrayList copy of the list
*/
public static <E> List<E> copy(final List<E> list) {
this seemed like the appropriate place for these...
@vikinghawk Thanks!
Should this be backported to |
@michaelklishin Yes, we can. #197 should also be backported then.
Not sure what the release dates for 3.6.x vs 4.0.0 are, but the sooner my team can get these changes in a released form, the better. Thanks!
We will backport.
It's been backported to stable, so it will be in 3.6.6.
This was back ported to |
This is a follow-up to PR 197 and fixes issue 1 described at https://groups.google.com/forum/#!topic/rabbitmq-users/7P5fYQLLP6g.
When a consumer was canceled, the code removed its auto-delete queue from the recordedQueues map but did not remove that queue's bindings. When recovery ran, it tried to recover bindings for queues that no longer existed, which caused recovery to fail. Line 788 of AutorecoveringConnection is the fix.
I also fixed several potential threading-related issues with collections where it was possible to get ConcurrentModificationExceptions.