Skip to content

Delete recorded consumers when autorecovering model is disposed #1317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -612,13 +612,23 @@ public void RecordBinding(RecordedBinding rb)
}
}

public void RecordConsumer(string name, RecordedConsumer c)
public void RecordConsumer(string consumerTag, RecordedConsumer recordedConsumer)
{
if (string.IsNullOrEmpty(consumerTag))
{
throw new ArgumentNullException(nameof(consumerTag));
}

if (recordedConsumer is null)
{
throw new ArgumentNullException(nameof(recordedConsumer));
}

lock (_recordedEntitiesLock)
{
if (!_recordedConsumers.ContainsKey(name))
if (!_recordedConsumers.ContainsKey(consumerTag))
{
_recordedConsumers.Add(name, c);
_recordedConsumers.Add(consumerTag, recordedConsumer);
}
}
}
Expand Down Expand Up @@ -651,6 +661,14 @@ public override string ToString()

public void UnregisterModel(AutorecoveringModel model)
{
lock (_recordedEntitiesLock)
{
foreach (string ct in model.ConsumerTags)
{
DeleteRecordedConsumer(ct);
}
}

lock (_models)
{
_models.Remove(model);
Expand Down
17 changes: 11 additions & 6 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ internal sealed class AutorecoveringModel : IFullModel, IRecoverable
private readonly object _eventLock = new object();
private AutorecoveringConnection _connection;
private RecoveryAwareModel _delegate;
private List<string> _recordedConsumerTags = new List<string>();

private EventHandler<BasicAckEventArgs> _recordedBasicAckEventHandlers;
private EventHandler<BasicNackEventArgs> _recordedBasicNackEventHandlers;
Expand Down Expand Up @@ -91,6 +92,8 @@ public TimeSpan ContinuationTimeout
}
}

public IEnumerable<string> ConsumerTags => _recordedConsumerTags;

public AutorecoveringModel(AutorecoveringConnection conn, RecoveryAwareModel _delegate)
{
_connection = conn;
Expand Down Expand Up @@ -476,7 +479,7 @@ public void Close(ShutdownEventArgs reason, bool abort)

try
{
_delegate.Close(reason, abort).GetAwaiter().GetResult();;
_delegate.Close(reason, abort).GetAwaiter().GetResult();
}
finally
{
Expand Down Expand Up @@ -510,6 +513,8 @@ private void Dispose(bool disposing)
{
Abort();

_recordedConsumerTags.Clear();
_recordedConsumerTags = null;
_connection = null;
_delegate = null;
_recordedBasicAckEventHandlers = null;
Expand Down Expand Up @@ -1184,16 +1189,16 @@ public string BasicConsume(
throw new ObjectDisposedException(GetType().FullName);
}

string result = _delegate.BasicConsume(queue, autoAck, consumerTag, noLocal,
string resultConsumerTag = _delegate.BasicConsume(queue, autoAck, consumerTag, noLocal,
exclusive, arguments, consumer);
RecordedConsumer rc = new RecordedConsumer(this, queue).
WithConsumerTag(result).
RecordedConsumer rc = new RecordedConsumer(this, queue, resultConsumerTag).
WithConsumer(consumer).
WithExclusive(exclusive).
WithAutoAck(autoAck).
WithArguments(arguments);
_connection.RecordConsumer(result, rc);
return result;
_connection.RecordConsumer(resultConsumerTag, rc);
_recordedConsumerTags.Add(resultConsumerTag);
return resultConsumerTag;
}

public BasicGetResult BasicGet(string queue,
Expand Down
44 changes: 38 additions & 6 deletions projects/RabbitMQ.Client/client/impl/RecordedConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,41 @@
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;

namespace RabbitMQ.Client.Impl
{
internal class RecordedConsumer : IRecordedConsumer
{
public RecordedConsumer(AutorecoveringModel model, string queue)
public RecordedConsumer(AutorecoveringModel model, string queue, string consumerTag)
{
Model = model;
Queue = queue;
if (model == null)
{
throw new ArgumentNullException(nameof(model));
}
else
{
Model = model ?? throw new ArgumentNullException(nameof(model));
}

if (string.IsNullOrEmpty(queue))
{
throw new ArgumentNullException(nameof(consumerTag));
}
else
{
Queue = queue;
}

if (string.IsNullOrEmpty(consumerTag))
{
throw new ArgumentNullException(nameof(consumerTag));
}
else
{
ConsumerTag = consumerTag;
}
}

public AutorecoveringModel Model { get; }
Expand Down Expand Up @@ -72,13 +97,20 @@ public RecordedConsumer WithAutoAck(bool value)

public RecordedConsumer WithConsumer(IBasicConsumer value)
{
Consumer = value;
Consumer = value ?? throw new System.ArgumentNullException(nameof(value));
return this;
}

public RecordedConsumer WithConsumerTag(string value)
{
ConsumerTag = value;
if (string.IsNullOrEmpty(value))
{
throw new System.ArgumentNullException(nameof(value));
}
else
{
ConsumerTag = value;
}
return this;
}

Expand All @@ -90,7 +122,7 @@ public RecordedConsumer WithExclusive(bool value)

public RecordedConsumer WithQueue(string value)
{
Queue = value;
Queue = value ?? throw new System.ArgumentNullException(nameof(value));
return this;
}
}
Expand Down