Skip to content

feat: introduce metrics based on system diagnostics #84

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ea97eb6
feat: add message publishing metrics
aygalinc Oct 31, 2024
a4638b5
chore: add more metrics
aygalinc Nov 3, 2024
8f51146
formatting
Gsantomaggio Nov 4, 2024
bb4e334
fix: use only dotnet 8 ImeterFactory for implementation
aygalinc Nov 4, 2024
ea925ab
chore: format
aygalinc Nov 7, 2024
40e0ae6
use rabbitmq:4.0.2-management
Gsantomaggio Nov 7, 2024
653b1b2
use rabbitmq:4.0.2-management
Gsantomaggio Nov 7, 2024
136aa04
merge
Gsantomaggio Nov 8, 2024
0b4b623
merge
Gsantomaggio Nov 8, 2024
19e834f
Modify Tests.csproj to use net462 since (supposedly) `Microsoft.Exten…
lukebakken Nov 11, 2024
2e91b4f
* `dotnet format` fixes
lukebakken Nov 11, 2024
3723713
* Suppress TFM support build warnings in Tests.csproj
lukebakken Nov 11, 2024
9dc565a
* Use `Stopwatch` in a netstandard2.0 compatible way.
lukebakken Nov 12, 2024
ecef22c
* Change `AmqpEnvironment` to non-async `Create` method.
lukebakken Nov 12, 2024
3270d7a
* Only create `Stopwatch` instance if `IMetricsReporter` is not null.
lukebakken Nov 12, 2024
01a24a2
fixup
lukebakken Nov 12, 2024
db3cabe
chore: add example projet to integrate metric in Open telemetry sdk
aygalinc Nov 13, 2024
3ddf4ab
chore: apply format
aygalinc Nov 13, 2024
a3c073c
chore: update example readme
aygalinc Nov 13, 2024
aa4f348
chore: remove unecessary code
aygalinc Nov 14, 2024
e24ca87
* Move some stuff around
lukebakken Nov 14, 2024
abec929
* Combine metrics context data classes into the same class.
lukebakken Nov 14, 2024
330ce0c
* Start bringing the metrics in-line with the Java AMQP 1.0 client.
lukebakken Nov 16, 2024
c39559a
*Combine metrics tests into a single test suite
lukebakken Nov 18, 2024
7fdf08a
* Add elapsed timespan to publish measurements
lukebakken Nov 18, 2024
1206875
fixup
lukebakken Nov 18, 2024
925c6ae
Thanks @aygalinc for noticing that a call to `Pause` is missing in a …
lukebakken Nov 18, 2024
94edb0d
* Collect consume elapsed time duration.
lukebakken Nov 18, 2024
1e566fd
* Use seconds instead of milliseconds.
lukebakken Nov 18, 2024
c594cff
* Ensure all of the new metrics are tested.
lukebakken Nov 18, 2024
75ba9a8
* Misc fixes, add `InternalBugException`
lukebakken Nov 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@
<ItemGroup>
<!-- RabbitMQ.Amqp.Client -->
<PackageVersion Include="AMQPNetLite.Core" Version="2.4.11" />
<PackageVersion Include="OpenTelemetry" Version="1.10.0" />
<PackageVersion Include="OpenTelemetry.Exporter.Console" Version="1.10.0" />
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="9.0.0" />
<!-- HAClient -->
<PackageVersion Include="DotNext.Threading" Version="5.15.0" />
<!-- Tests -->
<PackageVersion Include="Microsoft.Extensions.Diagnostics" Version="9.0.0" />
<PackageVersion Include="Microsoft.Extensions.Diagnostics.Testing" Version="9.0.0" />
<PackageVersion Include="System.Text.Json" Version="9.0.0" />
<PackageVersion Include="xunit" Version="2.9.2" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
<PackageVersion Include="Xunit.SkippableFact" Version="1.4.13" />
Expand All @@ -22,11 +28,7 @@
* https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1481#pullrequestreview-1847905299
* https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1594
-->
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="6.0.0" />
<PackageVersion Include="System.Runtime.CompilerServices.Unsafe" Version="6.0.0" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)'=='net8.0'">
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="8.0.1" />
<PackageVersion Include="System.Runtime.CompilerServices.Unsafe" Version="6.1.0" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFrameworkIdentifier)'=='.NETFramework'">
<GlobalPackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.3" />
Expand All @@ -36,4 +38,4 @@
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
<GlobalPackageReference Include="MinVer" Version="6.0.0" />
</ItemGroup>
</Project>
</Project>
1 change: 1 addition & 0 deletions RabbitMQ.AMQP.Client/ILifeCycle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public override string ToString()

public delegate void LifeCycleCallBack(object sender, State previousState, State currentState, Error? failureCause);

// TODO consider adding IAsyncDisposable that could call CloseAsync()
public interface ILifeCycle : IDisposable
{
Task CloseAsync();
Expand Down
40 changes: 40 additions & 0 deletions RabbitMQ.AMQP.Client/IMetricsReporter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;

namespace RabbitMQ.AMQP.Client
{
public interface IMetricsReporter
{
enum PublishDispositionValue
{
ACCEPTED,
REJECTED,
RELEASED
};

enum ConsumeDispositionValue
{
ACCEPTED,
DISCARDED,
REQUEUED
};

void ConnectionOpened();
void ConnectionClosed();

void PublisherOpened();
void PublisherClosed();

void ConsumerOpened();
void ConsumerClosed();

void Published(TimeSpan elapsed);
void PublishDisposition(PublishDispositionValue disposition);

void Consumed(TimeSpan elapsed);
void ConsumeDisposition(ConsumeDispositionValue disposition);
}
}
33 changes: 19 additions & 14 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
private readonly AmqpManagement _management;
private readonly RecordingTopologyListener _recordingTopologyListener = new();

private readonly IConnectionSettings _connectionSettings;
internal readonly IConnectionSettings _connectionSettings;
private readonly IMetricsReporter? _metricsReporter;
internal readonly AmqpSessionManagement _nativePubSubSessions;

private readonly Dictionary<string, object> _connectionProperties = new();
Expand Down Expand Up @@ -104,7 +105,6 @@ public IRpcServerBuilder RpcServerBuilder()

public IRpcClientBuilder RpcClientBuilder()
{

return new AmqpRpcClientBuilder(this);
}

Expand All @@ -131,17 +131,22 @@ public IEnumerable<IConsumer> Consumers
public IReadOnlyDictionary<string, object> Properties => _connectionProperties;

public long Id { get; set; }

/// <summary>
/// Creates a new instance of <see cref="AmqpConnection"/>
/// Through the Connection is possible to create:
/// - Management. See <see cref="AmqpManagement"/>
/// - Publishers and Consumers: See <see cref="AmqpPublisherBuilder"/> and <see cref="AmqpConsumerBuilder"/>
/// </summary>
/// <param name="connectionSettings"></param>
/// <param name="metricsReporter"></param>
/// <returns></returns>
public static async Task<IConnection> CreateAsync(IConnectionSettings connectionSettings)
// TODO to play nicely with IoC containers, we should not have static Create methods
// TODO rename to CreateAndOpenAsync
public static async Task<IConnection> CreateAsync(IConnectionSettings connectionSettings,
IMetricsReporter? metricsReporter = default)
{
var connection = new AmqpConnection(connectionSettings);
var connection = new AmqpConnection(connectionSettings, metricsReporter);
await connection.OpenAsync()
.ConfigureAwait(false);
return connection;
Expand All @@ -154,7 +159,7 @@ public IManagement Management()

public IConsumerBuilder ConsumerBuilder()
{
return new AmqpConsumerBuilder(this);
return new AmqpConsumerBuilder(this, _metricsReporter);
}

// TODO cancellation token
Expand All @@ -170,7 +175,7 @@ await base.OpenAsync()
public IPublisherBuilder PublisherBuilder()
{
ThrowIfClosed();
var publisherBuilder = new AmqpPublisherBuilder(this);
var publisherBuilder = new AmqpPublisherBuilder(this, _metricsReporter);
return publisherBuilder;
}

Expand Down Expand Up @@ -235,6 +240,7 @@ protected override void Dispose(bool disposing)
{
_nativeConnection.Closed -= _closedCallback;
}

_semaphoreOpen.Dispose();
_semaphoreClose.Dispose();
}
Expand Down Expand Up @@ -265,9 +271,10 @@ await consumer.CloseAsync()
}
}

private AmqpConnection(IConnectionSettings connectionSettings)
private AmqpConnection(IConnectionSettings connectionSettings, IMetricsReporter? metricsReporter)
{
_connectionSettings = connectionSettings;
_metricsReporter = metricsReporter;
_nativePubSubSessions = new AmqpSessionManagement(this, 1);
_management =
new AmqpManagement(new AmqpManagementParameters(this).TopologyListener(_recordingTopologyListener));
Expand All @@ -291,10 +298,7 @@ await _semaphoreOpen.WaitAsync(cancellationToken)
HostName = $"vhost:{_connectionSettings.VirtualHost}",
// Note: no need to set cf.AMQP.ContainerId
ContainerId = _connectionSettings.ContainerId,
Properties = new Fields()
{
[new Symbol("connection_name")] = _connectionSettings.ContainerId,
}
Properties = new Fields() { [new Symbol("connection_name")] = _connectionSettings.ContainerId, }
};

if (_connectionSettings.MaxFrameSize > uint.MinValue)
Expand Down Expand Up @@ -350,7 +354,8 @@ void OnOpened(Amqp.IConnection connection, Open openOnOpened)
if (_connectionSettings is null)
{
// TODO create "internal bug" exception type?
throw new InvalidOperationException("_connectionSettings is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
throw new InvalidOperationException(
"_connectionSettings is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}
else
{
Expand Down Expand Up @@ -497,8 +502,8 @@ await OpenConnectionAsync(CancellationToken.None)
if (false == connected)
{
var notRecoveredError = new Error(ConnectionNotRecoveredCode,
$"{ConnectionNotRecoveredMessage}," +
$"recover status: {_connectionSettings.Recovery}");
$"{ConnectionNotRecoveredMessage}," +
$"recover status: {_connectionSettings.Recovery}");
DoClose(notRecoveredError);
return;
}
Expand Down
35 changes: 29 additions & 6 deletions RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
using Trace = Amqp.Trace;
using TraceLevel = Amqp.TraceLevel;

namespace RabbitMQ.AMQP.Client.Impl
{
Expand All @@ -28,12 +31,13 @@ private enum PauseStatus
private PauseStatus _pauseStatus = PauseStatus.UNPAUSED;
private readonly UnsettledMessageCounter _unsettledMessageCounter = new();
private readonly ConsumerConfiguration _configuration;
private readonly IMetricsReporter? _metricsReporter;

internal AmqpConsumer(AmqpConnection amqpConnection, ConsumerConfiguration configuration)
internal AmqpConsumer(AmqpConnection amqpConnection, ConsumerConfiguration configuration, IMetricsReporter? metricsReporter)
{
_amqpConnection = amqpConnection;
_configuration = configuration;

_metricsReporter = metricsReporter;
_amqpConnection.AddConsumer(_id, this);
}

Expand Down Expand Up @@ -127,11 +131,21 @@ private async Task ProcessMessages()
return;
}

Stopwatch? stopwatch = null;
if (_metricsReporter is not null)
{
stopwatch = new();
}

while (_receiverLink is { LinkState: LinkState.Attached })
{
stopwatch?.Restart();

// TODO the timeout waiting for messages should be configurable
TimeSpan timeout = TimeSpan.FromSeconds(60);
Message? nativeMessage = await _receiverLink.ReceiveAsync(timeout).ConfigureAwait(false);
Message? nativeMessage = await _receiverLink.ReceiveAsync(timeout)
.ConfigureAwait(false);

if (nativeMessage is null)
{
// this is not a problem, it is just a timeout.
Expand All @@ -144,15 +158,24 @@ private async Task ProcessMessages()

_unsettledMessageCounter.Increment();

IContext context = new DeliveryContext(_receiverLink, nativeMessage, _unsettledMessageCounter);
IContext context = new DeliveryContext(_receiverLink, nativeMessage,
_unsettledMessageCounter, _metricsReporter);
var amqpMessage = new AmqpMessage(nativeMessage);

// TODO catch exceptions thrown by handlers,
// then call exception handler?
if (_configuration.Handler != null)
if (_configuration.Handler is not null)
{
await _configuration.Handler(context, amqpMessage).ConfigureAwait(false);
await _configuration.Handler(context, amqpMessage)
.ConfigureAwait(false);
}

if (_metricsReporter is not null && stopwatch is not null)
{
stopwatch.Stop();
_metricsReporter.Consumed(stopwatch.Elapsed);
}

}
}
catch (Exception e)
Expand Down
7 changes: 5 additions & 2 deletions RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ internal sealed class ConsumerConfiguration
public string Address { get; set; } = "";
public int InitialCredits { get; set; } = 100; // TODO use constant, check with Java lib
public Map Filters { get; set; } = new();
// TODO is a MessageHandler *really* optional???
public MessageHandler? Handler { get; set; }
// TODO re-name to ListenerContextAction? Callback?
public Action<IConsumerBuilder.ListenerContext>? ListenerContext = null;
Expand All @@ -32,10 +33,12 @@ public class AmqpConsumerBuilder : IConsumerBuilder
{
private readonly ConsumerConfiguration _configuration = new();
private readonly AmqpConnection _amqpConnection;
private readonly IMetricsReporter? _metricsReporter;

public AmqpConsumerBuilder(AmqpConnection connection)
public AmqpConsumerBuilder(AmqpConnection connection, IMetricsReporter? metricsReporter)
{
_amqpConnection = connection;
_metricsReporter = metricsReporter;
}

public IConsumerBuilder Queue(IQueueSpecification queueSpec)
Expand Down Expand Up @@ -81,7 +84,7 @@ public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationTo
throw new ConsumerException("Message handler is not set");
}

AmqpConsumer consumer = new(_amqpConnection, _configuration);
AmqpConsumer consumer = new(_amqpConnection, _configuration, _metricsReporter);

// TODO pass cancellationToken
await consumer.OpenAsync()
Expand Down
13 changes: 8 additions & 5 deletions RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,26 @@ namespace RabbitMQ.AMQP.Client.Impl
{
public class AmqpEnvironment : IEnvironment
{
private IConnectionSettings? ConnectionSettings { get; }
private IConnectionSettings ConnectionSettings { get; }
private long _sequentialId = 0;
private readonly ConcurrentDictionary<long, IConnection> _connections = new();
private readonly IMetricsReporter? _metricsReporter;

private AmqpEnvironment(IConnectionSettings connectionSettings)
private AmqpEnvironment(IConnectionSettings connectionSettings, IMetricsReporter? metricsReporter = default)
{
ConnectionSettings = connectionSettings;
_metricsReporter = metricsReporter;
}

public static Task<IEnvironment> CreateAsync(IConnectionSettings connectionSettings)
// TODO to play nicely with IoC containers, we should not have static Create methods
public static IEnvironment Create(IConnectionSettings connectionSettings, IMetricsReporter? metricsReporter = default)
{
return Task.FromResult<IEnvironment>(new AmqpEnvironment(connectionSettings));
return new AmqpEnvironment(connectionSettings, metricsReporter);
}

public async Task<IConnection> CreateConnectionAsync(IConnectionSettings connectionSettings)
{
IConnection c = await AmqpConnection.CreateAsync(connectionSettings).ConfigureAwait(false);
IConnection c = await AmqpConnection.CreateAsync(connectionSettings, _metricsReporter).ConfigureAwait(false);
c.Id = Interlocked.Increment(ref _sequentialId);
_connections.TryAdd(c.Id, c);
c.ChangeState += (sender, previousState, currentState, failureCause) =>
Expand Down
Loading