Skip to content

Commit 2b06f48

Browse files
authored
Merge pull request #1323 from rabbitmq/rabbitmq-dotnet-client-1302-main
Port #1317 to main
2 parents c158281 + 36233ad commit 2b06f48

File tree

3 files changed

+65
-20
lines changed

3 files changed

+65
-20
lines changed

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
4545
private AutorecoveringConnection _connection;
4646
private RecoveryAwareChannel _innerChannel;
4747
private bool _disposed;
48+
private List<string> _recordedConsumerTags = new List<string>();
4849

4950
private ushort _prefetchCountConsumer;
5051
private ushort _prefetchCountGlobal;
@@ -122,6 +123,8 @@ public event EventHandler<EventArgs> Recovery
122123
remove { InnerChannel.Recovery -= value; }
123124
}
124125

126+
public IEnumerable<string> ConsumerTags => _recordedConsumerTags;
127+
125128
public int ChannelNumber => InnerChannel.ChannelNumber;
126129

127130
public ShutdownEventArgs CloseReason => InnerChannel.CloseReason;
@@ -207,6 +210,8 @@ public void Dispose()
207210

208211
this.Abort();
209212

213+
_recordedConsumerTags.Clear();
214+
_recordedConsumerTags = null;
210215
_connection = null;
211216
_innerChannel = null;
212217
_disposed = true;
@@ -238,9 +243,12 @@ public string BasicConsume(
238243
IDictionary<string, object> arguments,
239244
IBasicConsumer consumer)
240245
{
241-
string result = InnerChannel.BasicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer);
242-
_connection.RecordConsumer(new RecordedConsumer(this, consumer, queue, autoAck, result, exclusive, arguments));
243-
return result;
246+
string resultConsumerTag = InnerChannel.BasicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer);
247+
var rc = new RecordedConsumer(channel: this, consumer: consumer, consumerTag: resultConsumerTag,
248+
queue: queue, autoAck: autoAck, exclusive: exclusive, arguments: arguments);
249+
_connection.RecordConsumer(rc);
250+
_recordedConsumerTags.Add(resultConsumerTag);
251+
return resultConsumerTag;
244252
}
245253

246254
public BasicGetResult BasicGet(string queue, bool autoAck)

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,14 @@ private void RecordChannel(AutorecoveringChannel m)
248248

249249
internal void DeleteRecordedChannel(AutorecoveringChannel channel)
250250
{
251+
lock (_recordedEntitiesLock)
252+
{
253+
foreach (string ct in channel.ConsumerTags)
254+
{
255+
DeleteRecordedConsumer(ct);
256+
}
257+
}
258+
251259
lock (_channels)
252260
{
253261
_channels.Remove(channel);

projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,40 +29,69 @@
2929
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3030
//---------------------------------------------------------------------------
3131

32+
using System;
3233
using System.Collections.Generic;
3334

3435
namespace RabbitMQ.Client.Impl
3536
{
3637
#nullable enable
3738
internal readonly struct RecordedConsumer : IRecordedConsumer
3839
{
39-
public AutorecoveringChannel Channel { get; }
40-
public IBasicConsumer Consumer { get; }
41-
public string Queue { get; }
42-
public bool AutoAck { get; }
43-
public string ConsumerTag { get; }
44-
public bool Exclusive { get; }
45-
public IDictionary<string, object>? Arguments { get; }
40+
private readonly AutorecoveringChannel _channel;
41+
private readonly IBasicConsumer _consumer;
42+
private readonly string _queue;
43+
private readonly bool _autoAck;
44+
private readonly string _consumerTag;
45+
private readonly bool _exclusive;
46+
private readonly IDictionary<string, object>? _arguments;
4647

47-
public RecordedConsumer(AutorecoveringChannel channel, IBasicConsumer consumer, string queue, bool autoAck, string consumerTag, bool exclusive, IDictionary<string, object>? arguments)
48+
public RecordedConsumer(AutorecoveringChannel channel, IBasicConsumer consumer, string consumerTag, string queue, bool autoAck, bool exclusive, IDictionary<string, object>? arguments)
4849
{
49-
Channel = channel;
50-
Consumer = consumer;
51-
Queue = queue;
52-
AutoAck = autoAck;
53-
ConsumerTag = consumerTag;
54-
Exclusive = exclusive;
55-
Arguments = arguments;
50+
if (channel == null)
51+
{
52+
throw new ArgumentNullException(nameof(channel));
53+
}
54+
_channel = channel;
55+
56+
if (consumer == null)
57+
{
58+
throw new ArgumentNullException(nameof(consumer));
59+
}
60+
_consumer = consumer;
61+
62+
if (string.IsNullOrEmpty(queue))
63+
{
64+
throw new ArgumentNullException(nameof(queue));
65+
}
66+
_queue = queue;
67+
68+
if (string.IsNullOrEmpty(consumerTag))
69+
{
70+
throw new ArgumentNullException(nameof(consumerTag));
71+
}
72+
_consumerTag = consumerTag;
73+
74+
_autoAck = autoAck;
75+
_exclusive = exclusive;
76+
_arguments = arguments;
5677
}
5778

79+
public AutorecoveringChannel Channel => _channel;
80+
public IBasicConsumer Consumer => _consumer;
81+
public string Queue => _queue;
82+
public bool AutoAck => _autoAck;
83+
public string ConsumerTag => _consumerTag;
84+
public bool Exclusive => _exclusive;
85+
public IDictionary<string, object>? Arguments => _arguments;
86+
5887
public static RecordedConsumer WithNewConsumerTag(string newTag, in RecordedConsumer old)
5988
{
60-
return new RecordedConsumer(old.Channel, old.Consumer, old.Queue, old.AutoAck, newTag, old.Exclusive, old.Arguments);
89+
return new RecordedConsumer(old.Channel, old.Consumer, newTag, old.Queue, old.AutoAck, old.Exclusive, old.Arguments);
6190
}
6291

6392
public static RecordedConsumer WithNewQueueNameTag(string newQueueName, in RecordedConsumer old)
6493
{
65-
return new RecordedConsumer(old.Channel, old.Consumer, newQueueName, old.AutoAck, old.ConsumerTag, old.Exclusive, old.Arguments);
94+
return new RecordedConsumer(old.Channel, old.Consumer, old.ConsumerTag, newQueueName, old.AutoAck, old.Exclusive, old.Arguments);
6695
}
6796

6897
public string Recover(IChannel channel)

0 commit comments

Comments
 (0)