Skip to content

Commit 8d80556

Browse files
committed
Delete recorded consumers when autorecovering model is disposed
Fixes #1302
1 parent c351467 commit 8d80556

File tree

2 files changed

+31
-8
lines changed

2 files changed

+31
-8
lines changed

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -612,13 +612,23 @@ public void RecordBinding(RecordedBinding rb)
612612
}
613613
}
614614

615-
public void RecordConsumer(string name, RecordedConsumer c)
615+
public void RecordConsumer(string consumerTag, RecordedConsumer recordedConsumer)
616616
{
617+
if (string.IsNullOrEmpty(consumerTag))
618+
{
619+
throw new ArgumentNullException(nameof(consumerTag));
620+
}
621+
622+
if (recordedConsumer is null)
623+
{
624+
throw new ArgumentNullException(nameof(recordedConsumer));
625+
}
626+
617627
lock (_recordedEntitiesLock)
618628
{
619-
if (!_recordedConsumers.ContainsKey(name))
629+
if (!_recordedConsumers.ContainsKey(consumerTag))
620630
{
621-
_recordedConsumers.Add(name, c);
631+
_recordedConsumers.Add(consumerTag, recordedConsumer);
622632
}
623633
}
624634
}
@@ -651,6 +661,14 @@ public override string ToString()
651661

652662
public void UnregisterModel(AutorecoveringModel model)
653663
{
664+
lock (_recordedEntitiesLock)
665+
{
666+
foreach (string ct in model.ConsumerTags)
667+
{
668+
DeleteRecordedConsumer(ct);
669+
}
670+
}
671+
654672
lock (_models)
655673
{
656674
_models.Remove(model);

projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ internal sealed class AutorecoveringModel : IFullModel, IRecoverable
4343
private readonly object _eventLock = new object();
4444
private AutorecoveringConnection _connection;
4545
private RecoveryAwareModel _delegate;
46+
private List<string> _recordedConsumerTags = new List<string>();
4647

4748
private EventHandler<BasicAckEventArgs> _recordedBasicAckEventHandlers;
4849
private EventHandler<BasicNackEventArgs> _recordedBasicNackEventHandlers;
@@ -91,6 +92,8 @@ public TimeSpan ContinuationTimeout
9192
}
9293
}
9394

95+
public IEnumerable<string> ConsumerTags => _recordedConsumerTags;
96+
9497
public AutorecoveringModel(AutorecoveringConnection conn, RecoveryAwareModel _delegate)
9598
{
9699
_connection = conn;
@@ -476,7 +479,7 @@ public void Close(ShutdownEventArgs reason, bool abort)
476479

477480
try
478481
{
479-
_delegate.Close(reason, abort).GetAwaiter().GetResult();;
482+
_delegate.Close(reason, abort).GetAwaiter().GetResult(); ;
480483
}
481484
finally
482485
{
@@ -510,6 +513,8 @@ private void Dispose(bool disposing)
510513
{
511514
Abort();
512515

516+
_recordedConsumerTags.Clear();
517+
_recordedConsumerTags = null;
513518
_connection = null;
514519
_delegate = null;
515520
_recordedBasicAckEventHandlers = null;
@@ -1184,16 +1189,16 @@ public string BasicConsume(
11841189
throw new ObjectDisposedException(GetType().FullName);
11851190
}
11861191

1187-
string result = _delegate.BasicConsume(queue, autoAck, consumerTag, noLocal,
1192+
string resultConsumerTag = _delegate.BasicConsume(queue, autoAck, consumerTag, noLocal,
11881193
exclusive, arguments, consumer);
11891194
RecordedConsumer rc = new RecordedConsumer(this, queue).
1190-
WithConsumerTag(result).
11911195
WithConsumer(consumer).
11921196
WithExclusive(exclusive).
11931197
WithAutoAck(autoAck).
11941198
WithArguments(arguments);
1195-
_connection.RecordConsumer(result, rc);
1196-
return result;
1199+
_connection.RecordConsumer(resultConsumerTag, rc);
1200+
_recordedConsumerTags.Add(resultConsumerTag);
1201+
return resultConsumerTag;
11971202
}
11981203

11991204
public BasicGetResult BasicGet(string queue,

0 commit comments

Comments
 (0)