Skip to content

Fix for QUEUE_NOT_FOUND exceptions during recovery. #199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/main/java/com/rabbitmq/client/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public interface Connection extends ShutdownNotifier, Closeable { // rename to A
*
* @throws IOException if an I/O problem is encountered
*/
@Override
void close() throws IOException;

/**
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/com/rabbitmq/client/impl/AMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
import com.rabbitmq.client.impl.recovery.RecoveryCanBeginListener;
import com.rabbitmq.utility.BlockingCell;
import com.rabbitmq.utility.Utility;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -57,7 +59,7 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
private String id;

private final List<RecoveryCanBeginListener> recoveryCanBeginListeners =
new ArrayList<RecoveryCanBeginListener>();
Collections.synchronizedList(new ArrayList<RecoveryCanBeginListener>());

/**
* Retrieve a copy of the default table of client properties that
Expand Down Expand Up @@ -619,7 +621,7 @@ public void run() {

private void notifyRecoveryCanBeginListeners() {
ShutdownSignalException sse = this.getCloseReason();
for(RecoveryCanBeginListener fn : this.recoveryCanBeginListeners) {
for(RecoveryCanBeginListener fn : Utility.copy(this.recoveryCanBeginListeners)) {
fn.recoveryCanBegin(sse);
}
}
Expand Down
8 changes: 2 additions & 6 deletions src/main/java/com/rabbitmq/client/impl/ChannelN.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,7 @@ public void setDefaultConsumer(Consumer consumer) {
* @param signal an exception signalling channel shutdown
*/
private void broadcastShutdownSignal(ShutdownSignalException signal) {
Map<String, Consumer> snapshotConsumers;
synchronized (_consumers) {
snapshotConsumers = new HashMap<String, Consumer>(_consumers);
}
this.finishedShutdownFlag = this.dispatcher.handleShutdownSignal(snapshotConsumers, signal);
this.finishedShutdownFlag = this.dispatcher.handleShutdownSignal(Utility.copy(_consumers), signal);
}

/**
Expand Down Expand Up @@ -370,7 +366,7 @@ private void releaseChannel() {
handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
return true;
} else if (method instanceof Basic.RecoverOk) {
for (Map.Entry<String, Consumer> entry : _consumers.entrySet()) {
for (Map.Entry<String, Consumer> entry : Utility.copy(_consumers).entrySet()) {
this.dispatcher.handleRecoverOk(entry.getValue(), entry.getKey());
}
// Unlike all the other cases we still want this RecoverOk to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public int available() throws IOException {
}

@Override
public void mark(int readlimit) {
public synchronized void mark(int readlimit) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the super mark/reset methods are synchronized, this is addressing a findbug warning

super.mark(readlimit);
mark = counter;
}
Expand Down Expand Up @@ -71,7 +71,7 @@ public int read(byte[] b, int off, int len) throws IOException {
}

@Override
public void reset() throws IOException {
public synchronized void reset() throws IOException {
super.reset();
counter = mark;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;

/**
Expand All @@ -30,13 +30,13 @@
* @since 3.3.0
*/
public class AutorecoveringChannel implements Channel, Recoverable {
private RecoveryAwareChannelN delegate;
private AutorecoveringConnection connection;
private final List<ShutdownListener> shutdownHooks = new ArrayList<ShutdownListener>();
private final List<RecoveryListener> recoveryListeners = new ArrayList<RecoveryListener>();
private final List<ReturnListener> returnListeners = new ArrayList<ReturnListener>();
private final List<ConfirmListener> confirmListeners = new ArrayList<ConfirmListener>();
private final List<FlowListener> flowListeners = new ArrayList<FlowListener>();
private volatile RecoveryAwareChannelN delegate;
private volatile AutorecoveringConnection connection;
private final List<ShutdownListener> shutdownHooks = new CopyOnWriteArrayList<ShutdownListener>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ChannelN was already using CopyOnWriteArrayList for these so followed same pattern here.

private final List<RecoveryListener> recoveryListeners = new CopyOnWriteArrayList<RecoveryListener>();
private final List<ReturnListener> returnListeners = new CopyOnWriteArrayList<ReturnListener>();
private final List<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();
private final List<FlowListener> flowListeners = new CopyOnWriteArrayList<FlowListener>();
private int prefetchCountConsumer;
private int prefetchCountGlobal;
private boolean usesPublisherConfirms;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
import com.rabbitmq.client.impl.ConnectionParams;
import com.rabbitmq.client.impl.FrameHandlerFactory;
import com.rabbitmq.client.impl.NetworkConnection;
import com.rabbitmq.utility.Utility;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -60,19 +62,19 @@ public class AutorecoveringConnection implements Connection, Recoverable, Networ
private final RecoveryAwareAMQConnectionFactory cf;
private final Map<Integer, AutorecoveringChannel> channels;
private final ConnectionParams params;
private RecoveryAwareAMQConnection delegate;
private volatile RecoveryAwareAMQConnection delegate;

private final List<ShutdownListener> shutdownHooks = new ArrayList<ShutdownListener>();
private final List<RecoveryListener> recoveryListeners = new ArrayList<RecoveryListener>();
private final List<BlockedListener> blockedListeners = new ArrayList<BlockedListener>();
private final List<ShutdownListener> shutdownHooks = Collections.synchronizedList(new ArrayList<ShutdownListener>());
private final List<RecoveryListener> recoveryListeners = Collections.synchronizedList(new ArrayList<RecoveryListener>());
private final List<BlockedListener> blockedListeners = Collections.synchronizedList(new ArrayList<BlockedListener>());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConcurrentHashMap is perfectly thread-safe, but there were a couple places below where the code was synchronizing on the concurenthashmap to prevent access to it. That won't work unless the entire map itself is synchronized so switching to synchronized hash maps here. A copy is created of the map anytime it is iterated over now to make iteration thread safe.

// Records topology changes
private final Map<String, RecordedQueue> recordedQueues = new ConcurrentHashMap<String, RecordedQueue>();
private final List<RecordedBinding> recordedBindings = new ArrayList<RecordedBinding>();
private final Map<String, RecordedExchange> recordedExchanges = new ConcurrentHashMap<String, RecordedExchange>();
private final Map<String, RecordedConsumer> consumers = new ConcurrentHashMap<String, RecordedConsumer>();
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = new ArrayList<ConsumerRecoveryListener>();
private final List<QueueRecoveryListener> queueRecoveryListeners = new ArrayList<QueueRecoveryListener>();
private final Map<String, RecordedQueue> recordedQueues = Collections.synchronizedMap(new LinkedHashMap<String, RecordedQueue>());
private final List<RecordedBinding> recordedBindings = Collections.synchronizedList(new ArrayList<RecordedBinding>());
private final Map<String, RecordedExchange> recordedExchanges = Collections.synchronizedMap(new LinkedHashMap<String, RecordedExchange>());
private final Map<String, RecordedConsumer> consumers = Collections.synchronizedMap(new LinkedHashMap<String, RecordedConsumer>());
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = Collections.synchronizedList(new ArrayList<ConsumerRecoveryListener>());
private final List<QueueRecoveryListener> queueRecoveryListeners = Collections.synchronizedList(new ArrayList<QueueRecoveryListener>());

// Used to block connection recovery attempts after close() is invoked.
private volatile boolean manuallyClosed = false;
Expand Down Expand Up @@ -136,10 +138,10 @@ public Channel createChannel(int channelNumber) throws IOException {
* @return Recovering channel.
*/
private Channel wrapChannel(RecoveryAwareChannelN delegateChannel) {
final AutorecoveringChannel channel = new AutorecoveringChannel(this, delegateChannel);
if (delegateChannel == null) {
return null;
} else {
final AutorecoveringChannel channel = new AutorecoveringChannel(this, delegateChannel);
this.registerChannel(channel);
return channel;
}
Expand Down Expand Up @@ -514,13 +516,13 @@ synchronized private void beginAutomaticRecovery() throws InterruptedException {
}

private void recoverShutdownListeners(final RecoveryAwareAMQConnection newConn) {
for (ShutdownListener sh : this.shutdownHooks) {
for (ShutdownListener sh : Utility.copy(this.shutdownHooks)) {
newConn.addShutdownListener(sh);
}
}

private void recoverBlockedListeners(final RecoveryAwareAMQConnection newConn) {
for (BlockedListener bl : this.blockedListeners) {
for (BlockedListener bl : Utility.copy(this.blockedListeners)) {
newConn.addBlockedListener(bl);
}
}
Expand Down Expand Up @@ -564,7 +566,7 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
}

private void notifyRecoveryListeners() {
for (RecoveryListener f : this.recoveryListeners) {
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
f.handleRecovery(this);
}
}
Expand All @@ -585,7 +587,7 @@ private void recoverExchanges() {
// recorded exchanges are guaranteed to be
// non-predefined (we filter out predefined ones
// in exchangeDeclare). MK.
for (RecordedExchange x : this.recordedExchanges.values()) {
for (RecordedExchange x : Utility.copy(this.recordedExchanges).values()) {
try {
x.recover();
} catch (Exception cause) {
Expand All @@ -598,8 +600,7 @@ private void recoverExchanges() {
}

private void recoverQueues() {
Map<String, RecordedQueue> copy = new HashMap<String, RecordedQueue>(this.recordedQueues);
for (Map.Entry<String, RecordedQueue> entry : copy.entrySet()) {
for (Map.Entry<String, RecordedQueue> entry : Utility.copy(this.recordedQueues).entrySet()) {
String oldName = entry.getKey();
RecordedQueue q = entry.getValue();
try {
Expand All @@ -621,7 +622,7 @@ private void recoverQueues() {
this.recordedQueues.put(newName, q);
}
}
for(QueueRecoveryListener qrl : this.queueRecoveryListeners) {
for(QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
qrl.queueRecovered(oldName, newName);
}
} catch (Exception cause) {
Expand All @@ -634,7 +635,7 @@ private void recoverQueues() {
}

private void recoverBindings() {
for (RecordedBinding b : this.recordedBindings) {
for (RecordedBinding b : Utility.copy(this.recordedBindings)) {
try {
b.recover();
} catch (Exception cause) {
Expand All @@ -647,8 +648,7 @@ private void recoverBindings() {
}

private void recoverConsumers() {
Map<String, RecordedConsumer> copy = new HashMap<String, RecordedConsumer>(this.consumers);
for (Map.Entry<String, RecordedConsumer> entry : copy.entrySet()) {
for (Map.Entry<String, RecordedConsumer> entry : Utility.copy(this.consumers).entrySet()) {
String tag = entry.getKey();
RecordedConsumer consumer = entry.getValue();

Expand All @@ -659,7 +659,7 @@ private void recoverConsumers() {
this.consumers.remove(tag);
this.consumers.put(newTag, consumer);
}
for(ConsumerRecoveryListener crl : this.consumerRecoveryListeners) {
for(ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
crl.consumerRecovered(tag, newTag);
}
} catch (Exception cause) {
Expand All @@ -672,22 +672,22 @@ private void recoverConsumers() {
}

private void propagateQueueNameChangeToBindings(String oldName, String newName) {
for (RecordedBinding b : this.recordedBindings) {
for (RecordedBinding b : Utility.copy(this.recordedBindings)) {
if (b.getDestination().equals(oldName)) {
b.setDestination(newName);
}
}
}

private void propagateQueueNameChangeToConsumers(String oldName, String newName) {
for (RecordedConsumer c : this.consumers.values()) {
for (RecordedConsumer c : Utility.copy(this.consumers).values()) {
if (c.getQueue().equals(oldName)) {
c.setQueue(newName);
}
}
}

synchronized void recordQueueBinding(AutorecoveringChannel ch,
void recordQueueBinding(AutorecoveringChannel ch,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

synchronized was on these next 4 methods in what appears to be an effort to protect recordedBindings. This is now handled by wrapping it in Collections.synchronized so removing the class level synchronizations here.

String queue,
String exchange,
String routingKey,
Expand All @@ -701,7 +701,7 @@ synchronized void recordQueueBinding(AutorecoveringChannel ch,
this.recordedBindings.add(binding);
}

synchronized boolean deleteRecordedQueueBinding(AutorecoveringChannel ch,
boolean deleteRecordedQueueBinding(AutorecoveringChannel ch,
String queue,
String exchange,
String routingKey,
Expand All @@ -714,7 +714,7 @@ synchronized boolean deleteRecordedQueueBinding(AutorecoveringChannel ch,
return this.recordedBindings.remove(b);
}

synchronized void recordExchangeBinding(AutorecoveringChannel ch,
void recordExchangeBinding(AutorecoveringChannel ch,
String destination,
String source,
String routingKey,
Expand All @@ -728,7 +728,7 @@ synchronized void recordExchangeBinding(AutorecoveringChannel ch,
this.recordedBindings.add(binding);
}

synchronized boolean deleteRecordedExchangeBinding(AutorecoveringChannel ch,
boolean deleteRecordedExchangeBinding(AutorecoveringChannel ch,
String destination,
String source,
String routingKey,
Expand Down Expand Up @@ -784,7 +784,9 @@ void maybeDeleteRecordedAutoDeleteQueue(String queue) {
RecordedQueue q = this.recordedQueues.get(queue);
// last consumer on this connection is gone, remove recorded queue
// if it is auto-deleted. See bug 26364.
if((q != null) && q.isAutoDelete()) { this.recordedQueues.remove(queue); }
if((q != null) && q.isAutoDelete()) {
deleteRecordedQueue(queue);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here is the fix. This method already handles the logic of removing the recorded queue and it's bindings.

}
}
}
}
Expand All @@ -793,11 +795,13 @@ void maybeDeleteRecordedAutoDeleteQueue(String queue) {
void maybeDeleteRecordedAutoDeleteExchange(String exchange) {
synchronized (this.recordedExchanges) {
synchronized (this.consumers) {
if(!hasMoreDestinationsBoundToExchange(this.recordedBindings, exchange)) {
if(!hasMoreDestinationsBoundToExchange(Utility.copy(this.recordedBindings), exchange)) {
RecordedExchange x = this.recordedExchanges.get(exchange);
// last binding where this exchange is the source is gone, remove recorded exchange
// if it is auto-deleted. See bug 26364.
if((x != null) && x.isAutoDelete()) { this.recordedExchanges.remove(exchange); }
if((x != null) && x.isAutoDelete()) {
this.recordedExchanges.remove(exchange);
}
}
}
}
Expand Down Expand Up @@ -825,13 +829,15 @@ boolean hasMoreConsumersOnQueue(Collection<RecordedConsumer> consumers, String q
return result;
}

synchronized Set<RecordedBinding> removeBindingsWithDestination(String s) {
Set<RecordedBinding> result = new HashSet<RecordedBinding>();
for (Iterator<RecordedBinding> it = this.recordedBindings.iterator(); it.hasNext(); ) {
RecordedBinding b = it.next();
if(b.getDestination().equals(s)) {
it.remove();
result.add(b);
Set<RecordedBinding> removeBindingsWithDestination(String s) {
final Set<RecordedBinding> result = new HashSet<RecordedBinding>();
synchronized (this.recordedBindings) {
for (Iterator<RecordedBinding> it = this.recordedBindings.iterator(); it.hasNext(); ) {
RecordedBinding b = it.next();
if(b.getDestination().equals(s)) {
it.remove();
result.add(b);
}
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public RecordedConsumer autoAck(boolean value) {
}

public String recover() throws IOException {
this.consumerTag = this.channel.basicConsume(this.queue, this.autoAck, this.consumerTag, false, this.exclusive, this.arguments, this.consumer);
this.consumerTag = this.channel.getDelegate().basicConsume(this.queue, this.autoAck, this.consumerTag, false, this.exclusive, this.arguments, this.consumer);
Copy link
Contributor Author

@vikinghawk vikinghawk Sep 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all the recorded entities besides these 2 use getDelegate(). I think these 2 should as well otherwise it ends up duplicating the RecorcedConsumer/Queue and readding it to the recorded entity collections.

return this.consumerTag;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public boolean isServerNamed() {
public boolean isAutoDelete() { return this.autoDelete; }

public void recover() throws IOException {
this.name = this.channel.queueDeclare(this.getNameToUseForRecovery(),
this.name = this.channel.getDelegate().queueDeclare(this.getNameToUseForRecovery(),
this.durable,
this.exclusive,
this.autoDelete,
Expand Down
Loading