@@ -59,6 +59,8 @@ public partial class HubConnection : IAsyncDisposable
59
59
// Default amount of bytes we'll buffer when using Stateful Reconnect until applying backpressure to sends from the client.
60
60
internal const long DefaultStatefulReconnectBufferSize = 100_000 ;
61
61
62
+ internal const string ActivityName = "Microsoft.AspNetCore.SignalR.Client.InvocationOut" ;
63
+
62
64
// The receive loop has a single reader and single writer at a time so optimize the channel for that
63
65
private static readonly UnboundedChannelOptions _receiveLoopOptions = new UnboundedChannelOptions
64
66
{
@@ -73,11 +75,13 @@ public partial class HubConnection : IAsyncDisposable
73
75
private readonly ILoggerFactory _loggerFactory ;
74
76
private readonly ILogger _logger ;
75
77
private readonly ConnectionLogScope _logScope ;
78
+ private readonly ActivitySource _activitySource ;
76
79
private readonly IHubProtocol _protocol ;
77
80
private readonly IServiceProvider _serviceProvider ;
78
81
private readonly IConnectionFactory _connectionFactory ;
79
82
private readonly IRetryPolicy ? _reconnectPolicy ;
80
83
private readonly EndPoint _endPoint ;
84
+ private readonly string ? _serviceName ;
81
85
private readonly ConcurrentDictionary < string , InvocationHandlerList > _handlers = new ConcurrentDictionary < string , InvocationHandlerList > ( StringComparer . Ordinal ) ;
82
86
83
87
// Holds all mutable state other than user-defined handlers and settable properties.
@@ -235,6 +239,10 @@ public HubConnection(IConnectionFactory connectionFactory,
235
239
236
240
_logScope = new ConnectionLogScope ( ) ;
237
241
242
+ // ActivitySource can be resolved from the service provider when unit testing.
243
+ _activitySource = ( serviceProvider . GetService < SignalRClientActivitySource > ( ) ?? SignalRClientActivitySource . Instance ) . ActivitySource ;
244
+ _serviceName = ( _endPoint is UriEndPoint e ) ? e . Uri . AbsolutePath . Trim ( '/' ) : null ;
245
+
238
246
var options = serviceProvider . GetService < IOptions < HubConnectionOptions > > ( ) ;
239
247
240
248
ServerTimeout = options ? . Value . ServerTimeout ?? DefaultServerTimeout ;
@@ -720,7 +728,9 @@ async Task OnStreamCanceled(InvocationRequest irq)
720
728
var readers = default ( Dictionary < string , object > ) ;
721
729
722
730
CheckDisposed ( ) ;
723
- var connectionState = await _state . WaitForActiveConnectionAsync ( nameof ( StreamAsChannelCoreAsync ) , token : cancellationToken ) . ConfigureAwait ( false ) ;
731
+
732
+ var activity = StartActivity ( methodName ) ;
733
+ var connectionState = await WaitForActiveConnectionWithActivityAsync ( nameof ( StreamAsChannelCoreAsync ) , activity , token : cancellationToken ) . ConfigureAwait ( false ) ;
724
734
725
735
ChannelReader < object ? > channel ;
726
736
try
@@ -731,7 +741,7 @@ async Task OnStreamCanceled(InvocationRequest irq)
731
741
readers = PackageStreamingParams ( connectionState , ref args , out var streamIds ) ;
732
742
733
743
// I just want an excuse to use 'irq' as a variable name...
734
- var irq = InvocationRequest . Stream ( cancellationToken , returnType , connectionState . GetNextId ( ) , _loggerFactory , this , out channel ) ;
744
+ var irq = InvocationRequest . Stream ( cancellationToken , returnType , connectionState . GetNextId ( ) , _loggerFactory , this , activity , out channel ) ;
735
745
await InvokeStreamCore ( connectionState , methodName , irq , args , streamIds ? . ToArray ( ) , cancellationToken ) . ConfigureAwait ( false ) ;
736
746
737
747
if ( cancellationToken . CanBeCanceled )
@@ -1003,12 +1013,34 @@ private async Task CommonStreaming(ConnectionState connectionState, string strea
1003
1013
}
1004
1014
}
1005
1015
1016
+ private async Task < ConnectionState > WaitForActiveConnectionWithActivityAsync ( string methodName , Activity ? activity , CancellationToken token )
1017
+ {
1018
+ try
1019
+ {
1020
+ return await _state . WaitForActiveConnectionAsync ( methodName , token ) . ConfigureAwait ( false ) ;
1021
+ }
1022
+ catch ( Exception ex )
1023
+ {
1024
+ // If there is an error getting an active connection then the invocation has failed.
1025
+ if ( activity is not null )
1026
+ {
1027
+ activity . SetStatus ( ActivityStatusCode . Error ) ;
1028
+ activity . SetTag ( "error.type" , ex . GetType ( ) . FullName ) ;
1029
+ activity . Stop ( ) ;
1030
+ }
1031
+
1032
+ throw ;
1033
+ }
1034
+ }
1035
+
1006
1036
private async Task < object ? > InvokeCoreAsyncCore ( string methodName , Type returnType , object ? [ ] args , CancellationToken cancellationToken )
1007
1037
{
1008
1038
var readers = default ( Dictionary < string , object > ) ;
1009
1039
1010
1040
CheckDisposed ( ) ;
1011
- var connectionState = await _state . WaitForActiveConnectionAsync ( nameof ( InvokeCoreAsync ) , token : cancellationToken ) . ConfigureAwait ( false ) ;
1041
+
1042
+ var activity = StartActivity ( methodName ) ;
1043
+ var connectionState = await WaitForActiveConnectionWithActivityAsync ( nameof ( InvokeCoreAsync ) , activity , token : cancellationToken ) . ConfigureAwait ( false ) ;
1012
1044
1013
1045
Task < object ? > invocationTask ;
1014
1046
try
@@ -1017,7 +1049,7 @@ private async Task CommonStreaming(ConnectionState connectionState, string strea
1017
1049
1018
1050
readers = PackageStreamingParams ( connectionState , ref args , out var streamIds ) ;
1019
1051
1020
- var irq = InvocationRequest . Invoke ( cancellationToken , returnType , connectionState . GetNextId ( ) , _loggerFactory , this , out invocationTask ) ;
1052
+ var irq = InvocationRequest . Invoke ( cancellationToken , returnType , connectionState . GetNextId ( ) , _loggerFactory , this , activity , out invocationTask ) ;
1021
1053
await InvokeCore ( connectionState , methodName , irq , args , streamIds ? . ToArray ( ) , cancellationToken ) . ConfigureAwait ( false ) ;
1022
1054
1023
1055
LaunchStreams ( connectionState , readers , cancellationToken ) ;
@@ -1031,13 +1063,51 @@ private async Task CommonStreaming(ConnectionState connectionState, string strea
1031
1063
return await invocationTask . ConfigureAwait ( false ) ;
1032
1064
}
1033
1065
1066
+ private Activity ? StartActivity ( string methodName )
1067
+ {
1068
+ var activity = _activitySource . CreateActivity ( ActivityName , ActivityKind . Client ) ;
1069
+ if ( activity is null && Activity . Current is not null && _logger . IsEnabled ( LogLevel . Critical ) )
1070
+ {
1071
+ activity = new Activity ( ActivityName ) ;
1072
+ }
1073
+
1074
+ if ( activity is not null )
1075
+ {
1076
+ if ( ! string . IsNullOrEmpty ( _serviceName ) )
1077
+ {
1078
+ activity . DisplayName = $ "{ _serviceName } /{ methodName } ";
1079
+ activity . SetTag ( "rpc.service" , _serviceName ) ;
1080
+ }
1081
+ else
1082
+ {
1083
+ activity . DisplayName = methodName ;
1084
+ }
1085
+
1086
+ activity . SetTag ( "rpc.system" , "signalr" ) ;
1087
+ activity . SetTag ( "rpc.method" , methodName ) ;
1088
+
1089
+ if ( _endPoint is UriEndPoint e )
1090
+ {
1091
+ activity . SetTag ( "server.address" , e . Uri . Host ) ;
1092
+ activity . SetTag ( "server.port" , e . Uri . Port ) ;
1093
+ }
1094
+
1095
+ activity . Start ( ) ;
1096
+ }
1097
+
1098
+ return activity ;
1099
+ }
1100
+
1034
1101
private async Task InvokeCore ( ConnectionState connectionState , string methodName , InvocationRequest irq , object ? [ ] args , string [ ] ? streams , CancellationToken cancellationToken )
1035
1102
{
1036
1103
Log . PreparingBlockingInvocation ( _logger , irq . InvocationId , methodName , irq . ResultType . FullName ! , args . Length ) ;
1037
1104
1038
1105
// Client invocations are always blocking
1039
1106
var invocationMessage = new InvocationMessage ( irq . InvocationId , methodName , args , streams ) ;
1040
- InjectHeaders ( invocationMessage ) ;
1107
+ if ( irq . Activity is not null )
1108
+ {
1109
+ InjectHeaders ( irq . Activity , invocationMessage ) ;
1110
+ }
1041
1111
1042
1112
Log . RegisteringInvocation ( _logger , irq . InvocationId ) ;
1043
1113
connectionState . AddInvocation ( irq ) ;
@@ -1064,7 +1134,10 @@ private async Task InvokeStreamCore(ConnectionState connectionState, string meth
1064
1134
Log . PreparingStreamingInvocation ( _logger , irq . InvocationId , methodName , irq . ResultType . FullName ! , args . Length ) ;
1065
1135
1066
1136
var invocationMessage = new StreamInvocationMessage ( irq . InvocationId , methodName , args , streams ) ;
1067
- InjectHeaders ( invocationMessage ) ;
1137
+ if ( irq . Activity is not null )
1138
+ {
1139
+ InjectHeaders ( irq . Activity , invocationMessage ) ;
1140
+ }
1068
1141
1069
1142
Log . RegisteringInvocation ( _logger , irq . InvocationId ) ;
1070
1143
@@ -1085,23 +1158,16 @@ private async Task InvokeStreamCore(ConnectionState connectionState, string meth
1085
1158
}
1086
1159
}
1087
1160
1088
- private static void InjectHeaders ( HubInvocationMessage invocationMessage )
1161
+ private static void InjectHeaders ( Activity currentActivity , HubInvocationMessage invocationMessage )
1089
1162
{
1090
- // TODO: Change when SignalR client has an activity.
1091
- // This sends info about the current activity, regardless of the activity source, to the SignalR server.
1092
- // When SignalR client supports client activities this logic should be updated to only send headers
1093
- // if the SignalR client activity is created. The goal is to match the behavior of distributed tracing in HttpClient.
1094
- if ( Activity . Current is { } currentActivity )
1163
+ DistributedContextPropagator . Current . Inject ( currentActivity , invocationMessage , static ( carrier , key , value ) =>
1095
1164
{
1096
- DistributedContextPropagator . Current . Inject ( currentActivity , invocationMessage , static ( carrier , key , value ) =>
1165
+ if ( carrier is HubInvocationMessage invocationMessage )
1097
1166
{
1098
- if ( carrier is HubInvocationMessage invocationMessage )
1099
- {
1100
- invocationMessage . Headers ??= new Dictionary < string , string > ( ) ;
1101
- invocationMessage . Headers [ key ] = value ;
1102
- }
1103
- } ) ;
1104
- }
1167
+ invocationMessage . Headers ??= new Dictionary < string , string > ( ) ;
1168
+ invocationMessage . Headers [ key ] = value ;
1169
+ }
1170
+ } ) ;
1105
1171
}
1106
1172
1107
1173
private async Task SendHubMessage ( ConnectionState connectionState , HubMessage hubMessage , CancellationToken cancellationToken = default )
@@ -1131,7 +1197,9 @@ private async Task SendCoreAsyncCore(string methodName, object?[] args, Cancella
1131
1197
var readers = default ( Dictionary < string , object > ) ;
1132
1198
1133
1199
CheckDisposed ( ) ;
1134
- var connectionState = await _state . WaitForActiveConnectionAsync ( nameof ( SendCoreAsync ) , token : cancellationToken ) . ConfigureAwait ( false ) ;
1200
+
1201
+ var activity = StartActivity ( methodName ) ;
1202
+ var connectionState = await WaitForActiveConnectionWithActivityAsync ( nameof ( SendCoreAsync ) , activity , token : cancellationToken ) . ConfigureAwait ( false ) ;
1135
1203
try
1136
1204
{
1137
1205
CheckDisposed ( ) ;
@@ -1140,12 +1208,27 @@ private async Task SendCoreAsyncCore(string methodName, object?[] args, Cancella
1140
1208
1141
1209
Log . PreparingNonBlockingInvocation ( _logger , methodName , args . Length ) ;
1142
1210
var invocationMessage = new InvocationMessage ( null , methodName , args , streamIds ? . ToArray ( ) ) ;
1211
+ if ( activity is not null )
1212
+ {
1213
+ InjectHeaders ( activity , invocationMessage ) ;
1214
+ }
1143
1215
await SendHubMessage ( connectionState , invocationMessage , cancellationToken ) . ConfigureAwait ( false ) ;
1144
1216
1145
1217
LaunchStreams ( connectionState , readers , cancellationToken ) ;
1146
1218
}
1219
+ catch ( Exception ex )
1220
+ {
1221
+ if ( activity is not null )
1222
+ {
1223
+ activity . SetStatus ( ActivityStatusCode . Error ) ;
1224
+ activity . SetTag ( "error.type" , ex . GetType ( ) . FullName ) ;
1225
+ activity . Stop ( ) ;
1226
+ }
1227
+ throw ;
1228
+ }
1147
1229
finally
1148
1230
{
1231
+ activity ? . Stop ( ) ;
1149
1232
_state . ReleaseConnectionLock ( ) ;
1150
1233
}
1151
1234
}
0 commit comments