Skip to content

Commit ab35226

Browse files
authored
Major refactor (#43)
* Major refactor * Refactor test suites to use `IntegrationTest` base class * Remove IQueueDeletion and IExchangeDeletion, instead using the appropriate specification to do deletions * Increase Managment ReceiverLink credit to allow multiple management operations at the same time * Use `UrlEncode`, otherwise characters like double quotes will not be encoded correctly * fixup * fixup * * Start poking around at the reconnection code * * Make reconnection logic a bit easier to follow. * No need for a separate reconnection Task
1 parent c88a80b commit ab35226

31 files changed

+1617
-1138
lines changed

RabbitMQ.AMQP.Client/IAddressBuilder.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ public class InvalidAddressException(string message) : Exception(message);
44

55
public interface IAddressBuilder<out T>
66
{
7+
T Exchange(IExchangeSpecification exchangeSpec);
8+
T Exchange(string exchangeName);
79

8-
T Exchange(string exchange);
9-
10-
T Queue(string queue);
10+
T Queue(IQueueSpecification queueSpec);
11+
T Queue(string queueName);
1112

1213
T Key(string key);
1314
}

RabbitMQ.AMQP.Client/IConsumerBuilder.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ public enum StreamOffsetSpecification
77
Next
88
}
99

10+
// TODO IAddressBuilder<IConsumerBuilder>?
1011
public interface IConsumerBuilder
1112
{
12-
IConsumerBuilder Queue(string queue);
13+
IConsumerBuilder Queue(IQueueSpecification queueSpecification);
14+
IConsumerBuilder Queue(string queueName);
1315

1416
IConsumerBuilder MessageHandler(MessageHandler handler);
1517

RabbitMQ.AMQP.Client/IEntities.cs

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,22 @@ public interface IEntityInfo
55
}
66

77
/// <summary>
8-
/// Generic interface for declaring entities with result of type T
8+
/// Generic interface for managing entities with result of type T
99
/// </summary>
1010
/// <typeparam name="T"></typeparam>
11-
public interface IEntityInfoDeclaration<T> where T : IEntityInfo
11+
public interface IEntityInfoSpecification<T> where T : IEntityInfo
1212
{
13-
// TODO this really should be named DeclareAsync
14-
Task<T> Declare();
13+
Task<T> DeclareAsync();
14+
Task<T> DeleteAsync();
1515
}
1616

1717
/// <summary>
18-
/// Generic interface for declaring entities without result
18+
/// Generic interface for specifying entities without result
1919
/// </summary>
20-
public interface IEntityDeclaration
20+
public interface IEntitySpecification
2121
{
22-
// TODO this really should be named DeclareAsync
23-
Task Declare();
22+
Task DeclareAsync();
23+
Task DeleteAsync();
2424
}
2525

2626
public enum OverFlowStrategy
@@ -34,22 +34,22 @@ public enum OverFlowStrategy
3434
// REJECT_PUBLISH_DLX("reject-publish-dlx");
3535
}
3636

37-
public interface IQueueSpecification : IEntityInfoDeclaration<IQueueInfo>
37+
public interface IQueueSpecification : IEntityInfoSpecification<IQueueInfo>
3838
{
39-
IQueueSpecification Name(string name);
4039
public string Name();
40+
IQueueSpecification Name(string name);
4141

42-
IQueueSpecification Exclusive(bool exclusive);
4342
public bool Exclusive();
43+
IQueueSpecification Exclusive(bool exclusive);
4444

45-
IQueueSpecification AutoDelete(bool autoDelete);
4645
public bool AutoDelete();
46+
IQueueSpecification AutoDelete(bool autoDelete);
4747

48-
IQueueSpecification Arguments(Dictionary<object, object> arguments);
4948
public Dictionary<object, object> Arguments();
49+
IQueueSpecification Arguments(Dictionary<object, object> arguments);
5050

51-
IQueueSpecification Type(QueueType type);
5251
public QueueType Type();
52+
IQueueSpecification Type(QueueType type);
5353

5454
IQueueSpecification DeadLetterExchange(string dlx);
5555

@@ -130,15 +130,9 @@ public interface IClassicQueueSpecification
130130
IQueueSpecification Queue();
131131
}
132132

133-
public interface IQueueDeletion
134-
{
135-
// TODO consider returning a QueueStatus object with some info after deletion
136-
// TODO should be named DeleteAsync and take CancellationToken
137-
Task<IEntityInfo> Delete(string name);
138-
}
139-
140-
public interface IExchangeSpecification : IEntityDeclaration
133+
public interface IExchangeSpecification : IEntitySpecification
141134
{
135+
string Name();
142136
IExchangeSpecification Name(string name);
143137

144138
IExchangeSpecification AutoDelete(bool autoDelete);
@@ -150,27 +144,23 @@ public interface IExchangeSpecification : IEntityDeclaration
150144
IExchangeSpecification Argument(string key, object value);
151145
}
152146

153-
public interface IExchangeDeletion
154-
{
155-
// TODO consider returning a ExchangeStatus object with some info after deletion
156-
// TODO should be named DeleteAsync and take CancellationToken
157-
Task Delete(string name);
158-
}
159-
160147
public interface IBindingSpecification
161148
{
162-
IBindingSpecification SourceExchange(string exchange);
149+
IBindingSpecification SourceExchange(IExchangeSpecification exchangeSpec);
150+
IBindingSpecification SourceExchange(string exchangeName);
163151

164-
IBindingSpecification DestinationQueue(string queue);
152+
IBindingSpecification DestinationQueue(IQueueSpecification queueSpec);
153+
IBindingSpecification DestinationQueue(string queueName);
165154

166-
IBindingSpecification DestinationExchange(string exchange);
155+
IBindingSpecification DestinationExchange(IExchangeSpecification exchangeSpec);
156+
IBindingSpecification DestinationExchange(string exchangeName);
167157

168158
IBindingSpecification Key(string key);
169159

170160
IBindingSpecification Argument(string key, object value);
171161

172162
IBindingSpecification Arguments(Dictionary<string, object> arguments);
173163

174-
Task Bind();
175-
Task Unbind();
164+
Task BindAsync();
165+
Task UnbindAsync();
176166
}

RabbitMQ.AMQP.Client/IManagement.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,14 @@ public interface IManagement : ILifeCycle
99
IQueueSpecification Queue();
1010
IQueueSpecification Queue(string name);
1111

12+
Task<IQueueInfo> GetQueueInfoAsync(IQueueSpecification queueSpec,
13+
CancellationToken cancellationToken = default);
1214
Task<IQueueInfo> GetQueueInfoAsync(string queueName,
1315
CancellationToken cancellationToken = default);
1416

15-
IQueueDeletion QueueDeletion();
16-
1717
IExchangeSpecification Exchange();
18-
1918
IExchangeSpecification Exchange(string name);
2019

21-
IExchangeDeletion ExchangeDeletion();
22-
2320
IBindingSpecification Binding();
2421

2522
ITopologyListener TopologyListener();

RabbitMQ.AMQP.Client/Impl/AmqpBindingSpecification.cs

Lines changed: 53 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,16 @@ protected Map ArgsToMap()
2323
}
2424
}
2525

26-
public class AmqpBindingSpecification(AmqpManagement management) : BindingSpecificationBase, IBindingSpecification
26+
public class AmqpBindingSpecification : BindingSpecificationBase, IBindingSpecification
2727
{
28-
private AmqpManagement Management { get; } = management;
28+
private readonly AmqpManagement _management;
2929

30-
public async Task Bind()
30+
public AmqpBindingSpecification(AmqpManagement management)
31+
{
32+
_management = management;
33+
}
34+
35+
public async Task BindAsync()
3136
{
3237
var kv = new Map
3338
{
@@ -37,58 +42,79 @@ public async Task Bind()
3742
{ ToQueue ? "destination_queue" : "destination_exchange", Destination }
3843
};
3944

40-
await Management.RequestAsync(kv, $"/{Consts.Bindings}",
41-
AmqpManagement.Post,
42-
[
43-
AmqpManagement.Code204,
44-
]).ConfigureAwait(false);
45+
string path = $"/{Consts.Bindings}";
46+
string method = AmqpManagement.Post;
47+
int[] expectedReturnCodes = [AmqpManagement.Code204];
48+
// Note: must use await so that ConfigureAwait(false) can be called
49+
await _management.RequestAsync(kv, path, method, expectedReturnCodes)
50+
.ConfigureAwait(false);
4551
}
4652

47-
public async Task Unbind()
53+
public async Task UnbindAsync()
4854
{
55+
string method = AmqpManagement.Delete;
4956
string destinationCharacter = ToQueue ? "dstq" : "dste";
57+
int[] expectedReturnCodes = [AmqpManagement.Code204];
58+
5059
if (_arguments.Count == 0)
5160
{
52-
string target =
61+
string path =
5362
$"/{Consts.Bindings}/src={Utils.EncodePathSegment(Source)};" +
5463
$"{($"{destinationCharacter}={Utils.EncodePathSegment(Destination)};" +
5564
$"key={Utils.EncodePathSegment(RoutingKey)};args=")}";
5665

57-
await Management.RequestAsync(
58-
null, target,
59-
AmqpManagement.Delete, new[] { AmqpManagement.Code204 }).ConfigureAwait(false);
66+
await _management.RequestAsync(null, path, method, expectedReturnCodes)
67+
.ConfigureAwait(false);
6068
}
6169
else
6270
{
63-
string path = BindingsTarget(destinationCharacter, Source, Destination, RoutingKey);
64-
List<Map> bindings = await GetBindings(path).ConfigureAwait(false);
65-
string? uri = MatchBinding(bindings, RoutingKey, ArgsToMap());
66-
if (uri != null)
71+
string bindingsPath = BindingsTarget(destinationCharacter, Source, Destination, RoutingKey);
72+
List<Map> bindings = await GetBindings(bindingsPath).ConfigureAwait(false);
73+
string? path = MatchBinding(bindings, RoutingKey, ArgsToMap());
74+
if (path is null)
6775
{
68-
await Management.RequestAsync(
69-
null, uri,
70-
AmqpManagement.Delete, new[] { AmqpManagement.Code204 }).ConfigureAwait(false);
76+
// TODO is this an error?
77+
}
78+
else
79+
{
80+
await _management.RequestAsync(null, path, method, expectedReturnCodes)
81+
.ConfigureAwait(false);
7182
}
7283
}
7384
}
7485

75-
public IBindingSpecification SourceExchange(string exchange)
86+
public IBindingSpecification SourceExchange(IExchangeSpecification exchangeSpec)
87+
{
88+
return SourceExchange(exchangeSpec.Name());
89+
}
90+
91+
public IBindingSpecification SourceExchange(string exchangeName)
7692
{
7793
ToQueue = false;
78-
Source = exchange;
94+
Source = exchangeName;
7995
return this;
8096
}
8197

82-
public IBindingSpecification DestinationQueue(string queue)
98+
public IBindingSpecification DestinationQueue(IQueueSpecification queueSpec)
99+
{
100+
return DestinationQueue(queueSpec.Name());
101+
}
102+
103+
public IBindingSpecification DestinationQueue(string queueName)
83104
{
84105
ToQueue = true;
85-
Destination = queue;
106+
Destination = queueName;
86107
return this;
87108
}
88109

89-
public IBindingSpecification DestinationExchange(string exchange)
110+
public IBindingSpecification DestinationExchange(IExchangeSpecification exchangeSpec)
111+
{
112+
return DestinationExchange(exchangeSpec.Name());
113+
}
114+
115+
public IBindingSpecification DestinationExchange(string exchangeName)
90116
{
91-
Destination = exchange;
117+
Destination = exchangeName;
92118
return this;
93119
}
94120

@@ -125,7 +151,7 @@ private string BindingsTarget(
125151

126152
private async Task<List<Map>> GetBindings(string path)
127153
{
128-
Amqp.Message result = await Management.RequestAsync(
154+
Amqp.Message result = await _management.RequestAsync(
129155
null, path,
130156
AmqpManagement.Get, new[] { AmqpManagement.Code200 }).ConfigureAwait(false);
131157

0 commit comments

Comments
 (0)