Skip to content

Fix for QUEUE_NOT_FOUND exceptions during recovery. #199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 19, 2016
Merged

Fix for QUEUE_NOT_FOUND exceptions during recovery. #199

merged 3 commits into from
Sep 19, 2016

Conversation

vikinghawk
Copy link
Contributor

@vikinghawk vikinghawk commented Sep 18, 2016

This is a follow up to PR 197 and is the fix for issue 1 mentioned at https://groups.google.com/forum/#!topic/rabbitmq-users/7P5fYQLLP6g.

When a consumer is canceled the code was removing its auto-delete queue from RecordedQueues list, but it wasn't removing that queue's bindings. When recovery ran it tried recovering bindings for queues that did not exist and caused recovery failures. Line 788 of AutorecoveringConnection is the fix.

I also fixed several potential threading related issues with collections where it was possible to get ConcurrentModificationExceptions.

@@ -42,7 +42,7 @@ public int available() throws IOException {
}

@Override
public void mark(int readlimit) {
public synchronized void mark(int readlimit) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the super mark/reset methods are synchronized, this is addressing a findbug warning

private final List<FlowListener> flowListeners = new ArrayList<FlowListener>();
private volatile RecoveryAwareChannelN delegate;
private volatile AutorecoveringConnection connection;
private final List<ShutdownListener> shutdownHooks = new CopyOnWriteArrayList<ShutdownListener>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ChannelN was already using CopyOnWriteArrayList for these so followed same pattern here.


// Records topology changes
private final Map<String, RecordedQueue> recordedQueues = new ConcurrentHashMap<String, RecordedQueue>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConcurrentHashMap is perfectly thread-safe, but there were a couple places below where the code was synchronizing on the concurenthashmap to prevent access to it. That won't work unless the entire map itself is synchronized so switching to synchronized hash maps here. A copy is created of the map anytime it is iterated over now to make iteration thread safe.

if (c.getQueue().equals(oldName)) {
c.setQueue(newName);
}
}
}

synchronized void recordQueueBinding(AutorecoveringChannel ch,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

synchronized was on these next 4 methods in what appears to be an effort to protect recordedBindings. This is now handled by wrapping it in Collections.synchronized so removing the class level synchronizations here.

@@ -784,7 +784,9 @@ void maybeDeleteRecordedAutoDeleteQueue(String queue) {
RecordedQueue q = this.recordedQueues.get(queue);
// last consumer on this connection is gone, remove recorded queue
// if it is auto-deleted. See bug 26364.
if((q != null) && q.isAutoDelete()) { this.recordedQueues.remove(queue); }
if((q != null) && q.isAutoDelete()) {
deleteRecordedQueue(queue);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here is the fix. This method already handles the logic of removing the recorded queue and it's bindings.

@@ -57,7 +57,7 @@ public RecordedConsumer autoAck(boolean value) {
}

public String recover() throws IOException {
this.consumerTag = this.channel.basicConsume(this.queue, this.autoAck, this.consumerTag, false, this.exclusive, this.arguments, this.consumer);
this.consumerTag = this.channel.getDelegate().basicConsume(this.queue, this.autoAck, this.consumerTag, false, this.exclusive, this.arguments, this.consumer);
Copy link
Contributor Author

@vikinghawk vikinghawk Sep 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all the recorded entities besides these 2 use getDelegate(). I think these 2 should as well otherwise it ends up duplicating the RecorcedConsumer/Queue and readding it to the recorded entity collections.

* The list, which may not be {@code null}
* @return ArrayList copy of the list
*/
public static <E> List<E> copy(final List<E> list) {
Copy link
Contributor Author

@vikinghawk vikinghawk Sep 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seemed like the appropriate place for these...

@acogoluegnes acogoluegnes merged commit fd3a6c5 into rabbitmq:master Sep 19, 2016
@acogoluegnes
Copy link
Contributor

@vikinghawk Thanks!

@michaelklishin
Copy link
Contributor

Should this be backported to 3.6.x?

@acogoluegnes
Copy link
Contributor

@michaelklishin Yes, we can. #197 should also be backported then.

@vikinghawk
Copy link
Contributor Author

Not sure on what the release dates for 3.6.x vs 4.0.0 are, but the sooner my team could get these changes in a released form the better. Thanks!

@michaelklishin
Copy link
Contributor

We will backport.

@acogoluegnes
Copy link
Contributor

It's been backported to stable, so it will be in 3.6.6.

@michaelklishin michaelklishin modified the milestones: 3.6.6, 4.0.0 Oct 27, 2016
@michaelklishin
Copy link
Contributor

This was back ported to 3.6.6 => changing milestone.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants