Skip to content

Commit 25874e2

Browse files
committed
Use EasyNetQ.Management.Client
* Allow specifying `MaxFrameSize` for a connection. * Move HTTP API interaction to `EasyNetQ.Management.Client` * No need to start toxiproxy yet * Use `uint.MinValue` to mean `unlimited` for max frame size * Implement the Environment class to manage the connections (#36) * Implement the Environment Closes #35 --------- Signed-off-by: Gabriele Santomaggio <[email protected]> Use `EasyNetQ.Management.Client` * Allow specifying `MaxFrameSize` for a connection. * Move HTTP API interaction to `EasyNetQ.Management.Client` * No need to start toxiproxy yet * Use `uint.MinValue` to mean `unlimited` for max frame size * * Add to public API
1 parent 64564b2 commit 25874e2

File tree

8 files changed

+83
-80
lines changed

8 files changed

+83
-80
lines changed

.github/workflows/build-test.yaml

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,17 @@ jobs:
2626
run: dotnet format ${{ github.workspace }}/Build.csproj --no-restore --verify-no-changes
2727
- name: Start RabbitMQ
2828
id: start-rabbitmq
29-
run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh toxiproxy
29+
# Note: not using toxiproxy yet
30+
# run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh toxiproxy
31+
run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh
3032
- name: Test
3133
run: dotnet test ${{ github.workspace }}/Build.csproj --no-restore --no-build --logger "console;verbosity=detailed" /p:AltCover=true
3234
- name: Check for errors in RabbitMQ logs
3335
run: ${{ github.workspace}}/.ci/ubuntu/gha-log-check.sh
34-
- name: Maybe collect toxiproxy logs
35-
if: failure()
36-
run: docker logs rabbitmq-amqp-dotnet-client-toxiproxy > ${{ github.workspace }}/.ci/ubuntu/log/toxiproxy.log
36+
# Note: not using toxiproxy yet
37+
# - name: Maybe collect toxiproxy logs
38+
# if: failure()
39+
# run: docker logs rabbitmq-amqp-dotnet-client-toxiproxy > ${{ github.workspace }}/.ci/ubuntu/log/toxiproxy.log
3740
- name: Maybe upload RabbitMQ logs
3841
if: failure()
3942
uses: actions/upload-artifact@v4

RabbitMQ.AMQP.Client/IConnectionSettings.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public interface IConnectionSettings : IEquatable<IConnectionSettings>
1515
string ConnectionName { get; }
1616
string Path { get; }
1717
bool UseSsl { get; }
18+
uint MaxFrameSize { get; }
1819
SaslMechanism SaslMechanism { get; }
1920
ITlsSettings? TlsSettings { get; }
2021

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ private void ChangeEntitiesStatus(State state, Error? error)
5555

5656
private void ChangePublishersStatus(State state, Error? error)
5757
{
58-
foreach (var publisher1 in Publishers.Values)
58+
foreach (IPublisher publisher1 in Publishers.Values)
5959
{
6060
var publisher = (AmqpPublisher)publisher1;
6161
publisher.ChangeStatus(state, error);
@@ -64,7 +64,7 @@ private void ChangePublishersStatus(State state, Error? error)
6464

6565
private void ChangeConsumersStatus(State state, Error? error)
6666
{
67-
foreach (var consumer1 in Consumers.Values)
67+
foreach (IConsumer consumer1 in Consumers.Values)
6868
{
6969
var consumer = (AmqpConsumer)consumer1;
7070
consumer.ChangeStatus(state, error);
@@ -79,7 +79,7 @@ private async Task ReconnectEntities()
7979

8080
private async Task ReconnectPublishers()
8181
{
82-
foreach (var publisher1 in Publishers.Values)
82+
foreach (IPublisher publisher1 in Publishers.Values)
8383
{
8484
var publisher = (AmqpPublisher)publisher1;
8585
await publisher.Reconnect().ConfigureAwait(false);
@@ -88,7 +88,7 @@ private async Task ReconnectPublishers()
8888

8989
private async Task ReconnectConsumers()
9090
{
91-
foreach (var consumer1 in Consumers.Values)
91+
foreach (IConsumer consumer1 in Consumers.Values)
9292
{
9393
var consumer = (AmqpConsumer)consumer1;
9494
await consumer.Reconnect().ConfigureAwait(false);
@@ -205,9 +205,18 @@ private async Task EnsureConnection()
205205
var open = new Open
206206
{
207207
HostName = $"vhost:{_connectionSettings.VirtualHost}",
208-
Properties = new Fields() { [new Symbol("connection_name")] = _connectionSettings.ConnectionName, }
208+
Properties = new Fields()
209+
{
210+
[new Symbol("connection_name")] = _connectionSettings.ConnectionName,
211+
}
209212
};
210213

214+
if (_connectionSettings.MaxFrameSize > uint.MinValue)
215+
{
216+
// Note: when set here, there is no need to set cf.AMQP.MaxFrameSize
217+
open.MaxFrameSize = _connectionSettings.MaxFrameSize;
218+
}
219+
211220
void onOpened(Amqp.IConnection connection, Open open1)
212221
{
213222
Trace.WriteLine(TraceLevel.Verbose, $"Connection opened. Info: {ToString()}");

RabbitMQ.AMQP.Client/Impl/ConnectionSettings.cs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public class ConnectionSettingBuilder
1515
private string _scheme = "AMQP";
1616
private string _connectionName = "AMQP.NET";
1717
private string _virtualHost = "/";
18+
private uint _maxFrameSize = Consts.DefaultMaxFrameSize;
1819
private SaslMechanism _saslMechanism = Client.SaslMechanism.Plain;
1920
private IRecoveryConfiguration _recoveryConfiguration = Impl.RecoveryConfiguration.Create();
2021

@@ -69,6 +70,17 @@ public ConnectionSettingBuilder VirtualHost(string virtualHost)
6970
return this;
7071
}
7172

73+
public ConnectionSettingBuilder MaxFrameSize(uint maxFrameSize)
74+
{
75+
_maxFrameSize = maxFrameSize;
76+
if (_maxFrameSize != uint.MinValue && _maxFrameSize < 512)
77+
{
78+
throw new ArgumentOutOfRangeException(nameof(maxFrameSize),
79+
"maxFrameSize must be greater or equal to 512");
80+
}
81+
return this;
82+
}
83+
7284
public ConnectionSettingBuilder SaslMechanism(SaslMechanism saslMechanism)
7385
{
7486
_saslMechanism = saslMechanism;
@@ -89,9 +101,9 @@ public ConnectionSettingBuilder RecoveryConfiguration(IRecoveryConfiguration rec
89101

90102
public ConnectionSettings Build()
91103
{
92-
var c = new ConnectionSettings(_host, _port, _user,
104+
var c = new ConnectionSettings(_scheme, _host, _port, _user,
93105
_password, _virtualHost,
94-
_scheme, _connectionName, _saslMechanism)
106+
_connectionName, _saslMechanism, _maxFrameSize)
95107
{
96108
Recovery = (RecoveryConfiguration)_recoveryConfiguration
97109
};
@@ -106,8 +118,9 @@ public ConnectionSettings Build()
106118
public class ConnectionSettings : IConnectionSettings
107119
{
108120
private readonly Address _address;
109-
private readonly string _connectionName = "";
110121
private readonly string _virtualHost = "/";
122+
private readonly string _connectionName = "";
123+
private readonly uint _maxFrameSize = Consts.DefaultMaxFrameSize;
111124
private readonly ITlsSettings? _tlsSettings;
112125
private readonly SaslMechanism _saslMechanism = SaslMechanism.Plain;
113126

@@ -122,17 +135,27 @@ public ConnectionSettings(string address, ITlsSettings? tlsSettings = null)
122135
}
123136
}
124137

125-
public ConnectionSettings(string host, int port,
138+
public ConnectionSettings(string scheme, string host, int port,
126139
string? user, string? password,
127-
string virtualHost, string scheme, string connectionName,
128-
SaslMechanism saslMechanism, ITlsSettings? tlsSettings = null)
140+
string virtualHost, string connectionName,
141+
SaslMechanism saslMechanism,
142+
uint maxFrameSize = Consts.DefaultMaxFrameSize,
143+
ITlsSettings? tlsSettings = null)
129144
{
130145
_address = new Address(host: host, port: port,
131146
user: user, password: password,
132147
path: "/", scheme: scheme);
133148
_connectionName = connectionName;
134149
_virtualHost = virtualHost;
135150
_saslMechanism = saslMechanism;
151+
152+
_maxFrameSize = maxFrameSize;
153+
if (_maxFrameSize != uint.MinValue && _maxFrameSize < 512)
154+
{
155+
throw new ArgumentOutOfRangeException(nameof(maxFrameSize),
156+
"maxFrameSize must be greater or equal to 512");
157+
}
158+
136159
_tlsSettings = tlsSettings;
137160

138161
if (_address.UseSsl && _tlsSettings == null)
@@ -150,8 +173,8 @@ public ConnectionSettings(string host, int port,
150173
public string ConnectionName => _connectionName;
151174
public string Path => _address.Path;
152175
public bool UseSsl => _address.UseSsl;
176+
public uint MaxFrameSize => _maxFrameSize;
153177
public SaslMechanism SaslMechanism => _saslMechanism;
154-
155178
public ITlsSettings? TlsSettings => _tlsSettings;
156179
public IRecoveryConfiguration Recovery { get; init; } = RecoveryConfiguration.Create();
157180

RabbitMQ.AMQP.Client/Impl/Consts.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,7 @@ public class Consts
66
public const string Key = "key";
77
public const string Queues = "queues";
88
public const string Bindings = "bindings";
9-
9+
// public const uint DefaultMaxFrameSize = uint.MinValue; // NOTE: Azure/amqpnetlite uses 256 * 1024
10+
// TODO: change to uint.MinValue when https://github.com/rabbitmq/rabbitmq-server/pull/11843 is merged
11+
public const uint DefaultMaxFrameSize = 256 * 1024;
1012
}

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#nullable enable
22
abstract RabbitMQ.AMQP.Client.Impl.AbstractLifeCycle.CloseAsync() -> System.Threading.Tasks.Task!
33
const RabbitMQ.AMQP.Client.Impl.Consts.Bindings = "bindings" -> string!
4+
const RabbitMQ.AMQP.Client.Impl.Consts.DefaultMaxFrameSize = 262144 -> uint
45
const RabbitMQ.AMQP.Client.Impl.Consts.Exchanges = "exchanges" -> string!
56
const RabbitMQ.AMQP.Client.Impl.Consts.Key = "key" -> string!
67
const RabbitMQ.AMQP.Client.Impl.Consts.Queues = "queues" -> string!
@@ -80,6 +81,7 @@ RabbitMQ.AMQP.Client.IConnection.PublisherBuilder() -> RabbitMQ.AMQP.Client.IPub
8081
RabbitMQ.AMQP.Client.IConnectionSettings
8182
RabbitMQ.AMQP.Client.IConnectionSettings.ConnectionName.get -> string!
8283
RabbitMQ.AMQP.Client.IConnectionSettings.Host.get -> string!
84+
RabbitMQ.AMQP.Client.IConnectionSettings.MaxFrameSize.get -> uint
8385
RabbitMQ.AMQP.Client.IConnectionSettings.Password.get -> string?
8486
RabbitMQ.AMQP.Client.IConnectionSettings.Path.get -> string!
8587
RabbitMQ.AMQP.Client.IConnectionSettings.Port.get -> int
@@ -328,6 +330,7 @@ RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder
328330
RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.Build() -> RabbitMQ.AMQP.Client.Impl.ConnectionSettings!
329331
RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.ConnectionName(string! connectionName) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder!
330332
RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.Host(string! host) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder!
333+
RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.MaxFrameSize(uint maxFrameSize) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder!
331334
RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.Password(string! password) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder!
332335
RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.Port(int port) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder!
333336
RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.RecoveryConfiguration(RabbitMQ.AMQP.Client.IRecoveryConfiguration! recoveryConfiguration) -> RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder!
@@ -338,10 +341,11 @@ RabbitMQ.AMQP.Client.Impl.ConnectionSettingBuilder.VirtualHost(string! virtualHo
338341
RabbitMQ.AMQP.Client.Impl.ConnectionSettings
339342
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.ConnectionName.get -> string!
340343
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.ConnectionSettings(string! address, RabbitMQ.AMQP.Client.ITlsSettings? tlsSettings = null) -> void
341-
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.ConnectionSettings(string! host, int port, string? user, string? password, string! virtualHost, string! scheme, string! connectionName, RabbitMQ.AMQP.Client.SaslMechanism! saslMechanism, RabbitMQ.AMQP.Client.ITlsSettings? tlsSettings = null) -> void
344+
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.ConnectionSettings(string! scheme, string! host, int port, string? user, string? password, string! virtualHost, string! connectionName, RabbitMQ.AMQP.Client.SaslMechanism! saslMechanism, uint maxFrameSize = 262144, RabbitMQ.AMQP.Client.ITlsSettings? tlsSettings = null) -> void
342345
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Equals(RabbitMQ.AMQP.Client.IConnectionSettings? other) -> bool
343346
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Equals(RabbitMQ.AMQP.Client.Impl.ConnectionSettings! other) -> bool
344347
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Host.get -> string!
348+
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.MaxFrameSize.get -> uint
345349
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Password.get -> string?
346350
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Path.get -> string!
347351
RabbitMQ.AMQP.Client.Impl.ConnectionSettings.Port.get -> int

Tests/ConnectionTests.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ public class ConnectionTests(ITestOutputHelper output)
1818
[Fact]
1919
public void ValidateAddress()
2020
{
21-
ConnectionSettings connectionSettings = new("localhost", 5672, "guest-user",
22-
"guest-password", "vhost_1", "amqp1", "connection_name", SaslMechanism.External);
21+
ConnectionSettings connectionSettings = new("amqp1", "localhost", 5672, "guest-user",
22+
"guest-password", "vhost_1", "connection_name", SaslMechanism.External);
2323
Assert.Equal("localhost", connectionSettings.Host);
2424
Assert.Equal(5672, connectionSettings.Port);
2525
Assert.Equal("guest-user", connectionSettings.User);
@@ -28,13 +28,13 @@ public void ValidateAddress()
2828
Assert.Equal("amqp1", connectionSettings.Scheme);
2929
Assert.Equal(SaslMechanism.External, connectionSettings.SaslMechanism);
3030

31-
ConnectionSettings second = new("localhost", 5672, "guest-user",
32-
"guest-password", "path/", "amqp1", "connection_name", SaslMechanism.External);
31+
ConnectionSettings second = new("amqp1", "localhost", 5672, "guest-user",
32+
"guest-password", "path/", "connection_name", SaslMechanism.External);
3333

3434
Assert.Equal(connectionSettings, second);
3535

36-
ConnectionSettings third = new("localhost", 5672, "guest-user",
37-
"guest-password", "path/", "amqp2", "connection_name", SaslMechanism.Plain);
36+
ConnectionSettings third = new("amqp2", "localhost", 5672, "guest-user",
37+
"guest-password", "path/", "connection_name", SaslMechanism.Plain);
3838

3939
Assert.NotEqual(connectionSettings, third);
4040
}

Tests/Utils.cs

Lines changed: 17 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -13,42 +13,11 @@
1313
using System.Text.Json;
1414
using System.Threading;
1515
using System.Threading.Tasks;
16-
using Xunit;
17-
using Xunit.Abstractions;
16+
using EasyNetQ.Management.Client;
1817
using Xunit.Sdk;
1918

2019
namespace Tests
2120
{
22-
public class Utils<TResult>(ITestOutputHelper testOutputHelper)
23-
{
24-
public void WaitUntilTaskCompletes(TaskCompletionSource<TResult> tasks)
25-
{
26-
WaitUntilTaskCompletes(tasks, true, TimeSpan.FromSeconds(10));
27-
}
28-
29-
public void WaitUntilTaskCompletes(TaskCompletionSource<TResult> tasks,
30-
bool expectToComplete = true)
31-
{
32-
WaitUntilTaskCompletes(tasks, expectToComplete, TimeSpan.FromSeconds(10));
33-
}
34-
35-
public void WaitUntilTaskCompletes(TaskCompletionSource<TResult> tasks,
36-
bool expectToComplete,
37-
TimeSpan timeOut)
38-
{
39-
try
40-
{
41-
var resultTestWait = tasks.Task.Wait(timeOut);
42-
Assert.Equal(resultTestWait, expectToComplete);
43-
}
44-
catch (Exception e)
45-
{
46-
testOutputHelper.WriteLine($"wait until task completes error #{e}");
47-
throw;
48-
}
49-
}
50-
}
51-
5221
public static class SystemUtils
5322
{
5423
// Waits for 10 seconds total by default
@@ -89,7 +58,6 @@ public static void Wait(TimeSpan wait)
8958
Thread.Sleep(wait);
9059
}
9160

92-
9361
private class Connection
9462
{
9563
public string? name { get; set; }
@@ -139,34 +107,27 @@ public static async Task<int> ConnectionsCountByName(string connectionName)
139107

140108
public static async Task<bool> IsConnectionOpen(string connectionName)
141109
{
142-
using HttpClientHandler handler = new HttpClientHandler { Credentials = new NetworkCredential("guest", "guest"), };
143-
using HttpClient client = new HttpClient(handler);
110+
var managementUri = new Uri("http://localhost:15672");
111+
using var managementClient = new ManagementClient(managementUri, "guest", "guest");
144112

145-
HttpResponseMessage result = await client.GetAsync("http://localhost:15672/api/connections");
146-
if (!result.IsSuccessStatusCode)
147-
{
148-
throw new XunitException($"HTTP GET failed: {result.StatusCode} {result.ReasonPhrase}");
149-
}
113+
IReadOnlyList<EasyNetQ.Management.Client.Model.Connection> connections = await managementClient.GetConnectionsAsync();
150114

151-
Stream resultContentStream = await result.Content.ReadAsStreamAsync();
152-
object? obj = await JsonSerializer.DeserializeAsync(resultContentStream, typeof(IEnumerable<Connection>));
153-
return obj switch
154-
{
155-
null => false,
156-
IEnumerable<Connection> connections =>
157-
connections.Any(x =>
115+
return connections.Any(conn =>
158116
{
159-
if (x.client_properties is null)
117+
if (conn.ClientProperties is not null)
160118
{
161-
return false;
119+
if (conn.ClientProperties.TryGetValue("connection_name", out object? connectionNameObj))
120+
{
121+
if (connectionNameObj is not null)
122+
{
123+
string connName = (string)connectionNameObj;
124+
return connName.Contains(connectionName);
125+
}
126+
}
162127
}
163-
else
164-
{
165-
return x.client_properties["connection_name"].Contains(connectionName);
166-
}
167-
}),
168-
_ => false
169-
};
128+
129+
return false;
130+
});
170131
}
171132

172133
public static async Task<int> HttpKillConnections(string connectionName)

0 commit comments

Comments
 (0)