Skip to content

Commit fdd0fa4

Browse files
committed
Delete recorded consumers when autorecovering model is disposed
Fixes #1302 Update dependencies Ensure RecordedConsumer has a ConsumerTag
1 parent c351467 commit fdd0fa4

File tree

5 files changed

+80
-23
lines changed

5 files changed

+80
-23
lines changed

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -612,13 +612,23 @@ public void RecordBinding(RecordedBinding rb)
612612
}
613613
}
614614

615-
public void RecordConsumer(string name, RecordedConsumer c)
615+
public void RecordConsumer(string consumerTag, RecordedConsumer recordedConsumer)
616616
{
617+
if (string.IsNullOrEmpty(consumerTag))
618+
{
619+
throw new ArgumentNullException(nameof(consumerTag));
620+
}
621+
622+
if (recordedConsumer is null)
623+
{
624+
throw new ArgumentNullException(nameof(recordedConsumer));
625+
}
626+
617627
lock (_recordedEntitiesLock)
618628
{
619-
if (!_recordedConsumers.ContainsKey(name))
629+
if (!_recordedConsumers.ContainsKey(consumerTag))
620630
{
621-
_recordedConsumers.Add(name, c);
631+
_recordedConsumers.Add(consumerTag, recordedConsumer);
622632
}
623633
}
624634
}
@@ -651,6 +661,14 @@ public override string ToString()
651661

652662
public void UnregisterModel(AutorecoveringModel model)
653663
{
664+
lock (_recordedEntitiesLock)
665+
{
666+
foreach (string ct in model.ConsumerTags)
667+
{
668+
DeleteRecordedConsumer(ct);
669+
}
670+
}
671+
654672
lock (_models)
655673
{
656674
_models.Remove(model);

projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ internal sealed class AutorecoveringModel : IFullModel, IRecoverable
4343
private readonly object _eventLock = new object();
4444
private AutorecoveringConnection _connection;
4545
private RecoveryAwareModel _delegate;
46+
private List<string> _recordedConsumerTags = new List<string>();
4647

4748
private EventHandler<BasicAckEventArgs> _recordedBasicAckEventHandlers;
4849
private EventHandler<BasicNackEventArgs> _recordedBasicNackEventHandlers;
@@ -91,6 +92,8 @@ public TimeSpan ContinuationTimeout
9192
}
9293
}
9394

95+
public IEnumerable<string> ConsumerTags => _recordedConsumerTags;
96+
9497
public AutorecoveringModel(AutorecoveringConnection conn, RecoveryAwareModel _delegate)
9598
{
9699
_connection = conn;
@@ -476,7 +479,7 @@ public void Close(ShutdownEventArgs reason, bool abort)
476479

477480
try
478481
{
479-
_delegate.Close(reason, abort).GetAwaiter().GetResult();;
482+
_delegate.Close(reason, abort).GetAwaiter().GetResult();
480483
}
481484
finally
482485
{
@@ -510,6 +513,8 @@ private void Dispose(bool disposing)
510513
{
511514
Abort();
512515

516+
_recordedConsumerTags.Clear();
517+
_recordedConsumerTags = null;
513518
_connection = null;
514519
_delegate = null;
515520
_recordedBasicAckEventHandlers = null;
@@ -1184,16 +1189,16 @@ public string BasicConsume(
11841189
throw new ObjectDisposedException(GetType().FullName);
11851190
}
11861191

1187-
string result = _delegate.BasicConsume(queue, autoAck, consumerTag, noLocal,
1192+
string resultConsumerTag = _delegate.BasicConsume(queue, autoAck, consumerTag, noLocal,
11881193
exclusive, arguments, consumer);
1189-
RecordedConsumer rc = new RecordedConsumer(this, queue).
1190-
WithConsumerTag(result).
1194+
RecordedConsumer rc = new RecordedConsumer(this, queue, resultConsumerTag).
11911195
WithConsumer(consumer).
11921196
WithExclusive(exclusive).
11931197
WithAutoAck(autoAck).
11941198
WithArguments(arguments);
1195-
_connection.RecordConsumer(result, rc);
1196-
return result;
1199+
_connection.RecordConsumer(resultConsumerTag, rc);
1200+
_recordedConsumerTags.Add(resultConsumerTag);
1201+
return resultConsumerTag;
11971202
}
11981203

11991204
public BasicGetResult BasicGet(string queue,

projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,41 @@
2929
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3030
//---------------------------------------------------------------------------
3131

32+
using System;
3233
using System.Collections.Generic;
3334

3435
namespace RabbitMQ.Client.Impl
3536
{
3637
internal class RecordedConsumer
3738
{
38-
public RecordedConsumer(AutorecoveringModel model, string queue)
39+
public RecordedConsumer(AutorecoveringModel model, string queue, string consumerTag)
3940
{
40-
Model = model;
41-
Queue = queue;
41+
if (model == null)
42+
{
43+
throw new ArgumentNullException(nameof(model));
44+
}
45+
else
46+
{
47+
Model = model ?? throw new ArgumentNullException(nameof(model));
48+
}
49+
50+
if (string.IsNullOrEmpty(queue))
51+
{
52+
throw new ArgumentNullException(nameof(consumerTag));
53+
}
54+
else
55+
{
56+
Queue = queue;
57+
}
58+
59+
if (string.IsNullOrEmpty(consumerTag))
60+
{
61+
throw new ArgumentNullException(nameof(consumerTag));
62+
}
63+
else
64+
{
65+
ConsumerTag = consumerTag;
66+
}
4267
}
4368

4469
public AutorecoveringModel Model { get; }
@@ -72,13 +97,20 @@ public RecordedConsumer WithAutoAck(bool value)
7297

7398
public RecordedConsumer WithConsumer(IBasicConsumer value)
7499
{
75-
Consumer = value;
100+
Consumer = value ?? throw new System.ArgumentNullException(nameof(value));
76101
return this;
77102
}
78103

79104
public RecordedConsumer WithConsumerTag(string value)
80105
{
81-
ConsumerTag = value;
106+
if (string.IsNullOrEmpty(value))
107+
{
108+
throw new System.ArgumentNullException(nameof(value));
109+
}
110+
else
111+
{
112+
ConsumerTag = value;
113+
}
82114
return this;
83115
}
84116

@@ -90,7 +122,7 @@ public RecordedConsumer WithExclusive(bool value)
90122

91123
public RecordedConsumer WithQueue(string value)
92124
{
93-
Queue = value;
125+
Queue = value ?? throw new System.ArgumentNullException(nameof(value));
94126
return this;
95127
}
96128
}

projects/Unit/TestConnectionRecovery.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33-
using System.Collections;
3433
using System.Collections.Generic;
3534
using System.Threading;
3635

@@ -825,7 +824,7 @@ public void TestUnbindQueueAfterRecoveryConnection()
825824
Assert.IsTrue(nameBefore.StartsWith("amq."));
826825
Assert.IsTrue(nameAfter.StartsWith("amq."));
827826
Assert.AreNotEqual(nameBefore, nameAfter);
828-
Model.QueueUnbind(nameAfter,x,"");
827+
Model.QueueUnbind(nameAfter, x, "");
829828
Model.QueueDeleteNoWait(nameAfter);
830829
latch.Reset();
831830
CloseAndWaitForRecovery();

projects/Unit/Unit.csproj

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,15 @@
1111
</ItemGroup>
1212

1313
<ItemGroup>
14-
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.7.1" />
15-
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.2" />
16-
<PackageReference Include="NUnit" Version="3.13.2" />
17-
<PackageReference Include="NUnit3TestAdapter" Version="4.2.1" />
18-
<PackageReference Include="PublicApiGenerator" Version="10.3.0" />
19-
<PackageReference Include="Verify.NUnit" Version="12.2" />
14+
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.5.0" />
15+
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.3">
16+
<PrivateAssets>all</PrivateAssets>
17+
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
18+
</PackageReference>
19+
<PackageReference Include="NUnit" Version="3.13.3" />
20+
<PackageReference Include="NUnit3TestAdapter" Version="4.4.2" />
21+
<PackageReference Include="PublicApiGenerator" Version="11.0.0" />
22+
<PackageReference Include="Verify.NUnit" Version="19.11.2" />
2023
</ItemGroup>
2124

2225
</Project>

0 commit comments

Comments
 (0)