Skip to content

Commit 3f068cb

Browse files
committed
* Add CurrentQueue to IModel to keep track of the last declared queue name as defined in the AMQP 091 spec
* Fix `RecordedConsumer` to use `CurrentQueue` when passed in name is `string.Empty`
1 parent 5fc2dbc commit 3f068cb

File tree

5 files changed

+53
-9
lines changed

5 files changed

+53
-9
lines changed

projects/RabbitMQ.Client/client/api/IModel.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,14 @@ public interface IModel : IDisposable
9797
/// </summary>
9898
ulong NextPublishSeqNo { get; }
9999

100+
/// <summary>
101+
/// The name of the last queue declared on this channel.
102+
/// </summary>
103+
/// <remarks>
104+
/// https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.queue-name
105+
/// </remarks>
106+
string CurrentQueue { get; }
107+
100108
/// <summary>
101109
/// Signalled when a Basic.Ack command arrives from the broker.
102110
/// </summary>

projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,19 @@ public ulong NextPublishSeqNo
395395
}
396396
}
397397

398+
public string CurrentQueue
399+
{
400+
get
401+
{
402+
if (_disposed)
403+
{
404+
throw new ObjectDisposedException(GetType().FullName);
405+
}
406+
407+
return _delegate.CurrentQueue;
408+
}
409+
}
410+
398411
public void AutomaticallyRecover(AutorecoveringConnection conn, bool recoverConsumers)
399412
{
400413
if (_disposed)

projects/RabbitMQ.Client/client/impl/ModelBase.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,8 @@ public bool IsOpen
170170

171171
public ulong NextPublishSeqNo { get; private set; }
172172

173+
public string CurrentQueue { get; private set; }
174+
173175
public ISession Session { get; private set; }
174176

175177
public Task Close(ushort replyCode, string replyText, bool abort)
@@ -1481,7 +1483,9 @@ private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bo
14811483
_Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments);
14821484
k.GetReply(ContinuationTimeout);
14831485
}
1484-
return k.m_result;
1486+
QueueDeclareOk result = k.m_result;
1487+
CurrentQueue = result.QueueName;
1488+
return result;
14851489
}
14861490

14871491

projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,17 +47,20 @@ public RecordedConsumer(AutorecoveringModel model, string queue, string consumer
4747
Model = model;
4848
}
4949

50-
/*
51-
* NB: queue can be the empty string in the case of server-named queues
52-
* https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1238
53-
*/
5450
if (queue is null)
5551
{
5652
throw new ArgumentNullException(nameof(queue));
5753
}
5854
else
5955
{
60-
Queue = queue;
56+
if (queue == string.Empty)
57+
{
58+
Queue = model.CurrentQueue;
59+
}
60+
else
61+
{
62+
Queue = queue;
63+
}
6164
}
6265

6366
if (string.IsNullOrEmpty(consumerTag))
@@ -80,7 +83,14 @@ public RecordedConsumer(AutorecoveringModel model, string queue, string consumer
8083

8184
public string Recover(IModel channelToUse)
8285
{
83-
ConsumerTag = channelToUse.BasicConsume(Queue, AutoAck,
86+
string queueName = Queue;
87+
if (string.IsNullOrEmpty(queueName))
88+
{
89+
// https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.queue-name
90+
queueName = channelToUse.CurrentQueue;
91+
}
92+
93+
ConsumerTag = channelToUse.BasicConsume(queueName, AutoAck,
8494
ConsumerTag, false, Exclusive,
8595
Arguments, Consumer);
8696

projects/Unit/TestConnectionRecovery.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -424,12 +424,21 @@ public void TestConsumerRecoveryWithServerNamedQueue()
424424
m.BasicConsume(string.Empty, true, cons);
425425
AssertConsumerCount(m, qname, 1);
426426

427+
bool queueNameBeforeIsEqual = false;
427428
bool queueNameChangeAfterRecoveryCalled = false;
428-
c.QueueNameChangeAfterRecovery += (source, ea) => { queueNameChangeAfterRecoveryCalled = true; };
429+
string qnameAfterRecovery = null;
430+
c.QueueNameChangeAfterRecovery += (source, ea) =>
431+
{
432+
queueNameChangeAfterRecoveryCalled = true;
433+
queueNameBeforeIsEqual = qname.Equals(ea.NameBefore);
434+
qnameAfterRecovery = ea.NameAfter;
435+
};
429436

430437
CloseAndWaitForRecovery(c);
431-
AssertConsumerCount(m, qname, 1);
438+
439+
AssertConsumerCount(m, qnameAfterRecovery, 1);
432440
Assert.True(queueNameChangeAfterRecoveryCalled);
441+
Assert.True(queueNameBeforeIsEqual);
433442
}
434443
}
435444

0 commit comments

Comments
 (0)