Skip to content

Commit a7eca2f

Browse files
committed
fixed issues with recovery bindings not being removed and made collections thread safe
1 parent 08e1bcc commit a7eca2f

File tree

9 files changed

+100
-61
lines changed

9 files changed

+100
-61
lines changed

src/main/java/com/rabbitmq/client/Connection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ public interface Connection extends ShutdownNotifier, Closeable { // rename to A
140140
*
141141
* @throws IOException if an I/O problem is encountered
142142
*/
143+
@Override
143144
void close() throws IOException;
144145

145146
/**

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
2121
import com.rabbitmq.client.impl.recovery.RecoveryCanBeginListener;
2222
import com.rabbitmq.utility.BlockingCell;
23+
import com.rabbitmq.utility.Utility;
24+
2325
import org.slf4j.Logger;
2426
import org.slf4j.LoggerFactory;
2527

@@ -57,7 +59,7 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
5759
private String id;
5860

5961
private final List<RecoveryCanBeginListener> recoveryCanBeginListeners =
60-
new ArrayList<RecoveryCanBeginListener>();
62+
Collections.synchronizedList(new ArrayList<RecoveryCanBeginListener>());
6163

6264
/**
6365
* Retrieve a copy of the default table of client properties that
@@ -619,7 +621,7 @@ public void run() {
619621

620622
private void notifyRecoveryCanBeginListeners() {
621623
ShutdownSignalException sse = this.getCloseReason();
622-
for(RecoveryCanBeginListener fn : this.recoveryCanBeginListeners) {
624+
for(RecoveryCanBeginListener fn : Utility.copy(this.recoveryCanBeginListeners)) {
623625
fn.recoveryCanBegin(sse);
624626
}
625627
}

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -270,11 +270,7 @@ public void setDefaultConsumer(Consumer consumer) {
270270
* @param signal an exception signalling channel shutdown
271271
*/
272272
private void broadcastShutdownSignal(ShutdownSignalException signal) {
273-
Map<String, Consumer> snapshotConsumers;
274-
synchronized (_consumers) {
275-
snapshotConsumers = new HashMap<String, Consumer>(_consumers);
276-
}
277-
this.finishedShutdownFlag = this.dispatcher.handleShutdownSignal(snapshotConsumers, signal);
273+
this.finishedShutdownFlag = this.dispatcher.handleShutdownSignal(Utility.copy(_consumers), signal);
278274
}
279275

280276
/**
@@ -370,7 +366,7 @@ private void releaseChannel() {
370366
handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
371367
return true;
372368
} else if (method instanceof Basic.RecoverOk) {
373-
for (Map.Entry<String, Consumer> entry : _consumers.entrySet()) {
369+
for (Map.Entry<String, Consumer> entry : Utility.copy(_consumers).entrySet()) {
374370
this.dispatcher.handleRecoverOk(entry.getValue(), entry.getKey());
375371
}
376372
// Unlike all the other cases we still want this RecoverOk to

src/main/java/com/rabbitmq/client/impl/TruncatedInputStream.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public int available() throws IOException {
4242
}
4343

4444
@Override
45-
public void mark(int readlimit) {
45+
public synchronized void mark(int readlimit) {
4646
super.mark(readlimit);
4747
mark = counter;
4848
}
@@ -71,7 +71,7 @@ public int read(byte[] b, int off, int len) throws IOException {
7171
}
7272

7373
@Override
74-
public void reset() throws IOException {
74+
public synchronized void reset() throws IOException {
7575
super.reset();
7676
counter = mark;
7777
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@
1616
package com.rabbitmq.client.impl.recovery;
1717

1818
import com.rabbitmq.client.*;
19+
import com.rabbitmq.utility.Utility;
1920

2021
import java.io.IOException;
21-
import java.util.ArrayList;
2222
import java.util.List;
2323
import java.util.Map;
24+
import java.util.concurrent.CopyOnWriteArrayList;
2425
import java.util.concurrent.TimeoutException;
2526

2627
/**
@@ -30,13 +31,13 @@
3031
* @since 3.3.0
3132
*/
3233
public class AutorecoveringChannel implements Channel, Recoverable {
33-
private RecoveryAwareChannelN delegate;
34-
private AutorecoveringConnection connection;
35-
private final List<ShutdownListener> shutdownHooks = new ArrayList<ShutdownListener>();
36-
private final List<RecoveryListener> recoveryListeners = new ArrayList<RecoveryListener>();
37-
private final List<ReturnListener> returnListeners = new ArrayList<ReturnListener>();
38-
private final List<ConfirmListener> confirmListeners = new ArrayList<ConfirmListener>();
39-
private final List<FlowListener> flowListeners = new ArrayList<FlowListener>();
34+
private volatile RecoveryAwareChannelN delegate;
35+
private volatile AutorecoveringConnection connection;
36+
private final List<ShutdownListener> shutdownHooks = new CopyOnWriteArrayList<ShutdownListener>();
37+
private final List<RecoveryListener> recoveryListeners = new CopyOnWriteArrayList<RecoveryListener>();
38+
private final List<ReturnListener> returnListeners = new CopyOnWriteArrayList<ReturnListener>();
39+
private final List<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();
40+
private final List<FlowListener> flowListeners = new CopyOnWriteArrayList<FlowListener>();
4041
private int prefetchCountConsumer;
4142
private int prefetchCountGlobal;
4243
private boolean usesPublisherConfirms;
@@ -648,7 +649,7 @@ private void recoverState() throws IOException {
648649
}
649650

650651
private void notifyRecoveryListeners() {
651-
for (RecoveryListener f : this.recoveryListeners) {
652+
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
652653
f.handleRecovery(this);
653654
}
654655
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 45 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,16 @@
2020
import com.rabbitmq.client.impl.ConnectionParams;
2121
import com.rabbitmq.client.impl.FrameHandlerFactory;
2222
import com.rabbitmq.client.impl.NetworkConnection;
23+
import com.rabbitmq.utility.Utility;
2324

2425
import java.io.IOException;
2526
import java.net.InetAddress;
2627
import java.util.ArrayList;
2728
import java.util.Collection;
28-
import java.util.HashMap;
29+
import java.util.Collections;
2930
import java.util.HashSet;
3031
import java.util.Iterator;
32+
import java.util.LinkedHashMap;
3133
import java.util.List;
3234
import java.util.Map;
3335
import java.util.Set;
@@ -60,19 +62,19 @@ public class AutorecoveringConnection implements Connection, Recoverable, Networ
6062
private final RecoveryAwareAMQConnectionFactory cf;
6163
private final Map<Integer, AutorecoveringChannel> channels;
6264
private final ConnectionParams params;
63-
private RecoveryAwareAMQConnection delegate;
65+
private volatile RecoveryAwareAMQConnection delegate;
6466

65-
private final List<ShutdownListener> shutdownHooks = new ArrayList<ShutdownListener>();
66-
private final List<RecoveryListener> recoveryListeners = new ArrayList<RecoveryListener>();
67-
private final List<BlockedListener> blockedListeners = new ArrayList<BlockedListener>();
67+
private final List<ShutdownListener> shutdownHooks = Collections.synchronizedList(new ArrayList<ShutdownListener>());
68+
private final List<RecoveryListener> recoveryListeners = Collections.synchronizedList(new ArrayList<RecoveryListener>());
69+
private final List<BlockedListener> blockedListeners = Collections.synchronizedList(new ArrayList<BlockedListener>());
6870

6971
// Records topology changes
70-
private final Map<String, RecordedQueue> recordedQueues = new ConcurrentHashMap<String, RecordedQueue>();
71-
private final List<RecordedBinding> recordedBindings = new ArrayList<RecordedBinding>();
72-
private final Map<String, RecordedExchange> recordedExchanges = new ConcurrentHashMap<String, RecordedExchange>();
73-
private final Map<String, RecordedConsumer> consumers = new ConcurrentHashMap<String, RecordedConsumer>();
74-
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = new ArrayList<ConsumerRecoveryListener>();
75-
private final List<QueueRecoveryListener> queueRecoveryListeners = new ArrayList<QueueRecoveryListener>();
72+
private final Map<String, RecordedQueue> recordedQueues = Collections.synchronizedMap(new LinkedHashMap<String, RecordedQueue>());
73+
private final List<RecordedBinding> recordedBindings = Collections.synchronizedList(new ArrayList<RecordedBinding>());
74+
private final Map<String, RecordedExchange> recordedExchanges = Collections.synchronizedMap(new LinkedHashMap<String, RecordedExchange>());
75+
private final Map<String, RecordedConsumer> consumers = Collections.synchronizedMap(new LinkedHashMap<String, RecordedConsumer>());
76+
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = Collections.synchronizedList(new ArrayList<ConsumerRecoveryListener>());
77+
private final List<QueueRecoveryListener> queueRecoveryListeners = Collections.synchronizedList(new ArrayList<QueueRecoveryListener>());
7678

7779
// Used to block connection recovery attempts after close() is invoked.
7880
private volatile boolean manuallyClosed = false;
@@ -136,10 +138,10 @@ public Channel createChannel(int channelNumber) throws IOException {
136138
* @return Recovering channel.
137139
*/
138140
private Channel wrapChannel(RecoveryAwareChannelN delegateChannel) {
139-
final AutorecoveringChannel channel = new AutorecoveringChannel(this, delegateChannel);
140141
if (delegateChannel == null) {
141142
return null;
142143
} else {
144+
final AutorecoveringChannel channel = new AutorecoveringChannel(this, delegateChannel);
143145
this.registerChannel(channel);
144146
return channel;
145147
}
@@ -514,13 +516,13 @@ synchronized private void beginAutomaticRecovery() throws InterruptedException {
514516
}
515517

516518
private void recoverShutdownListeners(final RecoveryAwareAMQConnection newConn) {
517-
for (ShutdownListener sh : this.shutdownHooks) {
519+
for (ShutdownListener sh : Utility.copy(this.shutdownHooks)) {
518520
newConn.addShutdownListener(sh);
519521
}
520522
}
521523

522524
private void recoverBlockedListeners(final RecoveryAwareAMQConnection newConn) {
523-
for (BlockedListener bl : this.blockedListeners) {
525+
for (BlockedListener bl : Utility.copy(this.blockedListeners)) {
524526
newConn.addBlockedListener(bl);
525527
}
526528
}
@@ -564,7 +566,7 @@ private void recoverChannels(final RecoveryAwareAMQConnection newConn) {
564566
}
565567

566568
private void notifyRecoveryListeners() {
567-
for (RecoveryListener f : this.recoveryListeners) {
569+
for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
568570
f.handleRecovery(this);
569571
}
570572
}
@@ -585,7 +587,7 @@ private void recoverExchanges() {
585587
// recorded exchanges are guaranteed to be
586588
// non-predefined (we filter out predefined ones
587589
// in exchangeDeclare). MK.
588-
for (RecordedExchange x : this.recordedExchanges.values()) {
590+
for (RecordedExchange x : Utility.copy(this.recordedExchanges).values()) {
589591
try {
590592
x.recover();
591593
} catch (Exception cause) {
@@ -598,8 +600,7 @@ private void recoverExchanges() {
598600
}
599601

600602
private void recoverQueues() {
601-
Map<String, RecordedQueue> copy = new HashMap<String, RecordedQueue>(this.recordedQueues);
602-
for (Map.Entry<String, RecordedQueue> entry : copy.entrySet()) {
603+
for (Map.Entry<String, RecordedQueue> entry : Utility.copy(this.recordedQueues).entrySet()) {
603604
String oldName = entry.getKey();
604605
RecordedQueue q = entry.getValue();
605606
try {
@@ -621,7 +622,7 @@ private void recoverQueues() {
621622
this.recordedQueues.put(newName, q);
622623
}
623624
}
624-
for(QueueRecoveryListener qrl : this.queueRecoveryListeners) {
625+
for(QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
625626
qrl.queueRecovered(oldName, newName);
626627
}
627628
} catch (Exception cause) {
@@ -634,7 +635,7 @@ private void recoverQueues() {
634635
}
635636

636637
private void recoverBindings() {
637-
for (RecordedBinding b : this.recordedBindings) {
638+
for (RecordedBinding b : Utility.copy(this.recordedBindings)) {
638639
try {
639640
b.recover();
640641
} catch (Exception cause) {
@@ -647,8 +648,7 @@ private void recoverBindings() {
647648
}
648649

649650
private void recoverConsumers() {
650-
Map<String, RecordedConsumer> copy = new HashMap<String, RecordedConsumer>(this.consumers);
651-
for (Map.Entry<String, RecordedConsumer> entry : copy.entrySet()) {
651+
for (Map.Entry<String, RecordedConsumer> entry : Utility.copy(this.consumers).entrySet()) {
652652
String tag = entry.getKey();
653653
RecordedConsumer consumer = entry.getValue();
654654

@@ -659,7 +659,7 @@ private void recoverConsumers() {
659659
this.consumers.remove(tag);
660660
this.consumers.put(newTag, consumer);
661661
}
662-
for(ConsumerRecoveryListener crl : this.consumerRecoveryListeners) {
662+
for(ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
663663
crl.consumerRecovered(tag, newTag);
664664
}
665665
} catch (Exception cause) {
@@ -672,22 +672,22 @@ private void recoverConsumers() {
672672
}
673673

674674
private void propagateQueueNameChangeToBindings(String oldName, String newName) {
675-
for (RecordedBinding b : this.recordedBindings) {
675+
for (RecordedBinding b : Utility.copy(this.recordedBindings)) {
676676
if (b.getDestination().equals(oldName)) {
677677
b.setDestination(newName);
678678
}
679679
}
680680
}
681681

682682
private void propagateQueueNameChangeToConsumers(String oldName, String newName) {
683-
for (RecordedConsumer c : this.consumers.values()) {
683+
for (RecordedConsumer c : Utility.copy(this.consumers).values()) {
684684
if (c.getQueue().equals(oldName)) {
685685
c.setQueue(newName);
686686
}
687687
}
688688
}
689689

690-
synchronized void recordQueueBinding(AutorecoveringChannel ch,
690+
void recordQueueBinding(AutorecoveringChannel ch,
691691
String queue,
692692
String exchange,
693693
String routingKey,
@@ -701,7 +701,7 @@ synchronized void recordQueueBinding(AutorecoveringChannel ch,
701701
this.recordedBindings.add(binding);
702702
}
703703

704-
synchronized boolean deleteRecordedQueueBinding(AutorecoveringChannel ch,
704+
boolean deleteRecordedQueueBinding(AutorecoveringChannel ch,
705705
String queue,
706706
String exchange,
707707
String routingKey,
@@ -714,7 +714,7 @@ synchronized boolean deleteRecordedQueueBinding(AutorecoveringChannel ch,
714714
return this.recordedBindings.remove(b);
715715
}
716716

717-
synchronized void recordExchangeBinding(AutorecoveringChannel ch,
717+
void recordExchangeBinding(AutorecoveringChannel ch,
718718
String destination,
719719
String source,
720720
String routingKey,
@@ -728,7 +728,7 @@ synchronized void recordExchangeBinding(AutorecoveringChannel ch,
728728
this.recordedBindings.add(binding);
729729
}
730730

731-
synchronized boolean deleteRecordedExchangeBinding(AutorecoveringChannel ch,
731+
boolean deleteRecordedExchangeBinding(AutorecoveringChannel ch,
732732
String destination,
733733
String source,
734734
String routingKey,
@@ -784,7 +784,9 @@ void maybeDeleteRecordedAutoDeleteQueue(String queue) {
784784
RecordedQueue q = this.recordedQueues.get(queue);
785785
// last consumer on this connection is gone, remove recorded queue
786786
// if it is auto-deleted. See bug 26364.
787-
if((q != null) && q.isAutoDelete()) { this.recordedQueues.remove(queue); }
787+
if((q != null) && q.isAutoDelete()) {
788+
deleteRecordedQueue(queue);
789+
}
788790
}
789791
}
790792
}
@@ -793,11 +795,13 @@ void maybeDeleteRecordedAutoDeleteQueue(String queue) {
793795
void maybeDeleteRecordedAutoDeleteExchange(String exchange) {
794796
synchronized (this.recordedExchanges) {
795797
synchronized (this.consumers) {
796-
if(!hasMoreDestinationsBoundToExchange(this.recordedBindings, exchange)) {
798+
if(!hasMoreDestinationsBoundToExchange(Utility.copy(this.recordedBindings), exchange)) {
797799
RecordedExchange x = this.recordedExchanges.get(exchange);
798800
// last binding where this exchange is the source is gone, remove recorded exchange
799801
// if it is auto-deleted. See bug 26364.
800-
if((x != null) && x.isAutoDelete()) { this.recordedExchanges.remove(exchange); }
802+
if((x != null) && x.isAutoDelete()) {
803+
this.recordedExchanges.remove(exchange);
804+
}
801805
}
802806
}
803807
}
@@ -825,13 +829,15 @@ boolean hasMoreConsumersOnQueue(Collection<RecordedConsumer> consumers, String q
825829
return result;
826830
}
827831

828-
synchronized Set<RecordedBinding> removeBindingsWithDestination(String s) {
829-
Set<RecordedBinding> result = new HashSet<RecordedBinding>();
830-
for (Iterator<RecordedBinding> it = this.recordedBindings.iterator(); it.hasNext(); ) {
831-
RecordedBinding b = it.next();
832-
if(b.getDestination().equals(s)) {
833-
it.remove();
834-
result.add(b);
832+
Set<RecordedBinding> removeBindingsWithDestination(String s) {
833+
final Set<RecordedBinding> result = new HashSet<RecordedBinding>();
834+
synchronized (this.recordedBindings) {
835+
for (Iterator<RecordedBinding> it = this.recordedBindings.iterator(); it.hasNext(); ) {
836+
RecordedBinding b = it.next();
837+
if(b.getDestination().equals(s)) {
838+
it.remove();
839+
result.add(b);
840+
}
835841
}
836842
}
837843
return result;

src/main/java/com/rabbitmq/client/impl/recovery/RecordedConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public RecordedConsumer autoAck(boolean value) {
5757
}
5858

5959
public String recover() throws IOException {
60-
this.consumerTag = this.channel.basicConsume(this.queue, this.autoAck, this.consumerTag, false, this.exclusive, this.arguments, this.consumer);
60+
this.consumerTag = this.channel.getDelegate().basicConsume(this.queue, this.autoAck, this.consumerTag, false, this.exclusive, this.arguments, this.consumer);
6161
return this.consumerTag;
6262
}
6363

src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public boolean isServerNamed() {
5050
public boolean isAutoDelete() { return this.autoDelete; }
5151

5252
public void recover() throws IOException {
53-
this.name = this.channel.queueDeclare(this.getNameToUseForRecovery(),
53+
this.name = this.channel.getDelegate().queueDeclare(this.getNameToUseForRecovery(),
5454
this.durable,
5555
this.exclusive,
5656
this.autoDelete,

0 commit comments

Comments
 (0)