Skip to content

Commit 352aa7c

Browse files
authored
Merge branch 'main' into misc
2 parents d7e4063 + 4be8beb commit 352aa7c

24 files changed

+813
-542
lines changed

.ci/ubuntu/gha-setup.sh

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,18 @@ else
4646
readonly docker_pull_args=''
4747
fi
4848

49-
set -o nounset
50-
51-
declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq"
52-
declare -r toxiproxy_docker_name="$docker_name_prefix-toxiproxy"
53-
5449
if [[ $1 == 'stop' ]]
5550
then
5651
docker stop "$rabbitmq_docker_name"
5752
docker stop "$toxiproxy_docker_name"
5853
exit 0
5954
fi
6055

56+
set -o nounset
57+
58+
declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq"
59+
declare -r toxiproxy_docker_name="$docker_name_prefix-toxiproxy"
60+
6161
function start_toxiproxy
6262
{
6363
if [[ $run_toxiproxy == 'true' ]]

.github/workflows/build-test.yaml

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,17 @@ jobs:
2626
run: dotnet format ${{ github.workspace }}/Build.csproj --no-restore --verify-no-changes
2727
- name: Start RabbitMQ
2828
id: start-rabbitmq
29-
run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh toxiproxy
29+
# Note: not using toxiproxy yet
30+
# run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh toxiproxy
31+
run: ${{ github.workspace }}/.ci/ubuntu/gha-setup.sh
3032
- name: Test
31-
run: dotnet test ${{ github.workspace }}/Build.csproj --no-restore --no-build --logger "console;verbosity=detailed" /p:AltCover=true
33+
run: dotnet test ${{ github.workspace }}/Build.csproj --no-restore --no-build --logger "console;verbosity=detailed" /p:AltCover=true /p:AltCoverStrongNameKey=${{github.workspace}}/rabbit.snk
3234
- name: Check for errors in RabbitMQ logs
3335
run: ${{ github.workspace}}/.ci/ubuntu/gha-log-check.sh
34-
- name: Maybe collect toxiproxy logs
35-
if: failure()
36-
run: docker logs rabbitmq-amqp-dotnet-client-toxiproxy > ${{ github.workspace }}/.ci/ubuntu/log/toxiproxy.log
36+
# Note: not using toxiproxy yet
37+
# - name: Maybe collect toxiproxy logs
38+
# if: failure()
39+
# run: docker logs rabbitmq-amqp-dotnet-client-toxiproxy > ${{ github.workspace }}/.ci/ubuntu/log/toxiproxy.log
3740
- name: Maybe upload RabbitMQ logs
3841
if: failure()
3942
uses: actions/upload-artifact@v4

Directory.Build.targets

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
<!-- vim: set ft=xml: -->
22
<Project>
3+
<PropertyGroup>
4+
<PublicKey>00240000048000009400000006020000002400005253413100040000010001005d8c30a2f8706bf952e03a51ced8b44b6e13c8a643db9ef030032b25431ec6e67b73c570beae14d63e6bd36f248c51800525e33a3fd7749bb877cfac0ad0b6544c9b385c007efb1b283327e6284396807be6300c13bbd14fed3d4c878dc33d1d832303daa9799a7f90e79aa2e5ded80143361e4849879b19a3886544780bb3bc</PublicKey>
5+
<TargetFrameworkMonikerAssemblyAttributesPath>$([System.IO.Path]::Combine('$(IntermediateOutputPath)','$(TargetFrameworkMoniker).AssemblyAttributes$(DefaultLanguageSourceExtension)'))</TargetFrameworkMonikerAssemblyAttributesPath>
6+
</PropertyGroup>
37
<ItemGroup>
48
<EmbeddedFiles Include="$(GeneratedAssemblyInfoFile)"/>
59
</ItemGroup>

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ build:
77
dotnet build $(CURDIR)/Build.csproj
88

99
test: build
10-
dotnet test -c Debug $(CURDIR)/Tests/Tests.csproj --no-build --logger:"console;verbosity=detailed" /p:AltCover=true
10+
dotnet test -c Debug $(CURDIR)/Tests/Tests.csproj --no-build --logger:"console;verbosity=detailed" /p:AltCover=true /p:AltCoverStrongNameKey=$(CURDIR)/rabbit.snk
1111

1212
rabbitmq-server-start-arm:
1313
./.ci/ubuntu/gha-setup.sh start pull arm

RabbitMQ.AMQP.Client/IConnectionSettings.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public interface IConnectionSettings : IEquatable<IConnectionSettings>
1515
string ConnectionName { get; }
1616
string Path { get; }
1717
bool UseSsl { get; }
18+
uint MaxFrameSize { get; }
1819
SaslMechanism SaslMechanism { get; }
1920
ITlsSettings? TlsSettings { get; }
2021

@@ -35,20 +36,25 @@ public interface ITlsSettings
3536
/// <summary>
3637
/// Supported protocols to use.
3738
/// </summary>
38-
SslProtocols Protocols { get; }
39+
SslProtocols Protocols { get; set; }
40+
41+
/// <summary>
42+
/// Acceptable TLS/SSL errors when connecting.
43+
/// </summary>
44+
SslPolicyErrors AcceptablePolicyErrors { get; set; }
3945

4046
/// <summary>
4147
/// Specifies whether certificate revocation should be performed during handshake.
4248
/// </summary>
43-
bool CheckCertificateRevocation { get; }
49+
bool CheckCertificateRevocation { get; set; }
4450

4551
/// <summary>
4652
/// Gets or sets a certificate validation callback to validate remote certificate.
4753
/// </summary>
48-
RemoteCertificateValidationCallback? RemoteCertificateValidationCallback { get; }
54+
RemoteCertificateValidationCallback? RemoteCertificateValidationCallback { get; set; }
4955

5056
/// <summary>
5157
/// Gets or sets a local certificate selection callback to select the certificate which should be used for authentication.
5258
/// </summary>
53-
LocalCertificateSelectionCallback? LocalCertificateSelectionCallback { get; }
59+
LocalCertificateSelectionCallback? LocalCertificateSelectionCallback { get; set; }
5460
}

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,6 @@
77

88
namespace RabbitMQ.AMQP.Client.Impl;
99

10-
internal class Visitor(AmqpManagement management) : IVisitor
11-
{
12-
private AmqpManagement Management { get; } = management;
13-
14-
public async Task VisitQueues(IEnumerable<QueueSpec> queueSpec)
15-
{
16-
foreach (QueueSpec spec in queueSpec)
17-
{
18-
Trace.WriteLine(TraceLevel.Information, $"Recovering queue {spec.Name}");
19-
try
20-
{
21-
await Management.Queue(spec).Declare()
22-
.ConfigureAwait(false);
23-
}
24-
catch (Exception e)
25-
{
26-
Trace.WriteLine(TraceLevel.Error,
27-
$"Error recovering queue {spec.Name}. Error: {e}. Management Status: {Management}");
28-
}
29-
}
30-
}
31-
}
32-
3310
/// <summary>
3411
/// AmqpConnection is the concrete implementation of <see cref="IConnection"/>
3512
/// It is a wrapper around the AMQP.Net Lite <see cref="Connection"/> class
@@ -38,7 +15,7 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
3815
{
3916
private const string ConnectionNotRecoveredCode = "CONNECTION_NOT_RECOVERED";
4017
private const string ConnectionNotRecoveredMessage = "Connection not recovered";
41-
private readonly SemaphoreSlim _semaphoreClose = new(1, 1);
18+
private readonly SemaphoreSlim _semaphoreClose = new(1, 1); // TODO this needs to be Disposed
4219

4320
// The native AMQP.Net Lite connection
4421
private Connection? _nativeConnection;
@@ -55,7 +32,7 @@ private void ChangeEntitiesStatus(State state, Error? error)
5532

5633
private void ChangePublishersStatus(State state, Error? error)
5734
{
58-
foreach (var publisher1 in Publishers.Values)
35+
foreach (IPublisher publisher1 in Publishers.Values)
5936
{
6037
var publisher = (AmqpPublisher)publisher1;
6138
publisher.ChangeStatus(state, error);
@@ -64,7 +41,7 @@ private void ChangePublishersStatus(State state, Error? error)
6441

6542
private void ChangeConsumersStatus(State state, Error? error)
6643
{
67-
foreach (var consumer1 in Consumers.Values)
44+
foreach (IConsumer consumer1 in Consumers.Values)
6845
{
6946
var consumer = (AmqpConsumer)consumer1;
7047
consumer.ChangeStatus(state, error);
@@ -79,7 +56,7 @@ private async Task ReconnectEntities()
7956

8057
private async Task ReconnectPublishers()
8158
{
82-
foreach (var publisher1 in Publishers.Values)
59+
foreach (IPublisher publisher1 in Publishers.Values)
8360
{
8461
var publisher = (AmqpPublisher)publisher1;
8562
await publisher.Reconnect().ConfigureAwait(false);
@@ -88,7 +65,7 @@ private async Task ReconnectPublishers()
8865

8966
private async Task ReconnectConsumers()
9067
{
91-
foreach (var consumer1 in Consumers.Values)
68+
foreach (IConsumer consumer1 in Consumers.Values)
9269
{
9370
var consumer = (AmqpConsumer)consumer1;
9471
await consumer.Reconnect().ConfigureAwait(false);
@@ -205,10 +182,19 @@ private async Task EnsureConnection()
205182
var open = new Open
206183
{
207184
HostName = $"vhost:{_connectionSettings.VirtualHost}",
208-
Properties = new Fields() { [new Symbol("connection_name")] = _connectionSettings.ConnectionName, }
185+
Properties = new Fields()
186+
{
187+
[new Symbol("connection_name")] = _connectionSettings.ConnectionName,
188+
}
209189
};
210190

211-
void OnOpened(Amqp.IConnection connection, Open open1)
191+
if (_connectionSettings.MaxFrameSize > uint.MinValue)
192+
{
193+
// Note: when set here, there is no need to set cf.AMQP.MaxFrameSize
194+
open.MaxFrameSize = _connectionSettings.MaxFrameSize;
195+
}
196+
197+
void onOpened(Amqp.IConnection connection, Open open1)
212198
{
213199
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is open");
214200
OnNewStatus(State.Open, null);
@@ -418,7 +404,6 @@ public IPublisherBuilder PublisherBuilder()
418404
return publisherBuilder;
419405
}
420406

421-
422407
public override async Task CloseAsync()
423408
{
424409
await _semaphoreClose.WaitAsync()
@@ -458,10 +443,32 @@ await ConnectionCloseTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(10
458443
OnNewStatus(State.Closed, null);
459444
}
460445

461-
462446
public override string ToString()
463447
{
464448
string info = $"AmqpConnection{{ConnectionSettings='{_connectionSettings}', Status='{State.ToString()}'}}";
465449
return info;
466450
}
467451
}
452+
453+
internal class Visitor(AmqpManagement management) : IVisitor
454+
{
455+
private AmqpManagement Management { get; } = management;
456+
457+
public async Task VisitQueues(IEnumerable<QueueSpec> queueSpec)
458+
{
459+
foreach (QueueSpec spec in queueSpec)
460+
{
461+
Trace.WriteLine(TraceLevel.Information, $"Recovering queue {spec.Name}");
462+
try
463+
{
464+
await Management.Queue(spec).Declare()
465+
.ConfigureAwait(false);
466+
}
467+
catch (Exception e)
468+
{
469+
Trace.WriteLine(TraceLevel.Error,
470+
$"Error recovering queue {spec.Name}. Error: {e}. Management Status: {Management}");
471+
}
472+
}
473+
}
474+
}

RabbitMQ.AMQP.Client/Impl/ConnectionSettings.cs

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public class ConnectionSettingBuilder
1515
private string _scheme = "AMQP";
1616
private string _connectionName = "AMQP.NET";
1717
private string _virtualHost = "/";
18+
private uint _maxFrameSize = Consts.DefaultMaxFrameSize;
1819
private SaslMechanism _saslMechanism = Client.SaslMechanism.Plain;
1920
private IRecoveryConfiguration _recoveryConfiguration = Impl.RecoveryConfiguration.Create();
2021

@@ -69,6 +70,17 @@ public ConnectionSettingBuilder VirtualHost(string virtualHost)
6970
return this;
7071
}
7172

73+
public ConnectionSettingBuilder MaxFrameSize(uint maxFrameSize)
74+
{
75+
_maxFrameSize = maxFrameSize;
76+
if (_maxFrameSize != uint.MinValue && _maxFrameSize < 512)
77+
{
78+
throw new ArgumentOutOfRangeException(nameof(maxFrameSize),
79+
"maxFrameSize must be greater or equal to 512");
80+
}
81+
return this;
82+
}
83+
7284
public ConnectionSettingBuilder SaslMechanism(SaslMechanism saslMechanism)
7385
{
7486
_saslMechanism = saslMechanism;
@@ -89,9 +101,9 @@ public ConnectionSettingBuilder RecoveryConfiguration(IRecoveryConfiguration rec
89101

90102
public ConnectionSettings Build()
91103
{
92-
var c = new ConnectionSettings(_host, _port, _user,
104+
var c = new ConnectionSettings(_scheme, _host, _port, _user,
93105
_password, _virtualHost,
94-
_scheme, _connectionName, _saslMechanism)
106+
_connectionName, _saslMechanism, _maxFrameSize)
95107
{
96108
Recovery = (RecoveryConfiguration)_recoveryConfiguration
97109
};
@@ -106,8 +118,9 @@ public ConnectionSettings Build()
106118
public class ConnectionSettings : IConnectionSettings
107119
{
108120
private readonly Address _address;
109-
private readonly string _connectionName = "";
110121
private readonly string _virtualHost = "/";
122+
private readonly string _connectionName = "";
123+
private readonly uint _maxFrameSize = Consts.DefaultMaxFrameSize;
111124
private readonly ITlsSettings? _tlsSettings;
112125
private readonly SaslMechanism _saslMechanism = SaslMechanism.Plain;
113126

@@ -122,17 +135,27 @@ public ConnectionSettings(string address, ITlsSettings? tlsSettings = null)
122135
}
123136
}
124137

125-
public ConnectionSettings(string host, int port,
138+
public ConnectionSettings(string scheme, string host, int port,
126139
string? user, string? password,
127-
string virtualHost, string scheme, string connectionName,
128-
SaslMechanism saslMechanism, ITlsSettings? tlsSettings = null)
140+
string virtualHost, string connectionName,
141+
SaslMechanism saslMechanism,
142+
uint maxFrameSize = Consts.DefaultMaxFrameSize,
143+
ITlsSettings? tlsSettings = null)
129144
{
130145
_address = new Address(host: host, port: port,
131146
user: user, password: password,
132147
path: "/", scheme: scheme);
133148
_connectionName = connectionName;
134149
_virtualHost = virtualHost;
135150
_saslMechanism = saslMechanism;
151+
152+
_maxFrameSize = maxFrameSize;
153+
if (_maxFrameSize != uint.MinValue && _maxFrameSize < 512)
154+
{
155+
throw new ArgumentOutOfRangeException(nameof(maxFrameSize),
156+
"maxFrameSize must be greater or equal to 512");
157+
}
158+
136159
_tlsSettings = tlsSettings;
137160

138161
if (_address.UseSsl && _tlsSettings == null)
@@ -150,8 +173,8 @@ public ConnectionSettings(string host, int port,
150173
public string ConnectionName => _connectionName;
151174
public string Path => _address.Path;
152175
public bool UseSsl => _address.UseSsl;
176+
public uint MaxFrameSize => _maxFrameSize;
153177
public SaslMechanism SaslMechanism => _saslMechanism;
154-
155178
public ITlsSettings? TlsSettings => _tlsSettings;
156179
public IRecoveryConfiguration Recovery { get; init; } = RecoveryConfiguration.Create();
157180

@@ -358,40 +381,34 @@ public override string ToString()
358381
public class TlsSettings : ITlsSettings
359382
{
360383
internal const SslProtocols DefaultSslProtocols = SslProtocols.None;
361-
362-
private readonly SslProtocols _protocols;
363-
private readonly X509CertificateCollection _clientCertificates;
364-
private readonly bool _checkCertificateRevocation = false;
365-
private readonly RemoteCertificateValidationCallback? _remoteCertificateValidationCallback;
366-
private readonly LocalCertificateSelectionCallback? _localCertificateSelectionCallback;
384+
private readonly X509CertificateCollection _clientCertificates = new X509CertificateCollection();
367385

368386
public TlsSettings() : this(DefaultSslProtocols)
369387
{
370388
}
371389

372390
public TlsSettings(SslProtocols protocols)
373391
{
374-
_protocols = protocols;
375-
_clientCertificates = new X509CertificateCollection();
376-
_remoteCertificateValidationCallback = trustEverythingCertValidationCallback;
377-
_localCertificateSelectionCallback = null;
392+
Protocols = protocols;
393+
RemoteCertificateValidationCallback = trustEverythingCertValidationCallback;
394+
LocalCertificateSelectionCallback = null;
378395
}
379396

380-
public SslProtocols Protocols => _protocols;
397+
public SslProtocols Protocols { get; set; }
398+
399+
public SslPolicyErrors AcceptablePolicyErrors { get; set; } = SslPolicyErrors.None;
381400

382401
public X509CertificateCollection ClientCertificates => _clientCertificates;
383402

384-
public bool CheckCertificateRevocation => _checkCertificateRevocation;
403+
public bool CheckCertificateRevocation { get; set; } = false;
385404

386-
public RemoteCertificateValidationCallback? RemoteCertificateValidationCallback
387-
=> _remoteCertificateValidationCallback;
405+
public RemoteCertificateValidationCallback? RemoteCertificateValidationCallback { get; set; }
388406

389-
public LocalCertificateSelectionCallback? LocalCertificateSelectionCallback
390-
=> _localCertificateSelectionCallback;
407+
public LocalCertificateSelectionCallback? LocalCertificateSelectionCallback { get; set; }
391408

392-
private static bool trustEverythingCertValidationCallback(object sender, X509Certificate? certificate,
409+
private bool trustEverythingCertValidationCallback(object sender, X509Certificate? certificate,
393410
X509Chain? chain, SslPolicyErrors sslPolicyErrors)
394411
{
395-
return true;
412+
return (sslPolicyErrors & ~AcceptablePolicyErrors) == SslPolicyErrors.None;
396413
}
397414
}

RabbitMQ.AMQP.Client/Impl/Consts.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,8 @@ public class Consts
77
public const string Queues = "queues";
88
public const string Bindings = "bindings";
99

10+
/// <summary>
11+
/// <code>uint.MinValue</code> means "no limit"
12+
/// </summary>
13+
public const uint DefaultMaxFrameSize = uint.MinValue; // NOTE: Azure/amqpnetlite uses 256 * 1024
1014
}

0 commit comments

Comments
 (0)