Skip to content

Commit 23a8e25

Browse files
committed
fix: use only dotnet 8 ImeterFactory for implementation
1 parent 90cfc34 commit 23a8e25

File tree

11 files changed

+109
-116
lines changed

11 files changed

+109
-116
lines changed

.ci/ubuntu/one-node/gha-setup.sh

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,6 @@ readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4-management}"
1414
readonly docker_name_prefix='rabbitmq-amqp-dotnet-client'
1515
readonly docker_network_name="$docker_name_prefix-network"
1616

17-
if [[ ! -v GITHUB_ACTIONS ]]
18-
then
19-
GITHUB_ACTIONS='false'
20-
fi
2117

2218
if [[ -d $GITHUB_WORKSPACE ]]
2319
then
@@ -126,14 +122,7 @@ function get_rabbitmq_id
126122
local rabbitmq_docker_id
127123
rabbitmq_docker_id="$(docker inspect --format='{{.Id}}' "$rabbitmq_docker_name")"
128124
echo "[INFO] '$rabbitmq_docker_name' docker id is '$rabbitmq_docker_id'"
129-
if [[ -v GITHUB_OUTPUT ]]
130-
then
131-
if [[ -f $GITHUB_OUTPUT ]]
132-
then
133-
echo "[INFO] GITHUB_OUTPUT file: '$GITHUB_OUTPUT'"
134-
fi
135-
echo "id=$rabbitmq_docker_id" >> "$GITHUB_OUTPUT"
136-
fi
125+
137126
}
138127

139128
function install_ca_certificate
@@ -146,15 +135,9 @@ function install_ca_certificate
146135
openssl version -d
147136
set -o errexit
148137

149-
if [[ $GITHUB_ACTIONS == 'true' ]]
150-
then
151-
readonly openssl_store_dir='/usr/lib/ssl/certs'
152-
sudo cp -vf "$GITHUB_WORKSPACE/.ci/certs/ca_certificate.pem" "$openssl_store_dir"
153-
sudo ln -vsf "$openssl_store_dir/ca_certificate.pem" "$openssl_store_dir/$(openssl x509 -hash -noout -in $openssl_store_dir/ca_certificate.pem).0"
154-
else
138+
155139
echo "[WARNING] you must install '$GITHUB_WORKSPACE/.ci/certs/ca_certificate.pem' manually into your trusted root store"
156-
fi
157-
140+
158141
openssl s_client -connect localhost:5671 \
159142
-CAfile "$GITHUB_WORKSPACE/.ci/certs/ca_certificate.pem" \
160143
-cert "$GITHUB_WORKSPACE/.ci/certs/client_localhost_certificate.pem" \

Directory.Packages.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
</ItemGroup>
3030
<ItemGroup Condition="'$(TargetFramework)'=='net8.0'">
3131
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="8.0.1" />
32+
<PackageVersion Include="Microsoft.Extensions.Diagnostics" Version="8.0.1" />
3233
</ItemGroup>
3334
<ItemGroup Condition="'$(TargetFrameworkIdentifier)'=='.NETFramework'">
3435
<GlobalPackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.3" />

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
3636
private readonly RecordingTopologyListener _recordingTopologyListener = new();
3737

3838
internal readonly IConnectionSettings _connectionSettings;
39+
private readonly IMetricsReporter _metricsReporter;
3940
internal readonly AmqpSessionManagement _nativePubSubSessions;
40-
41+
4142
/// <summary>
4243
/// Publishers contains all the publishers created by the connection.
4344
/// Each connection can have multiple publishers.
@@ -90,10 +91,12 @@ public ReadOnlyCollection<IConsumer> GetConsumers()
9091
/// - Publishers and Consumers: See <see cref="AmqpPublisherBuilder"/> and <see cref="AmqpConsumerBuilder"/>
9192
/// </summary>
9293
/// <param name="connectionSettings"></param>
94+
/// <param name="metricsReporter"></param>
9395
/// <returns></returns>
94-
public static async Task<IConnection> CreateAsync(IConnectionSettings connectionSettings)
96+
public static async Task<IConnection> CreateAsync(IConnectionSettings connectionSettings,
97+
IMetricsReporter? metricsReporter = default)
9598
{
96-
var connection = new AmqpConnection(connectionSettings);
99+
var connection = new AmqpConnection(connectionSettings, metricsReporter ?? new NoOpMetricsReporter());
97100
await connection.OpenAsync()
98101
.ConfigureAwait(false);
99102
return connection;
@@ -106,7 +109,7 @@ public IManagement Management()
106109

107110
public IConsumerBuilder ConsumerBuilder()
108111
{
109-
return new AmqpConsumerBuilder(this);
112+
return new AmqpConsumerBuilder(this, _metricsReporter);
110113
}
111114

112115
// TODO cancellation token
@@ -122,7 +125,7 @@ await base.OpenAsync()
122125
public IPublisherBuilder PublisherBuilder()
123126
{
124127
ThrowIfClosed();
125-
var publisherBuilder = new AmqpPublisherBuilder(this);
128+
var publisherBuilder = new AmqpPublisherBuilder(this, _metricsReporter);
126129
return publisherBuilder;
127130
}
128131

@@ -218,9 +221,10 @@ await consumer.CloseAsync()
218221
}
219222
}
220223

221-
private AmqpConnection(IConnectionSettings connectionSettings)
224+
private AmqpConnection(IConnectionSettings connectionSettings, IMetricsReporter metricsReporter)
222225
{
223226
_connectionSettings = connectionSettings;
227+
_metricsReporter = metricsReporter;
224228
_nativePubSubSessions = new AmqpSessionManagement(this, 1);
225229
_management =
226230
new AmqpManagement(new AmqpManagementParameters(this).TopologyListener(_recordingTopologyListener));

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@ public class ConsumerConfiguration
3030
public class AmqpConsumerBuilder : IConsumerBuilder
3131
{
3232
private readonly ConsumerConfiguration _configuration = new();
33+
private readonly IMetricsReporter _metricsReporter;
3334

34-
public AmqpConsumerBuilder(AmqpConnection connection)
35+
public AmqpConsumerBuilder(AmqpConnection connection, IMetricsReporter metricsReporter)
3536
{
37+
_metricsReporter = metricsReporter;
3638
_configuration.Connection = connection;
3739
}
3840

@@ -77,13 +79,8 @@ public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationTo
7779
{
7880
throw new ConsumerException("Message handler is not set");
7981
}
80-
#if NET6_0_OR_GREATER
81-
IMetricsReporter metricsReporter = new MetricsReporter();
82-
#else
83-
IMetricsReporter metricsReporter = new NoOpMetricsReporter();
84-
#endif
8582

86-
AmqpConsumer consumer = new(_configuration, metricsReporter);
83+
AmqpConsumer consumer = new(_configuration, _metricsReporter);
8784

8885
// TODO pass cancellationToken
8986
await consumer.OpenAsync()

RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,27 @@ namespace RabbitMQ.AMQP.Client.Impl
1212
{
1313
public class AmqpEnvironment : IEnvironment
1414
{
15-
private IConnectionSettings? ConnectionSettings { get; }
15+
private IConnectionSettings ConnectionSettings { get; }
1616
private long _sequentialId = 0;
1717
private readonly ConcurrentDictionary<long, IConnection> _connections = new();
18+
private readonly IMetricsReporter? _metricsReporter;
1819

19-
private AmqpEnvironment(IConnectionSettings connectionSettings)
20+
private AmqpEnvironment(IConnectionSettings connectionSettings, IMetricsReporter? metricsReporter)
2021
{
2122
ConnectionSettings = connectionSettings;
23+
_metricsReporter = metricsReporter;
2224
}
2325

24-
public static Task<IEnvironment> CreateAsync(IConnectionSettings connectionSettings)
26+
public static Task<IEnvironment> CreateAsync(IConnectionSettings connectionSettings,
27+
IMetricsReporter? metricsReporter = default)
2528
{
26-
return Task.FromResult<IEnvironment>(new AmqpEnvironment(connectionSettings));
29+
return Task.FromResult<IEnvironment>(new AmqpEnvironment(connectionSettings,
30+
metricsReporter ?? new NoOpMetricsReporter()));
2731
}
2832

2933
public async Task<IConnection> CreateConnectionAsync(IConnectionSettings connectionSettings)
3034
{
31-
IConnection c = await AmqpConnection.CreateAsync(connectionSettings).ConfigureAwait(false);
35+
IConnection c = await AmqpConnection.CreateAsync(connectionSettings,_metricsReporter).ConfigureAwait(false);
3236
c.Id = Interlocked.Increment(ref _sequentialId);
3337
_connections.TryAdd(c.Id, c);
3438
c.ChangeState += (sender, previousState, currentState, failureCause) =>

RabbitMQ.AMQP.Client/Impl/AmqpPublisherBuilder.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@ public class AmqpPublisherBuilder : IPublisherBuilder
1515
private string? _key = null;
1616
private string? _queue = null;
1717
private TimeSpan _timeout = TimeSpan.FromSeconds(10);
18+
private readonly IMetricsReporter _metricsReporter;
1819

19-
public AmqpPublisherBuilder(AmqpConnection connection)
20+
public AmqpPublisherBuilder(AmqpConnection connection, IMetricsReporter metricsReporter)
2021
{
2122
_connection = connection;
23+
_metricsReporter = metricsReporter;
2224
}
2325

2426
public IPublisherBuilder Exchange(IExchangeSpecification exchangeSpec)
@@ -67,12 +69,8 @@ public async Task<IPublisher> BuildAsync(CancellationToken cancellationToken = d
6769
{
6870
address = AddressBuilderHelper.AddressBuilder().Exchange(_exchange).Queue(_queue).Key(_key).Address();
6971
}
70-
#if NET6_0_OR_GREATER
71-
IMetricsReporter metricsReporter = new MetricsReporter();
72-
#else
73-
IMetricsReporter metricsReporter = new NoOpMetricsReporter();
74-
#endif
75-
AmqpPublisher publisher = new(_connection, address, metricsReporter, _timeout);
72+
73+
AmqpPublisher publisher = new(_connection, address, _metricsReporter, _timeout);
7674

7775
// TODO pass cancellationToken
7876
await publisher.OpenAsync()

RabbitMQ.AMQP.Client/Impl/MetricsReporter.cs

Lines changed: 35 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#if NET6_0_OR_GREATER
1+
#if NET8_0_OR_GREATER
22
using System;
33
using System.Collections.Generic;
44
using System.Diagnostics;
@@ -8,18 +8,18 @@
88

99
namespace RabbitMQ.AMQP.Client.Impl
1010
{
11-
#if NET6_0_OR_GREATER
11+
#if NET8_0_OR_GREATER
1212
// .NET docs on metric instrumentation: https://learn.microsoft.com/en-us/dotnet/core/diagnostics/metrics-instrumentation
1313
// OpenTelemetry semantic conventions for messaging metric: https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics
1414
internal sealed class MetricsReporter : IMetricsReporter
1515
{
1616
const string Version = "0.1.0";
1717

18-
static readonly Counter<int> s_messagingClientSentMessages;
19-
static readonly Histogram<double> s_messagingClientOperationDuration;
18+
readonly Counter<int> _messagingClientSentMessages;
19+
readonly Histogram<double> _messagingClientOperationDuration;
2020

21-
static readonly Counter<int> s_messagingClientConsumedMessages;
22-
static readonly Histogram<double> s_messagingProcessDuration;
21+
readonly Counter<int> _messagingClientConsumedMessages;
22+
readonly Histogram<double> _messagingProcessDuration;
2323

2424
readonly KeyValuePair<string, object?>
2525
_messagingOperationSystemTag = new(MessagingSystem, MessagingSystemValue);
@@ -44,29 +44,29 @@ internal sealed class MetricsReporter : IMetricsReporter
4444
private const string SendOperation = "send";
4545
private const string MessagingSystemValue = "rabbitmq";
4646

47-
static MetricsReporter()
47+
public MetricsReporter(IMeterFactory meterFactory)
4848
{
49-
Meter meter = new("RabbitMQ.Amqp", Version);
49+
Meter meter = meterFactory.Create("RabbitMQ.Amqp", Version);
5050

51-
s_messagingClientSentMessages = meter.CreateCounter<int>(
51+
_messagingClientSentMessages = meter.CreateCounter<int>(
5252
"messaging.client.sent.messages",
5353
unit: "{message}",
5454
description:
5555
"Number of messages producer attempted to send to the broker.");
5656

57-
s_messagingClientOperationDuration = meter.CreateHistogram<double>(
57+
_messagingClientOperationDuration = meter.CreateHistogram<double>(
5858
"messaging.client.operation.duration",
5959
unit: "s",
6060
description:
6161
"Duration of messaging operation initiated by a producer or consumer client.");
6262

63-
s_messagingClientConsumedMessages = meter.CreateCounter<int>(
63+
_messagingClientConsumedMessages = meter.CreateCounter<int>(
6464
"messaging.client.consumed.messages",
6565
unit: "{message}",
6666
description:
6767
"Number of messages that were delivered to the application. ");
6868

69-
s_messagingProcessDuration = meter.CreateHistogram<double>(
69+
_messagingProcessDuration = meter.CreateHistogram<double>(
7070
"messaging.process.duration",
7171
unit: "s",
7272
description:
@@ -79,17 +79,12 @@ public void ReportMessageSendSuccess(IMetricsReporter.PublisherContext context,
7979
var serverPort = new KeyValuePair<string, object?>(ServerPort, context.ServerPort);
8080
var destination = new KeyValuePair<string, object?>(MessageDestinationName, context.Destination);
8181

82-
s_messagingClientSentMessages.Add(1, serverAddress, serverPort, destination, _messagingOperationSystemTag,
82+
_messagingClientSentMessages.Add(1, serverAddress, serverPort, destination, _messagingOperationSystemTag,
8383
_sendOperationType, _publishOperationName);
8484
if (startTimestamp > 0)
8585
{
86-
#if NET7_0_OR_GREATER
87-
var duration = Stopwatch.GetElapsedTime(startTimestamp);
88-
#else
89-
var duration =
90-
new TimeSpan((long)((Stopwatch.GetTimestamp() - startTimestamp) * s_stopWatchTickFrequency));
91-
#endif
92-
s_messagingClientOperationDuration.Record(duration.TotalSeconds, serverAddress, serverPort, destination,
86+
var duration = Stopwatch.GetElapsedTime(startTimestamp);
87+
_messagingClientOperationDuration.Record(duration.TotalSeconds, serverAddress, serverPort, destination,
9388
_messagingOperationSystemTag, _sendOperationType, _publishOperationName);
9489
}
9590
}
@@ -101,18 +96,13 @@ public void ReportMessageSendFailure(IMetricsReporter.PublisherContext context,
10196
var serverAddress = new KeyValuePair<string, object?>(ServerAddress, context.ServerAddress);
10297
var serverPort = new KeyValuePair<string, object?>(ServerPort, context.ServerPort);
10398
var destination = new KeyValuePair<string, object?>(MessageDestinationName, context.Destination);
104-
s_messagingClientSentMessages.Add(1, errorType, serverAddress, serverPort, destination,
99+
_messagingClientSentMessages.Add(1, errorType, serverAddress, serverPort, destination,
105100
_messagingOperationSystemTag, _sendOperationType, _publishOperationName);
106101

107102
if (startTimestamp > 0)
108103
{
109-
#if NET7_0_OR_GREATER
110-
var duration = Stopwatch.GetElapsedTime(startTimestamp);
111-
#else
112-
var duration =
113-
new TimeSpan((long)((Stopwatch.GetTimestamp() - startTimestamp) * s_stopWatchTickFrequency));
114-
#endif
115-
s_messagingClientOperationDuration.Record(duration.TotalSeconds, errorType, serverAddress, serverPort,
104+
var duration = Stopwatch.GetElapsedTime(startTimestamp);
105+
_messagingClientOperationDuration.Record(duration.TotalSeconds, errorType, serverAddress, serverPort,
116106
destination,
117107
_messagingOperationSystemTag, _sendOperationType, _publishOperationName);
118108
}
@@ -123,20 +113,18 @@ public void ReportMessageDeliverSuccess(IMetricsReporter.ConsumerContext context
123113
var serverAddress = new KeyValuePair<string, object?>(ServerAddress, context.ServerAddress);
124114
var serverPort = new KeyValuePair<string, object?>(ServerPort, context.ServerPort);
125115
var destination = new KeyValuePair<string, object?>(MessageDestinationName, context.Destination);
126-
s_messagingClientConsumedMessages.Add(1, serverAddress, serverPort, destination, _messagingOperationSystemTag,
116+
_messagingClientConsumedMessages.Add(1, serverAddress, serverPort, destination,
117+
_messagingOperationSystemTag,
127118
_processOperationType, _deliverOperationName);
128-
if (startTimestamp > 0)
119+
if (startTimestamp <= 0)
129120
{
130-
#if NET7_0_OR_GREATER
131-
var duration = Stopwatch.GetElapsedTime(startTimestamp);
132-
#else
133-
var duration =
134-
new TimeSpan((long)((Stopwatch.GetTimestamp() - startTimestamp) * s_stopWatchTickFrequency));
135-
#endif
136-
s_messagingProcessDuration.Record(duration.TotalSeconds, serverAddress, serverPort,
137-
destination,
138-
_messagingOperationSystemTag, _processOperationType, _deliverOperationName);
121+
return;
139122
}
123+
124+
var duration = Stopwatch.GetElapsedTime(startTimestamp);
125+
_messagingProcessDuration.Record(duration.TotalSeconds, serverAddress, serverPort,
126+
destination,
127+
_messagingOperationSystemTag, _processOperationType, _deliverOperationName);
140128
}
141129

142130
public void ReportMessageDeliverFailure(IMetricsReporter.ConsumerContext context, long startTimestamp,
@@ -146,28 +134,19 @@ public void ReportMessageDeliverFailure(IMetricsReporter.ConsumerContext context
146134
var serverAddress = new KeyValuePair<string, object?>(ServerAddress, context.ServerAddress);
147135
var serverPort = new KeyValuePair<string, object?>(ServerPort, context.ServerPort);
148136
var destination = new KeyValuePair<string, object?>(MessageDestinationName, context.Destination);
149-
s_messagingClientConsumedMessages.Add(1, errorType, serverAddress, serverPort, destination,
137+
_messagingClientConsumedMessages.Add(1, errorType, serverAddress, serverPort, destination,
150138
_messagingOperationSystemTag,
151139
_processOperationType, _deliverOperationName);
152-
if (startTimestamp > 0)
140+
if (startTimestamp <= 0)
153141
{
154-
#if NET7_0_OR_GREATER
155-
var duration = Stopwatch.GetElapsedTime(startTimestamp);
156-
#else
157-
var duration =
158-
new TimeSpan((long)((Stopwatch.GetTimestamp() - startTimestamp) * s_stopWatchTickFrequency));
159-
#endif
160-
s_messagingProcessDuration.Record(duration.TotalSeconds, errorType, serverAddress, serverPort,
161-
destination,
162-
_messagingOperationSystemTag, _processOperationType, _deliverOperationName);
142+
return;
163143
}
144+
145+
var duration = Stopwatch.GetElapsedTime(startTimestamp);
146+
_messagingProcessDuration.Record(duration.TotalSeconds, errorType, serverAddress, serverPort,
147+
destination,
148+
_messagingOperationSystemTag, _processOperationType, _deliverOperationName);
164149
}
165-
#if !NET7_0_OR_GREATER
166-
const long TicksPerMicrosecond = 10;
167-
const long TicksPerMillisecond = TicksPerMicrosecond * 1000;
168-
const long TicksPerSecond = TicksPerMillisecond * 1000; // 10,000,000
169-
static readonly double s_stopWatchTickFrequency = (double)TicksPerSecond / Stopwatch.Frequency;
170-
#endif
171150
}
172151
#else
173152
#endif

0 commit comments

Comments
 (0)