Skip to content

Commit 5db800e

Browse files
committed
chore: adjust metric compute
1 parent 1ca625e commit 5db800e

File tree

7 files changed

+39
-60
lines changed

7 files changed

+39
-60
lines changed

RabbitMQ.AMQP.Client/IMetricsReporter.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using System;
21
using Amqp;
32

43
namespace RabbitMQ.AMQP.Client
@@ -8,7 +7,6 @@ public interface IMetricsReporter
87
void ReportMessageSendSuccess(PublisherContext context, long startTimestamp);
98
void ReportMessageSendFailure(PublisherContext context, long startTimestamp, AmqpException amqpException);
109
public void ReportMessageDeliverSuccess(ConsumerContext context, long startTimestamp);
11-
void ReportMessageDeliverFailure(ConsumerContext consumerContext, long startTimestamp, Exception exception);
1210
sealed class ConsumerContext
1311
{
1412
public ConsumerContext(string? destination, string serverAddress, int serverPort)

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,19 +154,20 @@ private async Task ProcessMessages()
154154
continue;
155155
}
156156

157+
_metricsReporter.ReportMessageDeliverSuccess(consumerContext, startTimestamp);
157158
_unsettledMessageCounter.Increment();
158159

159160
IContext context = new DeliveryContext(_receiverLink, nativeMessage, _unsettledMessageCounter);
160161
var amqpMessage = new AmqpMessage(nativeMessage);
161162

162163
// TODO catch exceptions thrown by handlers,
163164
// then call exception handler?
165+
164166
if (_configuration.Handler != null)
165167
{
166168
await _configuration.Handler(context, amqpMessage).ConfigureAwait(false);
167169
}
168170

169-
_metricsReporter.ReportMessageDeliverSuccess(consumerContext, startTimestamp);
170171
}
171172
}
172173
catch (Exception e)
@@ -175,8 +176,7 @@ private async Task ProcessMessages()
175176
{
176177
return;
177178
}
178-
179-
_metricsReporter.ReportMessageDeliverFailure(consumerContext, startTimestamp, e);
179+
180180
Trace.WriteLine(TraceLevel.Error, $"{ToString()} Failed to process messages, {e}");
181181
// TODO this is where a Listener should get a closed event
182182
// See the ConsumerShouldBeClosedWhenQueueIsDeleted test

RabbitMQ.AMQP.Client/Impl/MetricsReporter.cs

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ namespace RabbitMQ.AMQP.Client.Impl
99
{
1010
// .NET docs on metric instrumentation: https://learn.microsoft.com/en-us/dotnet/core/diagnostics/metrics-instrumentation
1111
// OpenTelemetry semantic conventions for messaging metric: https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics
12-
public sealed class MetricsReporter : IMetricsReporter
12+
internal sealed class MetricsReporter : IMetricsReporter
1313
{
1414
const string Version = "0.1.0";
1515

@@ -42,6 +42,7 @@ public sealed class MetricsReporter : IMetricsReporter
4242
private const string SendOperation = "send";
4343
private const string MessagingSystemValue = "rabbitmq";
4444

45+
private const string DefaultErrorValue = "_OTHER";
4546
public MetricsReporter(IMeterFactory meterFactory)
4647
{
4748
Meter meter = meterFactory.Create("RabbitMQ.Amqp", Version);
@@ -90,7 +91,7 @@ public void ReportMessageSendSuccess(IMetricsReporter.PublisherContext context,
9091
public void ReportMessageSendFailure(IMetricsReporter.PublisherContext context, long startTimestamp,
9192
AmqpException amqpException)
9293
{
93-
var errorType = new KeyValuePair<string, object?>(ErrorType, amqpException.GetType().Name);
94+
var errorType = new KeyValuePair<string, object?>(ErrorType, amqpException.Error.Condition.ToString() ?? DefaultErrorValue);
9495
var serverAddress = new KeyValuePair<string, object?>(ServerAddress, context.ServerAddress);
9596
var serverPort = new KeyValuePair<string, object?>(ServerPort, context.ServerPort);
9697
var destination = new KeyValuePair<string, object?>(MessageDestinationName, context.Destination);
@@ -124,27 +125,6 @@ public void ReportMessageDeliverSuccess(IMetricsReporter.ConsumerContext context
124125
destination,
125126
_messagingOperationSystemTag, _processOperationType, _deliverOperationName);
126127
}
127-
128-
public void ReportMessageDeliverFailure(IMetricsReporter.ConsumerContext context, long startTimestamp,
129-
Exception exception)
130-
{
131-
var errorType = new KeyValuePair<string, object?>(ErrorType, exception.GetType().Name);
132-
var serverAddress = new KeyValuePair<string, object?>(ServerAddress, context.ServerAddress);
133-
var serverPort = new KeyValuePair<string, object?>(ServerPort, context.ServerPort);
134-
var destination = new KeyValuePair<string, object?>(MessageDestinationName, context.Destination);
135-
_messagingClientConsumedMessages.Add(1, errorType, serverAddress, serverPort, destination,
136-
_messagingOperationSystemTag,
137-
_processOperationType, _deliverOperationName);
138-
if (startTimestamp <= 0)
139-
{
140-
return;
141-
}
142-
143-
var duration = Stopwatch.GetElapsedTime(startTimestamp);
144-
_messagingProcessDuration.Record(duration.TotalSeconds, errorType, serverAddress, serverPort,
145-
destination,
146-
_messagingOperationSystemTag, _processOperationType, _deliverOperationName);
147-
}
148128
}
149129
}
150130
#endif

RabbitMQ.AMQP.Client/Impl/NoOpMetricsReporter.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,5 @@ public void ReportMessageSendFailure(IMetricsReporter.PublisherContext context,
1717
public void ReportMessageDeliverSuccess(IMetricsReporter.ConsumerContext context, long startTimestamp)
1818
{
1919
}
20-
21-
public void ReportMessageDeliverFailure(IMetricsReporter.ConsumerContext consumerContext, long startTimestamp,
22-
Exception exception)
23-
{
24-
}
2520
}
2621
}

RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,6 @@ RabbitMQ.AMQP.Client.IMetricsReporter.PublisherContext.Destination.get -> string
213213
RabbitMQ.AMQP.Client.IMetricsReporter.PublisherContext.PublisherContext(string? destination, string! serverAddress, int serverPort) -> void
214214
RabbitMQ.AMQP.Client.IMetricsReporter.PublisherContext.ServerAddress.get -> string!
215215
RabbitMQ.AMQP.Client.IMetricsReporter.PublisherContext.ServerPort.get -> int
216-
RabbitMQ.AMQP.Client.IMetricsReporter.ReportMessageDeliverFailure(RabbitMQ.AMQP.Client.IMetricsReporter.ConsumerContext! consumerContext, long startTimestamp, System.Exception! exception) -> void
217216
RabbitMQ.AMQP.Client.IMetricsReporter.ReportMessageDeliverSuccess(RabbitMQ.AMQP.Client.IMetricsReporter.ConsumerContext! context, long startTimestamp) -> void
218217
RabbitMQ.AMQP.Client.IMetricsReporter.ReportMessageSendFailure(RabbitMQ.AMQP.Client.IMetricsReporter.PublisherContext! context, long startTimestamp, Amqp.AmqpException! amqpException) -> void
219218
RabbitMQ.AMQP.Client.IMetricsReporter.ReportMessageSendSuccess(RabbitMQ.AMQP.Client.IMetricsReporter.PublisherContext! context, long startTimestamp) -> void
@@ -496,12 +495,6 @@ RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.ListenerStreamOptions(Amqp.Types
496495
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder
497496
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.Build() -> RabbitMQ.AMQP.Client.IMessage!
498497
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.MessageAddressBuilder(RabbitMQ.AMQP.Client.IMessage! message) -> void
499-
RabbitMQ.AMQP.Client.Impl.MetricsReporter
500-
RabbitMQ.AMQP.Client.Impl.MetricsReporter.MetricsReporter(System.Diagnostics.Metrics.IMeterFactory! meterFactory) -> void
501-
RabbitMQ.AMQP.Client.Impl.MetricsReporter.ReportMessageDeliverFailure(RabbitMQ.AMQP.Client.IMetricsReporter.ConsumerContext! context, long startTimestamp, System.Exception! exception) -> void
502-
RabbitMQ.AMQP.Client.Impl.MetricsReporter.ReportMessageDeliverSuccess(RabbitMQ.AMQP.Client.IMetricsReporter.ConsumerContext! context, long startTimestamp) -> void
503-
RabbitMQ.AMQP.Client.Impl.MetricsReporter.ReportMessageSendFailure(RabbitMQ.AMQP.Client.IMetricsReporter.PublisherContext! context, long startTimestamp, Amqp.AmqpException! amqpException) -> void
504-
RabbitMQ.AMQP.Client.Impl.MetricsReporter.ReportMessageSendSuccess(RabbitMQ.AMQP.Client.IMetricsReporter.PublisherContext! context, long startTimestamp) -> void
505498
RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration
506499
RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.Activated(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration!
507500
RabbitMQ.AMQP.Client.Impl.RecoveryConfiguration.BackOffDelayPolicy(RabbitMQ.AMQP.Client.IBackOffDelayPolicy! backOffDelayPolicy) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration!

Tests/Consumer/ConsumerMetricsTests.cs

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// This source code is dual-licensed under the Apache License, version
22
// 2.0, and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
45
#if NET8_0_OR_GREATER
56
using System.Diagnostics.Metrics;
67
using System.Threading.Tasks;
@@ -22,9 +23,11 @@ public async Task RecordMessageDeliverySuccessAndDuration()
2223
Assert.NotNull(_management);
2324
Assert.NotNull(_meterFactory);
2425
Assert.NotNull(_metricsReporter);
25-
26-
var messageConsumedCollector = new MetricCollector<int>(_meterFactory, "RabbitMQ.Amqp", "messaging.client.consumed.messages");
27-
var messageProcessingDurationCollector = new MetricCollector<double>(_meterFactory, "RabbitMQ.Amqp", "messaging.process.duration");
26+
27+
var messageConsumedCollector =
28+
new MetricCollector<int>(_meterFactory, "RabbitMQ.Amqp", "messaging.client.consumed.messages");
29+
var messageProcessingDurationCollector =
30+
new MetricCollector<double>(_meterFactory, "RabbitMQ.Amqp", "messaging.process.duration");
2831

2932
Assert.NotNull(_connection);
3033
Assert.NotNull(_management);
@@ -35,18 +38,12 @@ public async Task RecordMessageDeliverySuccessAndDuration()
3538
await PublishAsync(queueSpec, 2);
3639

3740
TaskCompletionSource<IMessage> tcs = new();
38-
//Need to send 2 message, metrics are recorded after message consumption so want on first message consumption will end tests before metrics being incremented
39-
bool isPairMessage = true;
4041
IConsumer consumer = await _connection.ConsumerBuilder()
4142
.Queue(queueSpec)
4243
.MessageHandler(async (context, message) =>
4344
{
4445
await context.AcceptAsync();
45-
isPairMessage = !isPairMessage;
46-
if (isPairMessage)
47-
{
48-
tcs.SetResult(message);
49-
}
46+
tcs.SetResult(message);
5047
}
5148
).BuildAndStartAsync();
5249

@@ -65,10 +62,10 @@ public async Task RecordMessageDeliverySuccessAndDuration()
6562
_connectionSettings!.Port);
6663
Assert.Equal(consumedMessagesMeasurements[0].Tags["server.address"],
6764
_connectionSettings!.Host);
68-
65+
6966
var consumedMessageDurationMeasurements = messageProcessingDurationCollector.GetMeasurementSnapshot();
7067
Assert.NotEmpty(consumedMessageDurationMeasurements);
71-
Assert.True( consumedMessageDurationMeasurements[0].Value > 0);
68+
Assert.True(consumedMessageDurationMeasurements[0].Value > 0);
7269
Assert.Equal(consumedMessageDurationMeasurements[0].Tags["messaging.system"], "rabbitmq");
7370
Assert.Equal(consumedMessageDurationMeasurements[0].Tags["messaging.operation.name"], "deliver");
7471
Assert.Equal(consumedMessageDurationMeasurements[0].Tags["messaging.operation.type"], "process");
@@ -78,18 +75,18 @@ public async Task RecordMessageDeliverySuccessAndDuration()
7875
_connectionSettings!.Port);
7976
Assert.Equal(consumedMessageDurationMeasurements[0].Tags["server.address"],
8077
_connectionSettings!.Host);
81-
78+
8279
await consumer.CloseAsync();
8380
consumer.Dispose();
8481
}
85-
82+
8683
// Setup a new service provider. This example creates the collection explicitly but you might leverage
8784
// a host or some other application setup code to do this as well.
8885
private ServiceProvider CreateServiceProvider()
8986
{
90-
return new ServiceCollection()
87+
return new ServiceCollection()
9188
.AddMetrics()
92-
.AddSingleton<IMetricsReporter,MetricsReporter>()
89+
.AddSingleton<IMetricsReporter, MetricsReporter>()
9390
.BuildServiceProvider();
9491
}
9592
}

Tests/Publisher/PublisherMetricsTests.cs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// This source code is dual-licensed under the Apache License, version
22
// 2.0, and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
4+
45
#if NET8_0_OR_GREATER
56
using System;
67
using System.Text;
@@ -22,16 +23,18 @@ public async Task PublisherMetricsShouldBeIncrementedWhenMessageIsSendWithSucces
2223
Assert.NotNull(_management);
2324
Assert.NotNull(_meterFactory);
2425
Assert.NotNull(_metricsReporter);
25-
var messageSendCollector = new MetricCollector<int>(_meterFactory, "RabbitMQ.Amqp", "messaging.client.sent.messages");
26-
// var messageProcessingDurationCollector = new MetricCollector<double>(_meterFactory, "RabbitMQ.Amqp", "messaging.process.duration");
26+
var messageSendCollector =
27+
new MetricCollector<int>(_meterFactory, "RabbitMQ.Amqp", "messaging.client.sent.messages");
28+
var messageSendDurationCollector =
29+
new MetricCollector<double>(_meterFactory, "RabbitMQ.Amqp", "messaging.client.operation.duration");
2730

2831
IQueueSpecification queueSpecification = _management.Queue(_queueName);
2932
await queueSpecification.DeclareAsync();
3033

3134
IPublisher publisher = await _connection.PublisherBuilder().Queue(queueSpecification).BuildAsync();
3235

3336
await publisher.PublishAsync(new AmqpMessage("Hello wold!"));
34-
37+
3538
await SystemUtils.WaitUntilQueueMessageCount(queueSpecification, 1);
3639

3740
var consumedMessagesMeasurements = messageSendCollector.GetMeasurementSnapshot();
@@ -46,7 +49,20 @@ public async Task PublisherMetricsShouldBeIncrementedWhenMessageIsSendWithSucces
4649
_connectionSettings!.Port);
4750
Assert.Equal(consumedMessagesMeasurements[0].Tags["server.address"],
4851
_connectionSettings!.Host);
49-
52+
53+
var sendDurationsMeasurements = messageSendDurationCollector.GetMeasurementSnapshot();
54+
Assert.NotEmpty(sendDurationsMeasurements);
55+
Assert.True(sendDurationsMeasurements[0].Value > 0);
56+
Assert.Equal(sendDurationsMeasurements[0].Tags["messaging.system"], "rabbitmq");
57+
Assert.Equal(sendDurationsMeasurements[0].Tags["messaging.operation.name"], "publish");
58+
Assert.Equal(sendDurationsMeasurements[0].Tags["messaging.operation.type"], "send");
59+
Assert.Equal(sendDurationsMeasurements[0].Tags["messaging.destination.name"],
60+
$"/queues/{Utils.EncodePathSegment(queueSpecification.QueueName)}");
61+
Assert.Equal(sendDurationsMeasurements[0].Tags["server.port"],
62+
_connectionSettings!.Port);
63+
Assert.Equal(sendDurationsMeasurements[0].Tags["server.address"],
64+
_connectionSettings!.Host);
65+
5066
await publisher.CloseAsync();
5167
publisher.Dispose();
5268
}

0 commit comments

Comments
 (0)