Skip to content

Commit a9fe878

Browse files
authored
Merge pull request #1325 from rabbitmq/rabbitmq-dotnet-client-1238-main
Fix consumer recovery with server-named queues
2 parents 2b06f48 + b438e8e commit a9fe878

File tree

8 files changed

+123
-13
lines changed

8 files changed

+123
-13
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ build/
5151

5252
BenchmarkDotNet.Artifacts/*
5353

54-
APIApproval.Approve.received.txt
54+
projects/Unit/APIApproval.Approve.received.txt
55+
projects/Unit/APIApproval.Approve.*.received.txt
5556

5657
# Visual Studio 2015 cache/options directory
5758
.vs/

projects/RabbitMQ.Client/client/api/IChannel.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,14 @@ public interface IChannel : IDisposable
9797
/// </summary>
9898
ulong NextPublishSeqNo { get; }
9999

100+
/// <summary>
101+
/// The name of the last queue declared on this channel.
102+
/// </summary>
103+
/// <remarks>
104+
/// https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.queue-name
105+
/// </remarks>
106+
string CurrentQueue { get; }
107+
100108
/// <summary>
101109
/// Signalled when a Basic.Ack command arrives from the broker.
102110
/// </summary>

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -123,23 +123,76 @@ public event EventHandler<EventArgs> Recovery
123123
remove { InnerChannel.Recovery -= value; }
124124
}
125125

126-
public IEnumerable<string> ConsumerTags => _recordedConsumerTags;
126+
public IEnumerable<string> ConsumerTags
127+
{
128+
get
129+
{
130+
ThrowIfDisposed();
131+
return _recordedConsumerTags;
132+
}
133+
}
127134

128-
public int ChannelNumber => InnerChannel.ChannelNumber;
135+
public int ChannelNumber
136+
{
137+
get
138+
{
139+
ThrowIfDisposed();
140+
return InnerChannel.ChannelNumber;
141+
}
142+
}
129143

130-
public ShutdownEventArgs CloseReason => InnerChannel.CloseReason;
144+
public ShutdownEventArgs CloseReason
145+
{
146+
get
147+
{
148+
ThrowIfDisposed();
149+
return InnerChannel.CloseReason;
150+
}
151+
}
131152

132153
public IBasicConsumer DefaultConsumer
133154
{
134-
get => InnerChannel.DefaultConsumer;
135-
set => InnerChannel.DefaultConsumer = value;
155+
get
156+
{
157+
ThrowIfDisposed();
158+
return InnerChannel.DefaultConsumer;
159+
}
160+
161+
set
162+
{
163+
ThrowIfDisposed();
164+
InnerChannel.DefaultConsumer = value;
165+
}
136166
}
137167

138168
public bool IsClosed => !IsOpen;
139169

140-
public bool IsOpen => _innerChannel != null && _innerChannel.IsOpen;
170+
public bool IsOpen
171+
{
172+
get
173+
{
174+
ThrowIfDisposed();
175+
return _innerChannel != null && _innerChannel.IsOpen;
176+
}
177+
}
178+
179+
public ulong NextPublishSeqNo
180+
{
181+
get
182+
{
183+
ThrowIfDisposed();
184+
return InnerChannel.NextPublishSeqNo;
185+
}
186+
}
141187

142-
public ulong NextPublishSeqNo => InnerChannel.NextPublishSeqNo;
188+
public string CurrentQueue
189+
{
190+
get
191+
{
192+
ThrowIfDisposed();
193+
return InnerChannel.CurrentQueue;
194+
}
195+
}
143196

144197
internal void AutomaticallyRecover(AutorecoveringConnection conn, bool recoverConsumers)
145198
{

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ private void UpdateConsumerQueue(string oldName, string newName)
152152
{
153153
if (consumer.Queue == oldName)
154154
{
155-
_recordedConsumers[consumer.ConsumerTag] = RecordedConsumer.WithNewQueueNameTag(newName, consumer);
155+
_recordedConsumers[consumer.ConsumerTag] = RecordedConsumer.WithNewQueueName(newName, consumer);
156156
}
157157
}
158158
}

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,8 @@ public IBasicConsumer DefaultConsumer
175175

176176
public ulong NextPublishSeqNo { get; private set; }
177177

178+
public string CurrentQueue { get; private set; }
179+
178180
public ISession Session { get; private set; }
179181

180182
protected void TakeOver(ChannelBase other)
@@ -1169,7 +1171,9 @@ private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bo
11691171
_Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments);
11701172
k.GetReply(ContinuationTimeout);
11711173
}
1172-
return k.m_result;
1174+
QueueDeclareOk result = k.m_result;
1175+
CurrentQueue = result.QueueName;
1176+
return result;
11731177
}
11741178

11751179

projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,21 @@ public RecordedConsumer(AutorecoveringChannel channel, IBasicConsumer consumer,
5959
}
6060
_consumer = consumer;
6161

62-
if (string.IsNullOrEmpty(queue))
62+
if (queue is null)
6363
{
6464
throw new ArgumentNullException(nameof(queue));
6565
}
66-
_queue = queue;
66+
else
67+
{
68+
if (queue == string.Empty)
69+
{
70+
_queue = _channel.CurrentQueue;
71+
}
72+
else
73+
{
74+
_queue = queue;
75+
}
76+
}
6777

6878
if (string.IsNullOrEmpty(consumerTag))
6979
{
@@ -89,7 +99,7 @@ public static RecordedConsumer WithNewConsumerTag(string newTag, in RecordedCons
8999
return new RecordedConsumer(old.Channel, old.Consumer, newTag, old.Queue, old.AutoAck, old.Exclusive, old.Arguments);
90100
}
91101

92-
public static RecordedConsumer WithNewQueueNameTag(string newQueueName, in RecordedConsumer old)
102+
public static RecordedConsumer WithNewQueueName(string newQueueName, in RecordedConsumer old)
93103
{
94104
return new RecordedConsumer(old.Channel, old.Consumer, old.ConsumerTag, newQueueName, old.AutoAck, old.Exclusive, old.Arguments);
95105
}

projects/Unit/APIApproval.Approve.verified.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,7 @@ namespace RabbitMQ.Client
380380
int ChannelNumber { get; }
381381
RabbitMQ.Client.ShutdownEventArgs CloseReason { get; }
382382
System.TimeSpan ContinuationTimeout { get; set; }
383+
string CurrentQueue { get; }
383384
RabbitMQ.Client.IBasicConsumer DefaultConsumer { get; set; }
384385
bool IsClosed { get; }
385386
bool IsOpen { get; }

projects/Unit/TestConnectionRecovery.cs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,39 @@ public void TestConsumerRecoveryOnClientNamedQueueWithOneRecovery()
410410
}
411411
}
412412

413+
[Fact]
414+
public void TestConsumerRecoveryWithServerNamedQueue()
415+
{
416+
// https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1238
417+
using (AutorecoveringConnection c = CreateAutorecoveringConnection())
418+
{
419+
IChannel ch = c.CreateChannel();
420+
QueueDeclareOk queueDeclareResult = ch.QueueDeclare(queue: string.Empty, durable: false, exclusive: true, autoDelete: true, arguments: null);
421+
string qname = queueDeclareResult.QueueName;
422+
Assert.False(string.IsNullOrEmpty(qname));
423+
424+
var cons = new EventingBasicConsumer(ch);
425+
ch.BasicConsume(string.Empty, true, cons);
426+
AssertConsumerCount(ch, qname, 1);
427+
428+
bool queueNameBeforeIsEqual = false;
429+
bool queueNameChangeAfterRecoveryCalled = false;
430+
string qnameAfterRecovery = null;
431+
c.QueueNameChangeAfterRecovery += (source, ea) =>
432+
{
433+
queueNameChangeAfterRecoveryCalled = true;
434+
queueNameBeforeIsEqual = qname.Equals(ea.NameBefore);
435+
qnameAfterRecovery = ea.NameAfter;
436+
};
437+
438+
CloseAndWaitForRecovery(c);
439+
440+
AssertConsumerCount(ch, qnameAfterRecovery, 1);
441+
Assert.True(queueNameChangeAfterRecoveryCalled);
442+
Assert.True(queueNameBeforeIsEqual);
443+
}
444+
}
445+
413446
[Fact]
414447
public void TestConsumerRecoveryWithManyConsumers()
415448
{

0 commit comments

Comments
 (0)