Skip to content

Recovery exchanges and bindings #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ jobs:
# run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh toxiproxy
run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh
- name: Test
timeout-minutes: 15
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent addition @Gsantomaggio, I didn't know that setting exists.

run: dotnet test ${{ github.workspace }}/Build.csproj --no-restore --no-build --logger "console;verbosity=detailed" /p:AltCover=true /p:AltCoverStrongNameKey=${{github.workspace}}/rabbit.snk
- name: Check for errors in RabbitMQ logs
run: ${{ github.workspace}}/.ci/ubuntu/gha-log-check.sh
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AM
- [x] Recovery publishers on connection lost
- [x] Recovery consumers on connection lost
- [x] Implement Environment to manage the connections
- [ ] Complete the consumer part with `pause` and `unpause`
- [x] Complete the consumer part with `pause` and `unpause`
- [ ] Complete the binding/unbinding with the special characters
- [ ] Complete the queues/exchanges name with the special characters
- [ ] Implement metrics ( See `System.Diagnostics.DiagnosticSource` [Link](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/metrics-instrumentation) )
- [ ] Recovery exchanges on connection lost
- [ ] Recovery bindings on connection lost
- [x] Recovery exchanges on connection lost
- [x] Recovery bindings on connection lost
- [ ] Docker image to test in LRE [not mandatory]
- [ ] Check the TODO in the code

15 changes: 14 additions & 1 deletion RabbitMQ.AMQP.Client/IEntities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,29 +137,42 @@ public interface IExchangeSpecification : IEntitySpecification

IExchangeSpecification AutoDelete(bool autoDelete);

bool AutoDelete();

IExchangeSpecification Type(ExchangeType type);

IExchangeSpecification Type(string type); // TODO: Add this
ExchangeType Type();

IExchangeSpecification Argument(string key, object value);
Dictionary<string, object> Arguments();

IExchangeSpecification Arguments(Dictionary<string, object> arguments);
}

public interface IBindingSpecification
{
IBindingSpecification SourceExchange(IExchangeSpecification exchangeSpec);

IBindingSpecification SourceExchange(string exchangeName);
string SourceExchangeName();

IBindingSpecification DestinationQueue(IQueueSpecification queueSpec);
IBindingSpecification DestinationQueue(string queueName);
string DestinationQueueName();

IBindingSpecification DestinationExchange(IExchangeSpecification exchangeSpec);
IBindingSpecification DestinationExchange(string exchangeName);
string DestinationExchangeName();

IBindingSpecification Key(string key);
string Key();

IBindingSpecification Argument(string key, object value);

IBindingSpecification Arguments(Dictionary<string, object> arguments);
Dictionary<string, object> Arguments();

string Path();

Task BindAsync();
Task UnbindAsync();
Expand Down
8 changes: 4 additions & 4 deletions RabbitMQ.AMQP.Client/IEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ public interface IEnvironment
/// <param name="connectionSettings"></param>
/// <returns>IConnection</returns>
public Task<IConnection> CreateConnectionAsync(IConnectionSettings connectionSettings);


/// <summary>
/// Create a new connection with the default connection settings.
/// </summary>
/// <returns>IConnection</returns>

public Task<IConnection> CreateConnectionAsync();


Expand All @@ -30,6 +30,6 @@ public interface IEnvironment
/// Close all connections.
/// </summary>
/// <returns></returns>

Task CloseAsync();
}
13 changes: 13 additions & 0 deletions RabbitMQ.AMQP.Client/ITopologyListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,20 @@ public interface ITopologyListener

void QueueDeleted(string name);

void ExchangeDeclared(IExchangeSpecification specification);

void ExchangeDeleted(string name);


void BindingDeclared(IBindingSpecification specification);

void BindingDeleted(string path);

void Clear();

int QueueCount();

int ExchangeCount();

int BindingCount();
}
33 changes: 27 additions & 6 deletions RabbitMQ.AMQP.Client/Impl/AmqpBindingSpecification.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ public AmqpBindingSpecification(AmqpManagement management)
_management = management;
}

public Dictionary<string, object> Arguments()
{
return _arguments;
}

public string Path() => BindingsTarget();

public async Task BindAsync()
{
var kv = new Map
Expand All @@ -46,6 +53,7 @@ public async Task BindAsync()
string method = AmqpManagement.Post;
int[] expectedReturnCodes = [AmqpManagement.Code204];
// Note: must use await so that ConfigureAwait(false) can be called
_management.TopologyListener().BindingDeclared(this);
await _management.RequestAsync(kv, path, method, expectedReturnCodes)
.ConfigureAwait(false);
}
Expand All @@ -63,12 +71,13 @@ public async Task UnbindAsync()
$"{($"{destinationCharacter}={Utils.EncodePathSegment(Destination)};" +
$"key={Utils.EncodePathSegment(RoutingKey)};args=")}";

_management.TopologyListener().BindingDeleted(Path());
await _management.RequestAsync(null, path, method, expectedReturnCodes)
.ConfigureAwait(false);
}
else
{
string bindingsPath = BindingsTarget(destinationCharacter, Source, Destination, RoutingKey);
string bindingsPath = BindingsTarget();
List<Map> bindings = await GetBindings(bindingsPath).ConfigureAwait(false);
string? path = MatchBinding(bindings, RoutingKey, ArgsToMap());
if (path is null)
Expand All @@ -77,6 +86,7 @@ await _management.RequestAsync(null, path, method, expectedReturnCodes)
}
else
{
_management.TopologyListener().BindingDeclared(this);
await _management.RequestAsync(null, path, method, expectedReturnCodes)
.ConfigureAwait(false);
}
Expand All @@ -95,6 +105,11 @@ public IBindingSpecification SourceExchange(string exchangeName)
return this;
}

public string SourceExchangeName()
{
return Source;
}

public IBindingSpecification DestinationQueue(IQueueSpecification queueSpec)
{
return DestinationQueue(queueSpec.Name());
Expand All @@ -107,6 +122,8 @@ public IBindingSpecification DestinationQueue(string queueName)
return this;
}

public string DestinationQueueName() => Destination;

public IBindingSpecification DestinationExchange(IExchangeSpecification exchangeSpec)
{
return DestinationExchange(exchangeSpec.Name());
Expand All @@ -118,12 +135,16 @@ public IBindingSpecification DestinationExchange(string exchangeName)
return this;
}

public string DestinationExchangeName() => Destination;

public IBindingSpecification Key(string key)
{
RoutingKey = key;
return this;
}

public string Key() => RoutingKey;

public IBindingSpecification Argument(string key, object value)
{
_arguments[key] = value;
Expand All @@ -136,17 +157,17 @@ public IBindingSpecification Arguments(Dictionary<string, object> arguments)
return this;
}

private string BindingsTarget(
string destinationField, string source, string destination, string key)
private string BindingsTarget()
{
string destinationField = ToQueue ? "dstq" : "dste";
return "/bindings?src="
+ Utils.EncodeHttpParameter(source)
+ Utils.EncodeHttpParameter(Source)
+ "&"
+ destinationField
+ "="
+ Utils.EncodeHttpParameter(destination)
+ Utils.EncodeHttpParameter(Destination)
+ "&key="
+ Utils.EncodeHttpParameter(key);
+ Utils.EncodeHttpParameter(RoutingKey);
}

private async Task<List<Map>> GetBindings(string path)
Expand Down
40 changes: 40 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -547,4 +547,44 @@ await Management.Queue(spec).DeclareAsync()
}
}
}


public async Task VisitExchangesAsync(IEnumerable<ExchangeSpec> exchangeSpec)
{
// TODO this could be done in parallel
foreach (ExchangeSpec spec in exchangeSpec)
{
Trace.WriteLine(TraceLevel.Information, $"Recovering exchange {spec.Name}");
try
{
await Management.Exchange(spec).DeclareAsync()
.ConfigureAwait(false);
}
catch (Exception e)
{
Trace.WriteLine(TraceLevel.Error,
$"Error recovering exchange {spec.Name}. Error: {e}. Management Status: {Management}");
}
}
}


public async Task VisitBindingsAsync(IEnumerable<BindingSpec> bindingSpec)
{
// TODO this could be done in parallel
foreach (BindingSpec spec in bindingSpec)
{
Trace.WriteLine(TraceLevel.Information, $"Recovering binding {spec.Path}");
try
{
await Management.Binding(spec).BindAsync()
.ConfigureAwait(false);
}
catch (Exception e)
{
Trace.WriteLine(TraceLevel.Error,
$"Error recovering binding {spec.Path}. Error: {e}. Management Status: {Management}");
}
}
}
}
33 changes: 27 additions & 6 deletions RabbitMQ.AMQP.Client/Impl/AmqpExchangeSpecification.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ public class AmqpExchangeSpecification(AmqpManagement management) : IExchangeSpe
private string _name = "";
private bool _autoDelete;
private ExchangeType _type = ExchangeType.DIRECT;
private string _typeString = ""; // TODO: add this
private readonly Map _arguments = new();

public Task DeclareAsync()
Expand All @@ -31,6 +30,7 @@ public Task DeclareAsync()
string path = $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_name)}";
string method = AmqpManagement.Put;
int[] expectedResponseCodes = [AmqpManagement.Code204, AmqpManagement.Code201, AmqpManagement.Code409];
management.TopologyListener().ExchangeDeclared(this);
return management.RequestAsync(kv, path, method, expectedResponseCodes);
}

Expand All @@ -39,7 +39,7 @@ public Task DeleteAsync()
string path = $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_name)}";
string method = AmqpManagement.Delete;
int[] expectedResponseCodes = [AmqpManagement.Code204];
// TODO management topology listener?
management.TopologyListener().ExchangeDeleted(_name);
return management.RequestAsync(null, path, method, expectedResponseCodes);
}

Expand All @@ -60,21 +60,42 @@ public IExchangeSpecification AutoDelete(bool autoDelete)
return this;
}

public bool AutoDelete() => _autoDelete;

public IExchangeSpecification Type(ExchangeType type)
{
_type = type;
return this;
}

public IExchangeSpecification Type(string type)
public ExchangeType Type() => _type;


public IExchangeSpecification Argument(string key, object value)
{
_typeString = type;
_arguments[key] = value;
return this;
}

public IExchangeSpecification Argument(string key, object value)
public Dictionary<string, object> Arguments()
{
_arguments[key] = value;
var result = new Dictionary<string, object>();

foreach ((object key, object value) in _arguments)
{
result[key.ToString() ?? throw new InvalidOperationException()] = value;
}
return result;
}


public IExchangeSpecification Arguments(Dictionary<string, object> arguments)
{
foreach ((object key, object value) in arguments)
{
_arguments[key] = value;
}

return this;
}
}
18 changes: 18 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,28 @@ public IExchangeSpecification Exchange(string name)
return Exchange().Name(name);
}

public IExchangeSpecification Exchange(ExchangeSpec spec)
{
return Exchange().Name(spec.Name)
.AutoDelete(spec.AutoDelete)
.Type(spec.Type)
.Arguments(spec.Arguments);
}

public IBindingSpecification Binding()
{
return new AmqpBindingSpecification(this);
}

public IBindingSpecification Binding(BindingSpec spec)
{
return Binding()
.SourceExchange(spec.SourceExchange)
.DestinationQueue(spec.DestinationQueue)
.DestinationExchange(spec.DestinationExchange)
.Key(spec.Key)
.Arguments(spec.Arguments);
}

public ITopologyListener TopologyListener()
{
Expand Down
Loading