Skip to content

Commit f3382d3

Browse files
authored
Merge pull request #1324 from rabbitmq/rabbitmq-dotnet-client-1238
Fix consumer recovery with server-named queues
2 parents 95be329 + 7590feb commit f3382d3

File tree

7 files changed

+83
-31
lines changed

7 files changed

+83
-31
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ build/
5050
[Oo]bj/
5151
*.lock.json
5252

53-
APIApproval.Approve.received.txt
53+
projects/Unit/APIApproval.Approve.received.txt
54+
projects/Unit/APIApproval.Approve.*.received.txt
5455

5556
# Visual Studio 2015 cache/options directory
5657
.vs/

projects/RabbitMQ.Client/client/api/IModel.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,14 @@ public interface IModel : IDisposable
9797
/// </summary>
9898
ulong NextPublishSeqNo { get; }
9999

100+
/// <summary>
101+
/// The name of the last queue declared on this channel.
102+
/// </summary>
103+
/// <remarks>
104+
/// https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.queue-name
105+
/// </remarks>
106+
string CurrentQueue { get; }
107+
100108
/// <summary>
101109
/// Signalled when a Basic.Ack command arrives from the broker.
102110
/// </summary>

projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,19 @@ public ulong NextPublishSeqNo
395395
}
396396
}
397397

398+
public string CurrentQueue
399+
{
400+
get
401+
{
402+
if (_disposed)
403+
{
404+
throw new ObjectDisposedException(GetType().FullName);
405+
}
406+
407+
return _delegate.CurrentQueue;
408+
}
409+
}
410+
398411
public void AutomaticallyRecover(AutorecoveringConnection conn, bool recoverConsumers)
399412
{
400413
if (_disposed)

projects/RabbitMQ.Client/client/impl/ModelBase.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,8 @@ public bool IsOpen
170170

171171
public ulong NextPublishSeqNo { get; private set; }
172172

173+
public string CurrentQueue { get; private set; }
174+
173175
public ISession Session { get; private set; }
174176

175177
public Task Close(ushort replyCode, string replyText, bool abort)
@@ -1481,7 +1483,9 @@ private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bo
14811483
_Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments);
14821484
k.GetReply(ContinuationTimeout);
14831485
}
1484-
return k.m_result;
1486+
QueueDeclareOk result = k.m_result;
1487+
CurrentQueue = result.QueueName;
1488+
return result;
14851489
}
14861490

14871491

projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -36,24 +36,33 @@ namespace RabbitMQ.Client.Impl
3636
{
3737
internal class RecordedConsumer : IRecordedConsumer
3838
{
39+
private readonly AutorecoveringModel _model;
40+
3941
public RecordedConsumer(AutorecoveringModel model, string queue, string consumerTag)
4042
{
41-
if (model == null)
43+
if (model is null)
4244
{
4345
throw new ArgumentNullException(nameof(model));
4446
}
4547
else
4648
{
47-
Model = model;
49+
_model = model;
4850
}
4951

50-
if (string.IsNullOrEmpty(queue))
52+
if (queue is null)
5153
{
52-
throw new ArgumentNullException(nameof(consumerTag));
54+
throw new ArgumentNullException(nameof(queue));
5355
}
5456
else
5557
{
56-
Queue = queue;
58+
if (queue == string.Empty)
59+
{
60+
Queue = _model.CurrentQueue;
61+
}
62+
else
63+
{
64+
Queue = queue;
65+
}
5766
}
5867

5968
if (string.IsNullOrEmpty(consumerTag))
@@ -66,20 +75,23 @@ public RecordedConsumer(AutorecoveringModel model, string queue, string consumer
6675
}
6776
}
6877

69-
public AutorecoveringModel Model { get; }
78+
public AutorecoveringModel Model
79+
{
80+
get { return _model; }
81+
}
82+
83+
public string Queue { get; set; }
84+
public string ConsumerTag { get; set; }
7085
public IDictionary<string, object> Arguments { get; set; }
7186
public bool AutoAck { get; set; }
72-
public IBasicConsumer Consumer { get; set; }
73-
public string ConsumerTag { get; set; }
7487
public bool Exclusive { get; set; }
75-
public string Queue { get; set; }
88+
public IBasicConsumer Consumer { get; set; }
7689

7790
public string Recover(IModel channelToUse)
7891
{
7992
ConsumerTag = channelToUse.BasicConsume(Queue, AutoAck,
8093
ConsumerTag, false, Exclusive,
8194
Arguments, Consumer);
82-
8395
return ConsumerTag;
8496
}
8597

@@ -101,29 +113,10 @@ public RecordedConsumer WithConsumer(IBasicConsumer value)
101113
return this;
102114
}
103115

104-
public RecordedConsumer WithConsumerTag(string value)
105-
{
106-
if (string.IsNullOrEmpty(value))
107-
{
108-
throw new System.ArgumentNullException(nameof(value));
109-
}
110-
else
111-
{
112-
ConsumerTag = value;
113-
}
114-
return this;
115-
}
116-
117116
public RecordedConsumer WithExclusive(bool value)
118117
{
119118
Exclusive = value;
120119
return this;
121120
}
122-
123-
public RecordedConsumer WithQueue(string value)
124-
{
125-
Queue = value ?? throw new System.ArgumentNullException(nameof(value));
126-
return this;
127-
}
128121
}
129122
}

projects/Unit/APIApproval.Approve.verified.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,7 @@ namespace RabbitMQ.Client
385385
int ChannelNumber { get; }
386386
RabbitMQ.Client.ShutdownEventArgs CloseReason { get; }
387387
System.TimeSpan ContinuationTimeout { get; set; }
388+
string CurrentQueue { get; }
388389
RabbitMQ.Client.IBasicConsumer DefaultConsumer { get; set; }
389390
bool IsClosed { get; }
390391
bool IsOpen { get; }

projects/Unit/TestConnectionRecovery.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,38 @@ public void TestConsumerRecoveryOnClientNamedQueueWithOneRecovery()
410410
}
411411
}
412412

413+
[Test(Description = "https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1238")]
414+
public void TestConsumerRecoveryWithServerNamedQueue()
415+
{
416+
using (AutorecoveringConnection c = CreateAutorecoveringConnection())
417+
{
418+
IModel m = c.CreateModel();
419+
QueueDeclareOk queueDeclareResult = m.QueueDeclare(queue: string.Empty, durable: false, exclusive: true, autoDelete: true, arguments: null);
420+
string qname = queueDeclareResult.QueueName;
421+
Assert.False(string.IsNullOrEmpty(qname));
422+
423+
var cons = new EventingBasicConsumer(m);
424+
m.BasicConsume(string.Empty, true, cons);
425+
AssertConsumerCount(m, qname, 1);
426+
427+
bool queueNameBeforeIsEqual = false;
428+
bool queueNameChangeAfterRecoveryCalled = false;
429+
string qnameAfterRecovery = null;
430+
c.QueueNameChangeAfterRecovery += (source, ea) =>
431+
{
432+
queueNameChangeAfterRecoveryCalled = true;
433+
queueNameBeforeIsEqual = qname.Equals(ea.NameBefore);
434+
qnameAfterRecovery = ea.NameAfter;
435+
};
436+
437+
CloseAndWaitForRecovery(c);
438+
439+
AssertConsumerCount(m, qnameAfterRecovery, 1);
440+
Assert.True(queueNameChangeAfterRecoveryCalled);
441+
Assert.True(queueNameBeforeIsEqual);
442+
}
443+
}
444+
413445
[Test]
414446
public void TestConsumerRecoveryWithManyConsumers()
415447
{

0 commit comments

Comments
 (0)