Skip to content

Commit 72713ee

Browse files
authored
Merge pull request #1528 from stebet/otel-integration-package
Adding proper OpenTelemetry integration via. registration helpers and better context propagation
2 parents a912b00 + 58fab44 commit 72713ee

File tree

12 files changed

+916
-87
lines changed

12 files changed

+916
-87
lines changed

RabbitMQDotNetClient.sln

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Common", "projects\Test\Com
4040
EndProject
4141
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ToxiproxyNetCore", "projects\toxiproxy-netcore\src\ToxiproxyNetCore\ToxiproxyNetCore.csproj", "{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}"
4242
EndProject
43+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQ.Client.OpenTelemetry", "projects\RabbitMQ.Client.OpenTelemetry\RabbitMQ.Client.OpenTelemetry.csproj", "{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}"
44+
EndProject
4345
Global
4446
GlobalSection(SolutionConfigurationPlatforms) = preSolution
4547
Debug|Any CPU = Debug|Any CPU
@@ -90,6 +92,10 @@ Global
9092
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Debug|Any CPU.Build.0 = Debug|Any CPU
9193
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Release|Any CPU.ActiveCfg = Release|Any CPU
9294
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A}.Release|Any CPU.Build.0 = Release|Any CPU
95+
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
96+
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Debug|Any CPU.Build.0 = Debug|Any CPU
97+
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.ActiveCfg = Release|Any CPU
98+
{16BF2086-AC7D-4EC3-8660-CC16E663ACB1}.Release|Any CPU.Build.0 = Release|Any CPU
9399
EndGlobalSection
94100
GlobalSection(SolutionProperties) = preSolution
95101
HideSolutionNode = FALSE
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# RabbitMQ .NET Client - OpenTelemetry Instrumentation
2+
3+
## Introduction
4+
This library makes it easy to instrument your RabbitMQ .NET Client applications with OpenTelemetry.
5+
6+
## Examples
7+
The following examples demonstrate how to use the RabbitMQ .NET Client OpenTelemetry Instrumentation.
8+
9+
### Basic Usage
10+
11+
#### ASP.NET Core Configuration Example
12+
```csharp
13+
using OpenTelemetry.Trace;
14+
15+
// Configure the OpenTelemetry SDK to trace ASP.NET Core, the RabbitMQ .NET Client and export the traces to the console.
16+
// Also configures context propagation to propagate the TraceContext and Baggage using the W3C specification.
17+
18+
var compositeTextMapPropagator = new CompositeTextMapPropagator(new TextMapPropagator[]
19+
{
20+
new TraceContextPropagator(),
21+
new BaggagePropagator()
22+
});
23+
24+
Sdk.SetDefaultTextMapPropagator(compositeTextMapPropagator);
25+
26+
builder.Services.AddOpenTelemetry()
27+
.ConfigureResource(resource => resource
28+
.AddService(serviceName: builder.Environment.ApplicationName))
29+
.WithTracing(tracing => tracing
30+
.AddAspNetCoreInstrumentation()
31+
.AddRabbitMQInstrumentation()
32+
.AddConsoleExporter());
33+
```
34+
35+
#### Console Application Configuration Example
36+
```csharp
37+
using OpenTelemetry.Trace;
38+
39+
// Configure the OpenTelemetry SDK to trace ASP.NET Core, the RabbitMQ .NET Client and export the traces to the console.
40+
// Also configures context propagation to propagate the TraceContext and Baggage using the W3C specification.
41+
42+
var compositeTextMapPropagator = new CompositeTextMapPropagator(new TextMapPropagator[]
43+
{
44+
new TraceContextPropagator(),
45+
new BaggagePropagator()
46+
});
47+
48+
Sdk.SetDefaultTextMapPropagator(compositeTextMapPropagator);
49+
50+
var tracerProvider = Sdk.CreateTracerProviderBuilder()
51+
.AddRabbitMQInstrumentation()
52+
.AddConsoleExporter()
53+
.Build();
54+
```
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFrameworks>net6.0;netstandard2.0</TargetFrameworks>
5+
<NoWarn>$(NoWarn);CS1591</NoWarn>
6+
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
7+
<AssemblyTitle>RabbitMQ OpenTelemetry Integration Package for .NET</AssemblyTitle>
8+
<Authors>VMware</Authors>
9+
<Company>VMware, Inc. or its affiliates.</Company>
10+
<Copyright>Copyright © 2007-2023 VMware, Inc. or its affiliates.</Copyright>
11+
<Description>The RabbitMQ OpenTelemetry Library for .NET adds convenience extension methods for RabbitMQ/OpenTelemetry</Description>
12+
<GenerateDocumentationFile>true</GenerateDocumentationFile>
13+
<PackageIcon>icon.png</PackageIcon>
14+
<PackageLicenseExpression>Apache-2.0 OR MPL-2.0</PackageLicenseExpression>
15+
<PackageProjectUrl>https://www.rabbitmq.com/dotnet.html</PackageProjectUrl>
16+
<PackageTags>rabbitmq, amqp, oauth2</PackageTags>
17+
<Product>RabbitMQ</Product>
18+
<PublishRepositoryUrl>true</PublishRepositoryUrl>
19+
<RepositoryUrl>https://github.com/rabbitmq/rabbitmq-dotnet-client.git</RepositoryUrl>
20+
<IncludeSymbols>true</IncludeSymbols>
21+
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
22+
<AssemblyOriginatorKeyFile>../rabbit.snk</AssemblyOriginatorKeyFile>
23+
<SignAssembly>true</SignAssembly>
24+
<MinVerTagPrefix>otel-</MinVerTagPrefix>
25+
<MinVerVerbosity>minimal</MinVerVerbosity>
26+
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
27+
<PackageOutputPath>../../packages</PackageOutputPath>
28+
<PackageReadmeFile>README.md</PackageReadmeFile>
29+
<LangVersion>7.3</LangVersion>
30+
</PropertyGroup>
31+
32+
<PropertyGroup Condition="'$(Configuration)' == 'Release' And '$(CI)' == 'true'">
33+
<ContinuousIntegrationBuild>true</ContinuousIntegrationBuild>
34+
<Deterministic>true</Deterministic>
35+
<EmbedUntrackedSources>true</EmbedUntrackedSources>
36+
</PropertyGroup>
37+
38+
<ItemGroup Condition="'$(Configuration)' == 'Release' and '$(SourceRoot)' == ''">
39+
<SourceRoot Include="$(MSBuildThisFileDirectory)/" />
40+
</ItemGroup>
41+
42+
<ItemGroup>
43+
<None Remove="icon.png" />
44+
<Content Include="icon.png" PackagePath="" />
45+
<None Include="README.md" Pack="true" PackagePath="/" />
46+
<InternalsVisibleTo Include="Unit" />
47+
<InternalsVisibleTo Include="Benchmarks" />
48+
</ItemGroup>
49+
50+
<ItemGroup>
51+
<PackageReference Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" Version="3.3.4" PrivateAssets="all" />
52+
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="all" />
53+
<PackageReference Include="MinVer" Version="5.0.0" PrivateAssets="all" />
54+
<PackageReference Include="OpenTelemetry.Api" Version="1.7.0" />
55+
</ItemGroup>
56+
57+
<ItemGroup Condition="$(TargetFramework) == 'netstandard2.0'">
58+
</ItemGroup>
59+
60+
<ItemGroup>
61+
<ProjectReference Include="../RabbitMQ.Client/RabbitMQ.Client.csproj" />
62+
</ItemGroup>
63+
64+
</Project>
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Diagnostics;
4+
using System.Linq;
5+
using System.Text;
6+
using OpenTelemetry.Context.Propagation;
7+
using RabbitMQ.Client;
8+
9+
namespace OpenTelemetry.Trace
10+
{
11+
public static class OpenTelemetryExtensions
12+
{
13+
public static TracerProviderBuilder AddRabbitMQInstrumentation(this TracerProviderBuilder builder)
14+
{
15+
RabbitMQActivitySource.UseRoutingKeyAsOperationName = true;
16+
RabbitMQActivitySource.ContextExtractor = OpenTelemetryContextExtractor;
17+
RabbitMQActivitySource.ContextInjector = OpenTelemetryContextInjector;
18+
builder.AddSource("RabbitMQ.Client.*");
19+
return builder;
20+
}
21+
22+
private static ActivityContext OpenTelemetryContextExtractor(IReadOnlyBasicProperties props)
23+
{
24+
// Extract the PropagationContext of the upstream parent from the message headers.
25+
var parentContext = Propagators.DefaultTextMapPropagator.Extract(default, props.Headers, OpenTelemetryContextGetter);
26+
Baggage.Current = parentContext.Baggage;
27+
return parentContext.ActivityContext;
28+
}
29+
30+
private static IEnumerable<string> OpenTelemetryContextGetter(IDictionary<string, object> carrier, string key)
31+
{
32+
try
33+
{
34+
if (carrier.TryGetValue(key, out object value) && value is byte[] bytes)
35+
{
36+
return new[] { Encoding.UTF8.GetString(bytes) };
37+
}
38+
}
39+
catch (Exception)
40+
{
41+
//this.logger.LogError(ex, "Failed to extract trace context.");
42+
}
43+
44+
return Enumerable.Empty<string>();
45+
}
46+
47+
private static void OpenTelemetryContextInjector(Activity activity, IDictionary<string, object> props)
48+
{
49+
// Inject the current Activity's context into the message headers.
50+
Propagators.DefaultTextMapPropagator.Inject(new PropagationContext(activity.Context, Baggage.Current), props, OpenTelemetryContextSetter);
51+
}
52+
53+
private static void OpenTelemetryContextSetter(IDictionary<string, object> carrier, string key, string value)
54+
{
55+
carrier[key] = Encoding.UTF8.GetBytes(value);
56+
}
57+
}
58+
}
6.61 KB
Loading

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -956,4 +956,8 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
956956
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, System.TimeSpan timeout) -> System.Threading.Tasks.Task
957957
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
958958
~static RabbitMQ.Client.IConnectionExtensions.CloseAsync(this RabbitMQ.Client.IConnection connection, ushort reasonCode, string reasonText, System.TimeSpan timeout) -> System.Threading.Tasks.Task
959+
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.get -> System.Func<RabbitMQ.Client.IReadOnlyBasicProperties, System.Diagnostics.ActivityContext>
960+
~static RabbitMQ.Client.RabbitMQActivitySource.ContextExtractor.set -> void
961+
~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.get -> System.Action<System.Diagnostics.Activity, System.Collections.Generic.IDictionary<string, object>>
962+
~static RabbitMQ.Client.RabbitMQActivitySource.ContextInjector.set -> void
959963
~virtual RabbitMQ.Client.DefaultBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.Task

projects/RabbitMQ.Client/client/api/IChannelExtensions.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Diagnostics;
3435
using System.Threading;
3536
using System.Threading.Tasks;
3637
using RabbitMQ.Client.client.impl;
@@ -87,17 +88,20 @@ public static Task<string> BasicConsumeAsync(this IChannel channel, string queue
8788
/// <remarks>
8889
/// The publication occurs with mandatory=false and immediate=false.
8990
/// </remarks>
90-
public static ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationAddress addr, in T basicProperties, ReadOnlyMemory<byte> body)
91+
public static ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationAddress addr, in T basicProperties,
92+
ReadOnlyMemory<byte> body)
9193
where T : IReadOnlyBasicProperties, IAmqpHeader
9294
{
9395
return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body);
9496
}
9597

96-
public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
97-
=> channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
98+
public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey,
99+
ReadOnlyMemory<byte> body = default, bool mandatory = false) =>
100+
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
98101

99-
public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
100-
=> channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
102+
public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange,
103+
CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false) =>
104+
channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);
101105

102106
#nullable disable
103107

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,20 +1048,6 @@ await ModelSendAsync(method, k.CancellationToken)
10481048
}
10491049
}
10501050

1051-
private static void InjectTraceContextIntoBasicProperties(object propsObj, string key, string value)
1052-
{
1053-
if (!(propsObj is Dictionary<string, object> headers))
1054-
{
1055-
return;
1056-
}
1057-
1058-
// Only propagate headers if they haven't already been set
1059-
if (!headers.ContainsKey(key))
1060-
{
1061-
headers[key] = value;
1062-
}
1063-
}
1064-
10651051
public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
10661052
TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory,
10671053
CancellationToken cancellationToken)
@@ -1083,10 +1069,12 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
10831069

10841070
try
10851071
{
1086-
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
1087-
RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext);
1072+
var cmd = new BasicPublishMemory(
1073+
Encoding.UTF8.GetBytes(exchange),
1074+
Encoding.UTF8.GetBytes(routingKey),
1075+
mandatory, default);
10881076
using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners
1089-
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length, existingContext)
1077+
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
10901078
: default;
10911079

10921080
if (sendActivity != null)
@@ -1144,10 +1132,8 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
11441132
try
11451133
{
11461134
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
1147-
1148-
RabbitMQActivitySource.TryGetExistingContext(basicProperties, out ActivityContext existingContext);
11491135
using Activity sendActivity = RabbitMQActivitySource.PublisherHasListeners
1150-
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length, existingContext)
1136+
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
11511137
: default;
11521138

11531139
if (sendActivity != null)
@@ -1908,7 +1894,7 @@ private static BasicProperties PopulateActivityAndPropagateTraceId<TProperties>(
19081894
IDictionary<string, object> headers = props.Headers ?? new Dictionary<string, object>();
19091895

19101896
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
1911-
DistributedContextPropagator.Current.Inject(sendActivity, headers, InjectTraceContextIntoBasicProperties);
1897+
RabbitMQActivitySource.ContextInjector(sendActivity, headers);
19121898
props.Headers = headers;
19131899
return props;
19141900
}

0 commit comments

Comments
 (0)