Skip to content

Commit 8cd6ffb

Browse files
authored
Merge pull request #1317 from rabbitmq/rabbitmq-dotnet-client-1302
Delete recorded consumers when autorecovering model is disposed
2 parents 7b8368b + bda44c9 commit 8cd6ffb

File tree

3 files changed

+70
-15
lines changed

3 files changed

+70
-15
lines changed

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -612,13 +612,23 @@ public void RecordBinding(RecordedBinding rb)
612612
}
613613
}
614614

615-
public void RecordConsumer(string name, RecordedConsumer c)
615+
public void RecordConsumer(string consumerTag, RecordedConsumer recordedConsumer)
616616
{
617+
if (string.IsNullOrEmpty(consumerTag))
618+
{
619+
throw new ArgumentNullException(nameof(consumerTag));
620+
}
621+
622+
if (recordedConsumer is null)
623+
{
624+
throw new ArgumentNullException(nameof(recordedConsumer));
625+
}
626+
617627
lock (_recordedEntitiesLock)
618628
{
619-
if (!_recordedConsumers.ContainsKey(name))
629+
if (!_recordedConsumers.ContainsKey(consumerTag))
620630
{
621-
_recordedConsumers.Add(name, c);
631+
_recordedConsumers.Add(consumerTag, recordedConsumer);
622632
}
623633
}
624634
}
@@ -651,6 +661,14 @@ public override string ToString()
651661

652662
public void UnregisterModel(AutorecoveringModel model)
653663
{
664+
lock (_recordedEntitiesLock)
665+
{
666+
foreach (string ct in model.ConsumerTags)
667+
{
668+
DeleteRecordedConsumer(ct);
669+
}
670+
}
671+
654672
lock (_models)
655673
{
656674
_models.Remove(model);

projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ internal sealed class AutorecoveringModel : IFullModel, IRecoverable
4343
private readonly object _eventLock = new object();
4444
private AutorecoveringConnection _connection;
4545
private RecoveryAwareModel _delegate;
46+
private List<string> _recordedConsumerTags = new List<string>();
4647

4748
private EventHandler<BasicAckEventArgs> _recordedBasicAckEventHandlers;
4849
private EventHandler<BasicNackEventArgs> _recordedBasicNackEventHandlers;
@@ -91,6 +92,8 @@ public TimeSpan ContinuationTimeout
9192
}
9293
}
9394

95+
public IEnumerable<string> ConsumerTags => _recordedConsumerTags;
96+
9497
public AutorecoveringModel(AutorecoveringConnection conn, RecoveryAwareModel _delegate)
9598
{
9699
_connection = conn;
@@ -476,7 +479,7 @@ public void Close(ShutdownEventArgs reason, bool abort)
476479

477480
try
478481
{
479-
_delegate.Close(reason, abort).GetAwaiter().GetResult();;
482+
_delegate.Close(reason, abort).GetAwaiter().GetResult();
480483
}
481484
finally
482485
{
@@ -510,6 +513,8 @@ private void Dispose(bool disposing)
510513
{
511514
Abort();
512515

516+
_recordedConsumerTags.Clear();
517+
_recordedConsumerTags = null;
513518
_connection = null;
514519
_delegate = null;
515520
_recordedBasicAckEventHandlers = null;
@@ -1184,16 +1189,16 @@ public string BasicConsume(
11841189
throw new ObjectDisposedException(GetType().FullName);
11851190
}
11861191

1187-
string result = _delegate.BasicConsume(queue, autoAck, consumerTag, noLocal,
1192+
string resultConsumerTag = _delegate.BasicConsume(queue, autoAck, consumerTag, noLocal,
11881193
exclusive, arguments, consumer);
1189-
RecordedConsumer rc = new RecordedConsumer(this, queue).
1190-
WithConsumerTag(result).
1194+
RecordedConsumer rc = new RecordedConsumer(this, queue, resultConsumerTag).
11911195
WithConsumer(consumer).
11921196
WithExclusive(exclusive).
11931197
WithAutoAck(autoAck).
11941198
WithArguments(arguments);
1195-
_connection.RecordConsumer(result, rc);
1196-
return result;
1199+
_connection.RecordConsumer(resultConsumerTag, rc);
1200+
_recordedConsumerTags.Add(resultConsumerTag);
1201+
return resultConsumerTag;
11971202
}
11981203

11991204
public BasicGetResult BasicGet(string queue,

projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,41 @@
2929
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3030
//---------------------------------------------------------------------------
3131

32+
using System;
3233
using System.Collections.Generic;
3334

3435
namespace RabbitMQ.Client.Impl
3536
{
3637
internal class RecordedConsumer : IRecordedConsumer
3738
{
38-
public RecordedConsumer(AutorecoveringModel model, string queue)
39+
public RecordedConsumer(AutorecoveringModel model, string queue, string consumerTag)
3940
{
40-
Model = model;
41-
Queue = queue;
41+
if (model == null)
42+
{
43+
throw new ArgumentNullException(nameof(model));
44+
}
45+
else
46+
{
47+
Model = model ?? throw new ArgumentNullException(nameof(model));
48+
}
49+
50+
if (string.IsNullOrEmpty(queue))
51+
{
52+
throw new ArgumentNullException(nameof(consumerTag));
53+
}
54+
else
55+
{
56+
Queue = queue;
57+
}
58+
59+
if (string.IsNullOrEmpty(consumerTag))
60+
{
61+
throw new ArgumentNullException(nameof(consumerTag));
62+
}
63+
else
64+
{
65+
ConsumerTag = consumerTag;
66+
}
4267
}
4368

4469
public AutorecoveringModel Model { get; }
@@ -72,13 +97,20 @@ public RecordedConsumer WithAutoAck(bool value)
7297

7398
public RecordedConsumer WithConsumer(IBasicConsumer value)
7499
{
75-
Consumer = value;
100+
Consumer = value ?? throw new System.ArgumentNullException(nameof(value));
76101
return this;
77102
}
78103

79104
public RecordedConsumer WithConsumerTag(string value)
80105
{
81-
ConsumerTag = value;
106+
if (string.IsNullOrEmpty(value))
107+
{
108+
throw new System.ArgumentNullException(nameof(value));
109+
}
110+
else
111+
{
112+
ConsumerTag = value;
113+
}
82114
return this;
83115
}
84116

@@ -90,7 +122,7 @@ public RecordedConsumer WithExclusive(bool value)
90122

91123
public RecordedConsumer WithQueue(string value)
92124
{
93-
Queue = value;
125+
Queue = value ?? throw new System.ArgumentNullException(nameof(value));
94126
return this;
95127
}
96128
}

0 commit comments

Comments
 (0)