Skip to content

Improve bindings recovery #49

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AM
- [x] Recovery consumers on connection lost
- [x] Implement Environment to manage the connections
- [x] Complete the consumer part with `pause` and `unpause`
- [ ] Complete the binding/unbinding with the special characters
- [ ] Complete the queues/exchanges name with the special characters
- [x] Complete the binding/unbinding with the special characters
- [x] Complete the queues/exchanges name with the special characters
- [ ] Implement metrics ( See `System.Diagnostics.DiagnosticSource` [Link](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/metrics-instrumentation) )
- [x] Recovery exchanges on connection lost
- [x] Recovery bindings on connection lost
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/IEntities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public interface IExchangeSpecification : IEntitySpecification
public interface IBindingSpecification
{
IBindingSpecification SourceExchange(IExchangeSpecification exchangeSpec);

IBindingSpecification SourceExchange(string exchangeName);
string SourceExchangeName();

Expand Down
8 changes: 4 additions & 4 deletions RabbitMQ.AMQP.Client/ITopologyListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ public interface ITopologyListener
void ExchangeDeclared(IExchangeSpecification specification);

void ExchangeDeleted(string name);


void BindingDeclared(IBindingSpecification specification);

void BindingDeleted(string path);

void Clear();

int QueueCount();

int ExchangeCount();

int BindingCount();
}
4 changes: 2 additions & 2 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,8 @@ await Management.Exchange(spec).DeclareAsync()
}
}
}


public async Task VisitBindingsAsync(IEnumerable<BindingSpec> bindingSpec)
{
// TODO this could be done in parallel
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public IBindingSpecification Binding()
{
return new AmqpBindingSpecification(this);
}

public IBindingSpecification Binding(BindingSpec spec)
{
return Binding()
Expand Down
25 changes: 25 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/RecordingTopologyListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,29 @@ public class RecordingTopologyListener : ITopologyListener

private readonly ConcurrentDictionary<string, BindingSpec> _bindingSpecifications = new();

private void RemoveBindingsSpecificationFromQueue(string queueName)
{
foreach (var binding in _bindingSpecifications.Values)
{
if (binding.DestinationQueue == queueName)
{
_bindingSpecifications.TryRemove(binding.Path, out _);
}
}
}

private void RemoveBindingsSpecificationFromExchange(string exchangeName)
{
foreach (var binding in _bindingSpecifications.Values)
{
if (binding.SourceExchange == exchangeName
|| binding.DestinationExchange == exchangeName)
{
_bindingSpecifications.TryRemove(binding.Path, out _);
}
}
}


public void QueueDeclared(IQueueSpecification specification)
{
Expand All @@ -33,6 +56,7 @@ public void QueueDeclared(IQueueSpecification specification)
public void QueueDeleted(string name)
{
_queueSpecifications.TryRemove(name, out _);
RemoveBindingsSpecificationFromQueue(name);
}

public void ExchangeDeclared(IExchangeSpecification specification)
Expand All @@ -43,6 +67,7 @@ public void ExchangeDeclared(IExchangeSpecification specification)
public void ExchangeDeleted(string name)
{
_exchangeSpecifications.TryRemove(name, out _);
RemoveBindingsSpecificationFromExchange(name);
}

public void BindingDeclared(IBindingSpecification specification)
Expand Down
46 changes: 27 additions & 19 deletions Tests/BindingsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public async Task SimpleBindingsBetweenExchangeAndQueue(string sourceExchange, s

await bindingSpec.UnbindAsync();

await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(sourceExchangeSpec, destinationQueueSpec);
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(sourceExchangeSpec,
destinationQueueSpec);

/*
* TODO dispose assertions?
Expand Down Expand Up @@ -105,7 +106,12 @@ public async Task BindBetweenExchangeAndQueueTwoTimes()
"[7][8][9] 他被广泛认为是理论计算机科学和人工智能之父。")]
[InlineData("ήταν Άγγλος μαθηματικός, επιστήμονας υπολογιστών",
"ήταν Άγγλος μαθηματικός, επιστήμονας", "επι")]
public async Task SimpleBindingsBetweenExchangeAndExchange(string sourceExchangeName, string destinationExchangeName,
[InlineData("(~~~!!++@~./.,€€#!!±§##§¶¡€#¢)",
"~~~!!++@----.", "==`£!-=+")]


public async Task SimpleBindingsBetweenExchangeAndExchange(string sourceExchangeName,
string destinationExchangeName,
string key)
{
Assert.NotNull(_connection);
Expand All @@ -127,11 +133,13 @@ await WhenAllComplete(
await bindingSpecification.BindAsync();

await SystemUtils.WaitUntilExchangeExistsAsync(sourceExchangeSpec);
await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeExistAsync(sourceExchangeSpec, destinationExchangeSpec);
await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeExistAsync(sourceExchangeSpec,
destinationExchangeSpec);

await bindingSpecification.UnbindAsync();

await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync(sourceExchangeSpec, destinationExchangeSpec);
await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync(sourceExchangeSpec,
destinationExchangeSpec);

await sourceExchangeSpec.DeleteAsync();
await destinationExchangeSpec.DeleteAsync();
Expand All @@ -146,6 +154,7 @@ await WhenAllComplete(
[InlineData("B", 10000L, "H", 0.0001)]
[InlineData("是英国", 10000.32, "W", 3.0001)]
[InlineData("是英国", "是英国23", "W", 3.0001)]
[InlineData("(~~~!!++@----./.,€€#####§¶¡€#¢)", "~~~!!++@----", "==`£!-=+", "===£!-=+")]
public async Task BindingsBetweenExchangeAndQueuesWithArgumentsDifferentValues(string key1, object value1,
string key2, object value2)
{
Expand Down Expand Up @@ -196,8 +205,7 @@ await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync
[Theory]
[InlineData("my_source_exchange_multi_123", "my_destination_789", "myKey")]
[InlineData("是英国v_", "destination_是英国v_", "μαθηματικός")]
// TODO: to validate. Atm it seems there is a server side problem
// [InlineData("(~~~!!++@----./.,€€#####§¶¡€#¢)", ",,~~~!!++@----./.,€€#####§¶¡€#¢@@@", "===£!-=+")]
[InlineData("(~~~!!++@----./.,€€#####§¶¡€#¢)", ",,~~~!!++@----./.,€€#####§¶¡€#¢@@@", "===£!-=+")]
public async Task MultiBindingsBetweenExchangeAndQueuesWithArgumentsDifferentValues(string source,
string destination, string key)
{
Expand All @@ -210,7 +218,6 @@ public async Task MultiBindingsBetweenExchangeAndQueuesWithArgumentsDifferentVal
await WhenAllComplete(exchangeSpec.DeclareAsync(), queueSpec.DeclareAsync());

// add 10 bindings to have a list of bindings to find
var bindingSpecs = new List<IBindingSpecification>();
var bindingSpecTasks = new List<Task>();
for (int i = 0; i < 10; i++)
{
Expand All @@ -219,31 +226,35 @@ public async Task MultiBindingsBetweenExchangeAndQueuesWithArgumentsDifferentVal
.DestinationQueue(queueSpec)
.Key(key) // single key to use different args
.Arguments(new Dictionary<string, object>() { { $"是英国v_{i}", $"p_{i}" } });
bindingSpecs.Add(bindingSpec);
bindingSpecTasks.Add(bindingSpec.BindAsync());
}

await WhenAllComplete(bindingSpecTasks);
bindingSpecTasks.Clear();

var specialBindArgs = new Dictionary<string, object>() { { $"v_8", $"p_8" }, { $"v_1", 1 }, { $"v_r", 1000L }, };
var specialBindArgs =
new Dictionary<string, object>() { { $"v_8", $"p_8" }, { $"v_1", 1 }, { $"v_r", 1000L }, };
IBindingSpecification specialBindSpec = _management.Binding()
.SourceExchange(exchangeSpec)
.DestinationQueue(queueSpec)
.Key(key) // single key to use different args
.Arguments(specialBindArgs);
await specialBindSpec.BindAsync();

await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(exchangeSpec, queueSpec, specialBindArgs);
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(exchangeSpec, queueSpec,
specialBindArgs);

await specialBindSpec.UnbindAsync();

await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(exchangeSpec, queueSpec, specialBindArgs);
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(exchangeSpec, queueSpec,
specialBindArgs);

for (int i = 0; i < 10; i++)
{
var bindArgs = new Dictionary<string, object>() { { $"是英国v_{i}", $"p_{i}" } };

await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(exchangeSpec, queueSpec, bindArgs);
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(exchangeSpec, queueSpec,
bindArgs);

await _management.Binding()
.SourceExchange(exchangeSpec)
Expand All @@ -252,14 +263,11 @@ await _management.Binding()
.Arguments(bindArgs)
.UnbindAsync();

await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(exchangeSpec, queueSpec, bindArgs);
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(exchangeSpec, queueSpec,
bindArgs);
}

/*
* NB: test DisposeAsync will do this.
await _management.ExchangeDeletion().Delete(source);
await _management.QueueDeletion().Delete(destination);
await exchangeSpec.DeleteAsync();
await queueSpec.DeleteAsync();
await _connection.CloseAsync();
*/
}
}
Loading