Skip to content

Commit bfaaaf7

Browse files
author
Isaiah Inuwa
committed
Update to latest OTel conventions
- Swap order of queue name and operation in Activity name - Add messaging.operation.name tag
1 parent 4525c6d commit bfaaaf7

File tree

5 files changed

+37
-25
lines changed

5 files changed

+37
-25
lines changed

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
6262
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
6363

6464
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
65-
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
65+
? RabbitMQActivitySource.BasicPublish(routingKey, exchange, body.Length)
6666
: default;
6767

6868
ulong publishSequenceNumber = 0;
@@ -117,7 +117,7 @@ await MaybeEnforceFlowControlAsync(cancellationToken)
117117
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
118118

119119
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
120-
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
120+
? RabbitMQActivitySource.BasicPublish(routingKey.Value, exchange.Value, body.Length)
121121
: default;
122122

123123
ulong publishSequenceNumber = 0;

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -918,10 +918,10 @@ await ModelSendAsync(in method, k.CancellationToken)
918918
BasicGetResult? result = await k;
919919

920920
using Activity? activity = result != null
921-
? RabbitMQActivitySource.Receive(result.RoutingKey,
921+
? RabbitMQActivitySource.BasicGet(result.RoutingKey,
922922
result.Exchange,
923923
result.DeliveryTag, result.BasicProperties, result.Body.Length)
924-
: RabbitMQActivitySource.ReceiveEmpty(queue);
924+
: RabbitMQActivitySource.BasicGetEmpty(queue);
925925

926926
activity?.SetStartTime(k.StartTime);
927927

projects/RabbitMQ.Client/Impl/RabbitMQActivitySource.cs

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ public static class RabbitMQActivitySource
1515
// https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes
1616
internal const string MessageId = "messaging.message.id";
1717
internal const string MessageConversationId = "messaging.message.conversation_id";
18+
internal const string MessagingOperationName = "messaging.operation.name";
19+
internal const string MessagingOperationNameBasicDeliver = "deliver";
20+
internal const string MessagingOperationNameBasicGet = "fetch";
21+
internal const string MessagingOperationNameBasicGetEmpty = "fetch (empty)";
22+
internal const string MessagingOperationNameBasicPublish = "publish";
1823
internal const string MessagingOperationType = "messaging.operation.type";
1924
internal const string MessagingOperationTypeSend = "send";
2025
internal const string MessagingOperationTypeProcess = "process";
@@ -56,7 +61,7 @@ public static class RabbitMQActivitySource
5661
new KeyValuePair<string, object?>(ProtocolVersion, "0.9.1")
5762
};
5863

59-
internal static Activity? Send(string routingKey, string exchange, int bodySize,
64+
internal static Activity? BasicPublish(string routingKey, string exchange, int bodySize,
6065
ActivityContext linkedContext = default)
6166
{
6267
if (!s_publisherSource.HasListeners())
@@ -66,41 +71,42 @@ public static class RabbitMQActivitySource
6671

6772
Activity? activity = linkedContext == default
6873
? s_publisherSource.StartRabbitMQActivity(
69-
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend,
74+
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicPublish} {routingKey}" : MessagingOperationNameBasicPublish,
7075
ActivityKind.Producer)
7176
: s_publisherSource.StartLinkedRabbitMQActivity(
72-
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeSend}" : MessagingOperationTypeSend,
77+
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicPublish} {routingKey}" : MessagingOperationNameBasicPublish,
7378
ActivityKind.Producer, linkedContext);
7479
if (activity != null && activity.IsAllDataRequested)
7580
{
76-
PopulateMessagingTags(MessagingOperationTypeSend, routingKey, exchange, 0, bodySize, activity);
81+
PopulateMessagingTags(MessagingOperationTypeSend, MessagingOperationNameBasicPublish, routingKey, exchange, 0, bodySize, activity);
7782
}
7883

7984
return activity;
8085

8186
}
8287

83-
internal static Activity? ReceiveEmpty(string queue)
88+
internal static Activity? BasicGetEmpty(string queue)
8489
{
8590
if (!s_subscriberSource.HasListeners())
8691
{
8792
return null;
8893
}
8994

9095
Activity? activity = s_subscriberSource.StartRabbitMQActivity(
91-
UseRoutingKeyAsOperationName ? $"{queue} {MessagingOperationTypeReceive}" : MessagingOperationTypeReceive,
96+
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGetEmpty} {queue}" : MessagingOperationNameBasicGetEmpty,
9297
ActivityKind.Consumer);
9398
if (activity != null && activity.IsAllDataRequested)
9499
{
95100
activity
96101
.SetTag(MessagingOperationType, MessagingOperationTypeReceive)
102+
.SetTag(MessagingOperationName, MessagingOperationNameBasicGetEmpty)
97103
.SetTag(MessagingDestination, "amq.default");
98104
}
99105

100106
return activity;
101107
}
102108

103-
internal static Activity? Receive(string routingKey, string exchange, ulong deliveryTag,
109+
internal static Activity? BasicGet(string routingKey, string exchange, ulong deliveryTag,
104110
IReadOnlyBasicProperties readOnlyBasicProperties, int bodySize)
105111
{
106112
if (!s_subscriberSource.HasListeners())
@@ -110,11 +116,11 @@ public static class RabbitMQActivitySource
110116

111117
// Extract the PropagationContext of the upstream parent from the message headers.
112118
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
113-
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeReceive}" : MessagingOperationTypeReceive, ActivityKind.Consumer,
119+
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicGet} {routingKey}" : MessagingOperationNameBasicGet, ActivityKind.Consumer,
114120
ContextExtractor(readOnlyBasicProperties));
115121
if (activity != null && activity.IsAllDataRequested)
116122
{
117-
PopulateMessagingTags(MessagingOperationTypeReceive, routingKey, exchange, deliveryTag, readOnlyBasicProperties,
123+
PopulateMessagingTags(MessagingOperationTypeReceive, MessagingOperationNameBasicGet, routingKey, exchange, deliveryTag, readOnlyBasicProperties,
118124
bodySize, activity);
119125
}
120126

@@ -131,11 +137,11 @@ public static class RabbitMQActivitySource
131137

132138
// Extract the PropagationContext of the upstream parent from the message headers.
133139
Activity? activity = s_subscriberSource.StartLinkedRabbitMQActivity(
134-
UseRoutingKeyAsOperationName ? $"{routingKey} {MessagingOperationTypeProcess}" : MessagingOperationTypeProcess,
140+
UseRoutingKeyAsOperationName ? $"{MessagingOperationNameBasicDeliver} {routingKey}" : MessagingOperationNameBasicDeliver,
135141
ActivityKind.Consumer, ContextExtractor(basicProperties));
136142
if (activity != null && activity.IsAllDataRequested)
137143
{
138-
PopulateMessagingTags(MessagingOperationTypeProcess, routingKey, exchange,
144+
PopulateMessagingTags(MessagingOperationTypeProcess, MessagingOperationNameBasicDeliver, routingKey, exchange,
139145
deliveryTag, basicProperties, bodySize, activity);
140146
}
141147

@@ -157,10 +163,10 @@ public static class RabbitMQActivitySource
157163
?.Start();
158164
}
159165

160-
private static void PopulateMessagingTags(string operation, string routingKey, string exchange,
166+
private static void PopulateMessagingTags(string operationType, string operationName, string routingKey, string exchange,
161167
ulong deliveryTag, IReadOnlyBasicProperties readOnlyBasicProperties, int bodySize, Activity activity)
162168
{
163-
PopulateMessagingTags(operation, routingKey, exchange, deliveryTag, bodySize, activity);
169+
PopulateMessagingTags(operationType, operationName, routingKey, exchange, deliveryTag, bodySize, activity);
164170

165171
if (!string.IsNullOrEmpty(readOnlyBasicProperties.CorrelationId))
166172
{
@@ -173,11 +179,12 @@ private static void PopulateMessagingTags(string operation, string routingKey, s
173179
}
174180
}
175181

176-
private static void PopulateMessagingTags(string operation, string routingKey, string exchange,
182+
private static void PopulateMessagingTags(string operationType, string operationName, string routingKey, string exchange,
177183
ulong deliveryTag, int bodySize, Activity activity)
178184
{
179185
activity
180-
.SetTag(MessagingOperationType, operation)
186+
.SetTag(MessagingOperationType, operationType)
187+
.SetTag(MessagingOperationName, operationName)
181188
.SetTag(MessagingDestination, string.IsNullOrEmpty(exchange) ? "amq.default" : exchange)
182189
.SetTag(MessagingDestinationRoutingKey, routingKey)
183190
.SetTag(MessagingBodySize, bodySize);

projects/Test/SequentialIntegration/TestActivitySource.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ private static ActivityListener StartActivityListener(List<Activity> activities)
402402
private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueName,
403403
List<Activity> activityList, bool isDeliver = false)
404404
{
405-
string childName = isDeliver ? "process" : "receive";
405+
string childName = isDeliver ? "deliver" : "fetch";
406406
Activity[] activities = activityList.ToArray();
407407
Assert.NotEmpty(activities);
408408

@@ -418,11 +418,11 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN
418418
}
419419

420420
Activity sendActivity = activities.First(x =>
421-
x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} send" : "send") &&
421+
x.OperationName == (useRoutingKeyAsOperationName ? $"publish {queueName}" : "publish") &&
422422
x.GetTagItem(RabbitMQActivitySource.MessagingDestinationRoutingKey) is string routingKeyTag &&
423423
routingKeyTag == $"{queueName}");
424424
Activity receiveActivity = activities.Single(x =>
425-
x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} {childName}" : $"{childName}") &&
425+
x.OperationName == (useRoutingKeyAsOperationName ? $"{childName} {queueName}" : childName) &&
426426
x.Links.First().Context.TraceId == sendActivity.TraceId);
427427
Assert.Equal(ActivityKind.Producer, sendActivity.Kind);
428428
Assert.Equal(ActivityKind.Consumer, receiveActivity.Kind);

projects/Test/SequentialIntegration/TestOpenTelemetry.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,8 @@ public async Task TestPublisherAndBasicGetActivityTags(bool useRoutingKeyAsOpera
342342
private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueName,
343343
List<Activity> activityList, bool isDeliver = false, string baggageGuid = null)
344344
{
345-
string childName = isDeliver ? "process" : "receive";
345+
string childName = isDeliver ? "deliver" : "fetch";
346+
string childType = isDeliver ? "process" : "receive";
346347
Activity[] activities = activityList.ToArray();
347348
Assert.NotEmpty(activities);
348349
foreach (var item in activities)
@@ -354,11 +355,11 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN
354355
}
355356

356357
Activity sendActivity = activities.First(x =>
357-
x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} send" : "send") &&
358+
x.OperationName == (useRoutingKeyAsOperationName ? $"publish {queueName}" : "publish") &&
358359
x.GetTagItem(RabbitMQActivitySource.MessagingDestinationRoutingKey) is string routingKeyTag &&
359360
routingKeyTag == $"{queueName}");
360361
Activity receiveActivity = activities.Single(x =>
361-
x.OperationName == (useRoutingKeyAsOperationName ? $"{queueName} {childName}" : $"{childName}") &&
362+
x.OperationName == (useRoutingKeyAsOperationName ? $"{childName} {queueName}" : childName) &&
362363
x.Links.First().Context.TraceId == sendActivity.TraceId);
363364
Assert.Equal(ActivityKind.Producer, sendActivity.Kind);
364365
Assert.Equal(ActivityKind.Consumer, receiveActivity.Kind);
@@ -380,6 +381,10 @@ private void AssertActivityData(bool useRoutingKeyAsOperationName, string queueN
380381
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingEnvelopeSize);
381382
AssertIntTagGreaterThanZero(sendActivity, RabbitMQActivitySource.MessagingBodySize);
382383
AssertIntTagGreaterThanZero(receiveActivity, RabbitMQActivitySource.MessagingBodySize);
384+
AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessagingOperationType, childType);
385+
AssertStringTagEquals(receiveActivity, RabbitMQActivitySource.MessagingOperationName, childName);
386+
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationType, "send");
387+
AssertStringTagEquals(sendActivity, RabbitMQActivitySource.MessagingOperationName, "publish");
383388
}
384389
}
385390
}

0 commit comments

Comments
 (0)