Skip to content

Commit 8f80cb2

Browse files
authored
Recovery exchanges and bindings (#46)
* Recovery exchanges and bindings when the connection is restored --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 46c9789 commit 8f80cb2

14 files changed

+419
-25
lines changed

.github/workflows/build-test.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ jobs:
3030
# run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh toxiproxy
3131
run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh
3232
- name: Test
33+
timeout-minutes: 15
3334
run: dotnet test ${{ github.workspace }}/Build.csproj --no-restore --no-build --logger "console;verbosity=detailed" /p:AltCover=true /p:AltCoverStrongNameKey=${{github.workspace}}/rabbit.snk
3435
- name: Check for errors in RabbitMQ logs
3536
run: ${{ github.workspace}}/.ci/ubuntu/gha-log-check.sh

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AM
3030
- [x] Recovery publishers on connection lost
3131
- [x] Recovery consumers on connection lost
3232
- [x] Implement Environment to manage the connections
33-
- [ ] Complete the consumer part with `pause` and `unpause`
33+
- [x] Complete the consumer part with `pause` and `unpause`
3434
- [ ] Complete the binding/unbinding with the special characters
3535
- [ ] Complete the queues/exchanges name with the special characters
3636
- [ ] Implement metrics ( See `System.Diagnostics.DiagnosticSource` [Link](https://learn.microsoft.com/en-us/dotnet/core/diagnostics/metrics-instrumentation) )
37-
- [ ] Recovery exchanges on connection lost
38-
- [ ] Recovery bindings on connection lost
37+
- [x] Recovery exchanges on connection lost
38+
- [x] Recovery bindings on connection lost
3939
- [ ] Docker image to test in LRE [not mandatory]
4040
- [ ] Check the TODO in the code
4141

RabbitMQ.AMQP.Client/IEntities.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,29 +137,42 @@ public interface IExchangeSpecification : IEntitySpecification
137137

138138
IExchangeSpecification AutoDelete(bool autoDelete);
139139

140+
bool AutoDelete();
141+
140142
IExchangeSpecification Type(ExchangeType type);
141143

142-
IExchangeSpecification Type(string type); // TODO: Add this
144+
ExchangeType Type();
143145

144146
IExchangeSpecification Argument(string key, object value);
147+
Dictionary<string, object> Arguments();
148+
149+
IExchangeSpecification Arguments(Dictionary<string, object> arguments);
145150
}
146151

147152
public interface IBindingSpecification
148153
{
149154
IBindingSpecification SourceExchange(IExchangeSpecification exchangeSpec);
155+
150156
IBindingSpecification SourceExchange(string exchangeName);
157+
string SourceExchangeName();
151158

152159
IBindingSpecification DestinationQueue(IQueueSpecification queueSpec);
153160
IBindingSpecification DestinationQueue(string queueName);
161+
string DestinationQueueName();
154162

155163
IBindingSpecification DestinationExchange(IExchangeSpecification exchangeSpec);
156164
IBindingSpecification DestinationExchange(string exchangeName);
165+
string DestinationExchangeName();
157166

158167
IBindingSpecification Key(string key);
168+
string Key();
159169

160170
IBindingSpecification Argument(string key, object value);
161171

162172
IBindingSpecification Arguments(Dictionary<string, object> arguments);
173+
Dictionary<string, object> Arguments();
174+
175+
string Path();
163176

164177
Task BindAsync();
165178
Task UnbindAsync();

RabbitMQ.AMQP.Client/IEnvironment.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ public interface IEnvironment
1414
/// <param name="connectionSettings"></param>
1515
/// <returns>IConnection</returns>
1616
public Task<IConnection> CreateConnectionAsync(IConnectionSettings connectionSettings);
17-
18-
17+
18+
1919
/// <summary>
2020
/// Create a new connection with the default connection settings.
2121
/// </summary>
2222
/// <returns>IConnection</returns>
23-
23+
2424
public Task<IConnection> CreateConnectionAsync();
2525

2626

@@ -30,6 +30,6 @@ public interface IEnvironment
3030
/// Close all connections.
3131
/// </summary>
3232
/// <returns></returns>
33-
33+
3434
Task CloseAsync();
3535
}

RabbitMQ.AMQP.Client/ITopologyListener.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,20 @@ public interface ITopologyListener
66

77
void QueueDeleted(string name);
88

9+
void ExchangeDeclared(IExchangeSpecification specification);
10+
11+
void ExchangeDeleted(string name);
12+
13+
14+
void BindingDeclared(IBindingSpecification specification);
15+
16+
void BindingDeleted(string path);
17+
918
void Clear();
1019

1120
int QueueCount();
21+
22+
int ExchangeCount();
23+
24+
int BindingCount();
1225
}

RabbitMQ.AMQP.Client/Impl/AmqpBindingSpecification.cs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ public AmqpBindingSpecification(AmqpManagement management)
3232
_management = management;
3333
}
3434

35+
public Dictionary<string, object> Arguments()
36+
{
37+
return _arguments;
38+
}
39+
40+
public string Path() => BindingsTarget();
41+
3542
public async Task BindAsync()
3643
{
3744
var kv = new Map
@@ -46,6 +53,7 @@ public async Task BindAsync()
4653
string method = AmqpManagement.Post;
4754
int[] expectedReturnCodes = [AmqpManagement.Code204];
4855
// Note: must use await so that ConfigureAwait(false) can be called
56+
_management.TopologyListener().BindingDeclared(this);
4957
await _management.RequestAsync(kv, path, method, expectedReturnCodes)
5058
.ConfigureAwait(false);
5159
}
@@ -63,12 +71,13 @@ public async Task UnbindAsync()
6371
$"{($"{destinationCharacter}={Utils.EncodePathSegment(Destination)};" +
6472
$"key={Utils.EncodePathSegment(RoutingKey)};args=")}";
6573

74+
_management.TopologyListener().BindingDeleted(Path());
6675
await _management.RequestAsync(null, path, method, expectedReturnCodes)
6776
.ConfigureAwait(false);
6877
}
6978
else
7079
{
71-
string bindingsPath = BindingsTarget(destinationCharacter, Source, Destination, RoutingKey);
80+
string bindingsPath = BindingsTarget();
7281
List<Map> bindings = await GetBindings(bindingsPath).ConfigureAwait(false);
7382
string? path = MatchBinding(bindings, RoutingKey, ArgsToMap());
7483
if (path is null)
@@ -77,6 +86,7 @@ await _management.RequestAsync(null, path, method, expectedReturnCodes)
7786
}
7887
else
7988
{
89+
_management.TopologyListener().BindingDeclared(this);
8090
await _management.RequestAsync(null, path, method, expectedReturnCodes)
8191
.ConfigureAwait(false);
8292
}
@@ -95,6 +105,11 @@ public IBindingSpecification SourceExchange(string exchangeName)
95105
return this;
96106
}
97107

108+
public string SourceExchangeName()
109+
{
110+
return Source;
111+
}
112+
98113
public IBindingSpecification DestinationQueue(IQueueSpecification queueSpec)
99114
{
100115
return DestinationQueue(queueSpec.Name());
@@ -107,6 +122,8 @@ public IBindingSpecification DestinationQueue(string queueName)
107122
return this;
108123
}
109124

125+
public string DestinationQueueName() => Destination;
126+
110127
public IBindingSpecification DestinationExchange(IExchangeSpecification exchangeSpec)
111128
{
112129
return DestinationExchange(exchangeSpec.Name());
@@ -118,12 +135,16 @@ public IBindingSpecification DestinationExchange(string exchangeName)
118135
return this;
119136
}
120137

138+
public string DestinationExchangeName() => Destination;
139+
121140
public IBindingSpecification Key(string key)
122141
{
123142
RoutingKey = key;
124143
return this;
125144
}
126145

146+
public string Key() => RoutingKey;
147+
127148
public IBindingSpecification Argument(string key, object value)
128149
{
129150
_arguments[key] = value;
@@ -136,17 +157,17 @@ public IBindingSpecification Arguments(Dictionary<string, object> arguments)
136157
return this;
137158
}
138159

139-
private string BindingsTarget(
140-
string destinationField, string source, string destination, string key)
160+
private string BindingsTarget()
141161
{
162+
string destinationField = ToQueue ? "dstq" : "dste";
142163
return "/bindings?src="
143-
+ Utils.EncodeHttpParameter(source)
164+
+ Utils.EncodeHttpParameter(Source)
144165
+ "&"
145166
+ destinationField
146167
+ "="
147-
+ Utils.EncodeHttpParameter(destination)
168+
+ Utils.EncodeHttpParameter(Destination)
148169
+ "&key="
149-
+ Utils.EncodeHttpParameter(key);
170+
+ Utils.EncodeHttpParameter(RoutingKey);
150171
}
151172

152173
private async Task<List<Map>> GetBindings(string path)

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,4 +547,44 @@ await Management.Queue(spec).DeclareAsync()
547547
}
548548
}
549549
}
550+
551+
552+
public async Task VisitExchangesAsync(IEnumerable<ExchangeSpec> exchangeSpec)
553+
{
554+
// TODO this could be done in parallel
555+
foreach (ExchangeSpec spec in exchangeSpec)
556+
{
557+
Trace.WriteLine(TraceLevel.Information, $"Recovering exchange {spec.Name}");
558+
try
559+
{
560+
await Management.Exchange(spec).DeclareAsync()
561+
.ConfigureAwait(false);
562+
}
563+
catch (Exception e)
564+
{
565+
Trace.WriteLine(TraceLevel.Error,
566+
$"Error recovering exchange {spec.Name}. Error: {e}. Management Status: {Management}");
567+
}
568+
}
569+
}
570+
571+
572+
public async Task VisitBindingsAsync(IEnumerable<BindingSpec> bindingSpec)
573+
{
574+
// TODO this could be done in parallel
575+
foreach (BindingSpec spec in bindingSpec)
576+
{
577+
Trace.WriteLine(TraceLevel.Information, $"Recovering binding {spec.Path}");
578+
try
579+
{
580+
await Management.Binding(spec).BindAsync()
581+
.ConfigureAwait(false);
582+
}
583+
catch (Exception e)
584+
{
585+
Trace.WriteLine(TraceLevel.Error,
586+
$"Error recovering binding {spec.Path}. Error: {e}. Management Status: {Management}");
587+
}
588+
}
589+
}
550590
}

RabbitMQ.AMQP.Client/Impl/AmqpExchangeSpecification.cs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ public class AmqpExchangeSpecification(AmqpManagement management) : IExchangeSpe
77
private string _name = "";
88
private bool _autoDelete;
99
private ExchangeType _type = ExchangeType.DIRECT;
10-
private string _typeString = ""; // TODO: add this
1110
private readonly Map _arguments = new();
1211

1312
public Task DeclareAsync()
@@ -31,6 +30,7 @@ public Task DeclareAsync()
3130
string path = $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_name)}";
3231
string method = AmqpManagement.Put;
3332
int[] expectedResponseCodes = [AmqpManagement.Code204, AmqpManagement.Code201, AmqpManagement.Code409];
33+
management.TopologyListener().ExchangeDeclared(this);
3434
return management.RequestAsync(kv, path, method, expectedResponseCodes);
3535
}
3636

@@ -39,7 +39,7 @@ public Task DeleteAsync()
3939
string path = $"/{Consts.Exchanges}/{Utils.EncodePathSegment(_name)}";
4040
string method = AmqpManagement.Delete;
4141
int[] expectedResponseCodes = [AmqpManagement.Code204];
42-
// TODO management topology listener?
42+
management.TopologyListener().ExchangeDeleted(_name);
4343
return management.RequestAsync(null, path, method, expectedResponseCodes);
4444
}
4545

@@ -60,21 +60,42 @@ public IExchangeSpecification AutoDelete(bool autoDelete)
6060
return this;
6161
}
6262

63+
public bool AutoDelete() => _autoDelete;
64+
6365
public IExchangeSpecification Type(ExchangeType type)
6466
{
6567
_type = type;
6668
return this;
6769
}
6870

69-
public IExchangeSpecification Type(string type)
71+
public ExchangeType Type() => _type;
72+
73+
74+
public IExchangeSpecification Argument(string key, object value)
7075
{
71-
_typeString = type;
76+
_arguments[key] = value;
7277
return this;
7378
}
7479

75-
public IExchangeSpecification Argument(string key, object value)
80+
public Dictionary<string, object> Arguments()
7681
{
77-
_arguments[key] = value;
82+
var result = new Dictionary<string, object>();
83+
84+
foreach ((object key, object value) in _arguments)
85+
{
86+
result[key.ToString() ?? throw new InvalidOperationException()] = value;
87+
}
88+
return result;
89+
}
90+
91+
92+
public IExchangeSpecification Arguments(Dictionary<string, object> arguments)
93+
{
94+
foreach ((object key, object value) in arguments)
95+
{
96+
_arguments[key] = value;
97+
}
98+
7899
return this;
79100
}
80101
}

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,28 @@ public IExchangeSpecification Exchange(string name)
9494
return Exchange().Name(name);
9595
}
9696

97+
public IExchangeSpecification Exchange(ExchangeSpec spec)
98+
{
99+
return Exchange().Name(spec.Name)
100+
.AutoDelete(spec.AutoDelete)
101+
.Type(spec.Type)
102+
.Arguments(spec.Arguments);
103+
}
104+
97105
public IBindingSpecification Binding()
98106
{
99107
return new AmqpBindingSpecification(this);
100108
}
109+
110+
public IBindingSpecification Binding(BindingSpec spec)
111+
{
112+
return Binding()
113+
.SourceExchange(spec.SourceExchange)
114+
.DestinationQueue(spec.DestinationQueue)
115+
.DestinationExchange(spec.DestinationExchange)
116+
.Key(spec.Key)
117+
.Arguments(spec.Arguments);
118+
}
101119

102120
public ITopologyListener TopologyListener()
103121
{

0 commit comments

Comments
 (0)