Skip to content

Fix consumer recovery with server-named queues #1324

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ build/
[Oo]bj/
*.lock.json

APIApproval.Approve.received.txt
projects/Unit/APIApproval.Approve.received.txt
projects/Unit/APIApproval.Approve.*.received.txt

# Visual Studio 2015 cache/options directory
.vs/
Expand Down
8 changes: 8 additions & 0 deletions projects/RabbitMQ.Client/client/api/IModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ public interface IModel : IDisposable
/// </summary>
ulong NextPublishSeqNo { get; }

/// <summary>
/// The name of the last queue declared on this channel.
/// </summary>
/// <remarks>
/// https://www.rabbitmq.com/amqp-0-9-1-reference.html#domain.queue-name
/// </remarks>
string CurrentQueue { get; }

/// <summary>
/// Signalled when a Basic.Ack command arrives from the broker.
/// </summary>
Expand Down
13 changes: 13 additions & 0 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,19 @@ public ulong NextPublishSeqNo
}
}

public string CurrentQueue
{
get
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().FullName);
}

return _delegate.CurrentQueue;
}
}

public void AutomaticallyRecover(AutorecoveringConnection conn, bool recoverConsumers)
{
if (_disposed)
Expand Down
6 changes: 5 additions & 1 deletion projects/RabbitMQ.Client/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ public bool IsOpen

public ulong NextPublishSeqNo { get; private set; }

public string CurrentQueue { get; private set; }

public ISession Session { get; private set; }

public Task Close(ushort replyCode, string replyText, bool abort)
Expand Down Expand Up @@ -1481,7 +1483,9 @@ private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bo
_Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments);
k.GetReply(ContinuationTimeout);
}
return k.m_result;
QueueDeclareOk result = k.m_result;
CurrentQueue = result.QueueName;
return result;
}


Expand Down
51 changes: 22 additions & 29 deletions projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,33 @@ namespace RabbitMQ.Client.Impl
{
internal class RecordedConsumer : IRecordedConsumer
{
private readonly AutorecoveringModel _model;

public RecordedConsumer(AutorecoveringModel model, string queue, string consumerTag)
{
if (model == null)
if (model is null)
{
throw new ArgumentNullException(nameof(model));
}
else
{
Model = model;
_model = model;
}

if (string.IsNullOrEmpty(queue))
if (queue is null)
{
throw new ArgumentNullException(nameof(consumerTag));
throw new ArgumentNullException(nameof(queue));
}
else
{
Queue = queue;
if (queue == string.Empty)
{
Queue = _model.CurrentQueue;
}
else
{
Queue = queue;
}
}

if (string.IsNullOrEmpty(consumerTag))
Expand All @@ -66,20 +75,23 @@ public RecordedConsumer(AutorecoveringModel model, string queue, string consumer
}
}

public AutorecoveringModel Model { get; }
public AutorecoveringModel Model
{
get { return _model; }
}

public string Queue { get; set; }
public string ConsumerTag { get; set; }
public IDictionary<string, object> Arguments { get; set; }
public bool AutoAck { get; set; }
public IBasicConsumer Consumer { get; set; }
public string ConsumerTag { get; set; }
public bool Exclusive { get; set; }
public string Queue { get; set; }
public IBasicConsumer Consumer { get; set; }

public string Recover(IModel channelToUse)
{
ConsumerTag = channelToUse.BasicConsume(Queue, AutoAck,
ConsumerTag, false, Exclusive,
Arguments, Consumer);

return ConsumerTag;
}

Expand All @@ -101,29 +113,10 @@ public RecordedConsumer WithConsumer(IBasicConsumer value)
return this;
}

public RecordedConsumer WithConsumerTag(string value)
{
if (string.IsNullOrEmpty(value))
{
throw new System.ArgumentNullException(nameof(value));
}
else
{
ConsumerTag = value;
}
return this;
}

public RecordedConsumer WithExclusive(bool value)
{
Exclusive = value;
return this;
}

public RecordedConsumer WithQueue(string value)
{
Queue = value ?? throw new System.ArgumentNullException(nameof(value));
return this;
}
}
}
1 change: 1 addition & 0 deletions projects/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ namespace RabbitMQ.Client
int ChannelNumber { get; }
RabbitMQ.Client.ShutdownEventArgs CloseReason { get; }
System.TimeSpan ContinuationTimeout { get; set; }
string CurrentQueue { get; }
RabbitMQ.Client.IBasicConsumer DefaultConsumer { get; set; }
bool IsClosed { get; }
bool IsOpen { get; }
Expand Down
32 changes: 32 additions & 0 deletions projects/Unit/TestConnectionRecovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,38 @@ public void TestConsumerRecoveryOnClientNamedQueueWithOneRecovery()
}
}

[Test(Description = "https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1238")]
public void TestConsumerRecoveryWithServerNamedQueue()
{
using (AutorecoveringConnection c = CreateAutorecoveringConnection())
{
IModel m = c.CreateModel();
QueueDeclareOk queueDeclareResult = m.QueueDeclare(queue: string.Empty, durable: false, exclusive: true, autoDelete: true, arguments: null);
string qname = queueDeclareResult.QueueName;
Assert.False(string.IsNullOrEmpty(qname));

var cons = new EventingBasicConsumer(m);
m.BasicConsume(string.Empty, true, cons);
AssertConsumerCount(m, qname, 1);

bool queueNameBeforeIsEqual = false;
bool queueNameChangeAfterRecoveryCalled = false;
string qnameAfterRecovery = null;
c.QueueNameChangeAfterRecovery += (source, ea) =>
{
queueNameChangeAfterRecoveryCalled = true;
queueNameBeforeIsEqual = qname.Equals(ea.NameBefore);
qnameAfterRecovery = ea.NameAfter;
};

CloseAndWaitForRecovery(c);

AssertConsumerCount(m, qnameAfterRecovery, 1);
Assert.True(queueNameChangeAfterRecoveryCalled);
Assert.True(queueNameBeforeIsEqual);
}
}

[Test]
public void TestConsumerRecoveryWithManyConsumers()
{
Expand Down