Skip to content

Commit a698667

Browse files
committed
Handle negotiate changing the connection URL
1 parent cf474bf commit a698667

File tree

6 files changed

+162
-20
lines changed

6 files changed

+162
-20
lines changed

src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -729,8 +729,7 @@ async Task OnStreamCanceled(InvocationRequest irq)
729729

730730
CheckDisposed();
731731

732-
var activity = StartActivity(methodName);
733-
var connectionState = await WaitForActiveConnectionWithActivityAsync(nameof(StreamAsChannelCoreAsync), activity, token: cancellationToken).ConfigureAwait(false);
732+
var (connectionState, activity) = await WaitForActiveConnectionWithActivityAsync(nameof(StreamAsChannelCoreAsync), methodName, token: cancellationToken).ConfigureAwait(false);
734733

735734
ChannelReader<object?> channel;
736735
try
@@ -1013,11 +1012,40 @@ private async Task CommonStreaming(ConnectionState connectionState, string strea
10131012
}
10141013
}
10151014

1016-
private async Task<ConnectionState> WaitForActiveConnectionWithActivityAsync(string methodName, Activity? activity, CancellationToken token)
1015+
private async Task<(ConnectionState, Activity?)> WaitForActiveConnectionWithActivityAsync(string sendingMethodName, string invokedMethodName, CancellationToken token)
10171016
{
1017+
// Start the activity before waiting on the connection.
1018+
// Starting the activity here means time to connect or reconnect is included in the invoke.
1019+
var activity = CreateActivity(invokedMethodName);
1020+
10181021
try
10191022
{
1020-
return await _state.WaitForActiveConnectionAsync(methodName, token).ConfigureAwait(false);
1023+
ConnectionState connectionState;
1024+
var connectionStateTask = _state.WaitForActiveConnectionAsync(sendingMethodName, token);
1025+
if (connectionStateTask.Status == TaskStatus.RanToCompletion)
1026+
{
1027+
// Attempt to get already connected connection and set server tags using it.
1028+
connectionState = connectionStateTask.Result;
1029+
SetServerTags(activity, connectionState.ConnectionUrl);
1030+
activity?.Start();
1031+
}
1032+
else
1033+
{
1034+
// Fallback to using configured endpoint.
1035+
var initialUri = (_endPoint as UriEndPoint)?.Uri;
1036+
SetServerTags(activity, initialUri);
1037+
activity?.Start();
1038+
1039+
connectionState = await connectionStateTask.ConfigureAwait(false);
1040+
1041+
// After connection is returned, check if URL is different. If so, update activity server tags.
1042+
if (connectionState.ConnectionUrl != null && connectionState.ConnectionUrl != initialUri)
1043+
{
1044+
SetServerTags(activity, connectionState.ConnectionUrl);
1045+
}
1046+
}
1047+
1048+
return (connectionState, activity);
10211049
}
10221050
catch (Exception ex)
10231051
{
@@ -1031,6 +1059,15 @@ private async Task<ConnectionState> WaitForActiveConnectionWithActivityAsync(str
10311059

10321060
throw;
10331061
}
1062+
1063+
static void SetServerTags(Activity? activity, Uri? uri)
1064+
{
1065+
if (activity != null && uri != null)
1066+
{
1067+
activity.SetTag("server.address", uri.Host);
1068+
activity.SetTag("server.port", uri.Port);
1069+
}
1070+
}
10341071
}
10351072

10361073
private async Task<object?> InvokeCoreAsyncCore(string methodName, Type returnType, object?[] args, CancellationToken cancellationToken)
@@ -1039,8 +1076,7 @@ private async Task<ConnectionState> WaitForActiveConnectionWithActivityAsync(str
10391076

10401077
CheckDisposed();
10411078

1042-
var activity = StartActivity(methodName);
1043-
var connectionState = await WaitForActiveConnectionWithActivityAsync(nameof(InvokeCoreAsync), activity, token: cancellationToken).ConfigureAwait(false);
1079+
var (connectionState, activity) = await WaitForActiveConnectionWithActivityAsync(nameof(InvokeCoreAsync), methodName, token: cancellationToken).ConfigureAwait(false);
10441080

10451081
Task<object?> invocationTask;
10461082
try
@@ -1063,7 +1099,7 @@ private async Task<ConnectionState> WaitForActiveConnectionWithActivityAsync(str
10631099
return await invocationTask.ConfigureAwait(false);
10641100
}
10651101

1066-
private Activity? StartActivity(string methodName)
1102+
private Activity? CreateActivity(string methodName)
10671103
{
10681104
var activity = _activitySource.CreateActivity(ActivityName, ActivityKind.Client);
10691105
if (activity is null && Activity.Current is not null && _logger.IsEnabled(LogLevel.Critical))
@@ -1085,14 +1121,6 @@ private async Task<ConnectionState> WaitForActiveConnectionWithActivityAsync(str
10851121

10861122
activity.SetTag("rpc.system", "signalr");
10871123
activity.SetTag("rpc.method", methodName);
1088-
1089-
if (_endPoint is UriEndPoint e)
1090-
{
1091-
activity.SetTag("server.address", e.Uri.Host);
1092-
activity.SetTag("server.port", e.Uri.Port);
1093-
}
1094-
1095-
activity.Start();
10961124
}
10971125

10981126
return activity;
@@ -1198,8 +1226,7 @@ private async Task SendCoreAsyncCore(string methodName, object?[] args, Cancella
11981226

11991227
CheckDisposed();
12001228

1201-
var activity = StartActivity(methodName);
1202-
var connectionState = await WaitForActiveConnectionWithActivityAsync(nameof(SendCoreAsync), activity, token: cancellationToken).ConfigureAwait(false);
1229+
var (connectionState, activity) = await WaitForActiveConnectionWithActivityAsync(nameof(SendCoreAsync), methodName, token: cancellationToken).ConfigureAwait(false);
12031230
try
12041231
{
12051232
CheckDisposed();
@@ -2101,6 +2128,7 @@ private sealed class ConnectionState : IInvocationBinder
21012128
private long _nextActivationSendPing;
21022129

21032130
public ConnectionContext Connection { get; }
2131+
public Uri? ConnectionUrl { get; }
21042132
public Task? ReceiveTask { get; set; }
21052133
public Exception? CloseException { get; set; }
21062134
public CancellationToken UploadStreamToken { get; set; }
@@ -2119,6 +2147,7 @@ public bool Stopping
21192147
public ConnectionState(ConnectionContext connection, HubConnection hubConnection)
21202148
{
21212149
Connection = connection;
2150+
ConnectionUrl = (connection.RemoteEndPoint is UriEndPoint ep) ? ep.Uri : null;
21222151

21232152
_hubConnection = hubConnection;
21242153
_hubConnection._logScope.ConnectionId = connection.ConnectionId;

src/SignalR/clients/csharp/Client/test/UnitTests/HttpConnectionTests.Negotiate.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,13 +456,15 @@ public async Task NegotiateThatReturnsRedirectUrlDoesNotAddAnotherNegotiateVersi
456456

457457
testHttpHandler.OnLongPollDelete((token) => ResponseUtils.CreateResponse(HttpStatusCode.Accepted));
458458

459+
EndPoint connectedEndpoint = null;
459460
using (var noErrorScope = new VerifyNoErrorsScope())
460461
{
461462
await WithConnectionAsync(
462463
CreateConnection(testHttpHandler, loggerFactory: noErrorScope.LoggerFactory),
463464
async (connection) =>
464465
{
465466
await connection.StartAsync().DefaultTimeout();
467+
connectedEndpoint = connection.RemoteEndPoint;
466468
});
467469
}
468470

@@ -471,6 +473,7 @@ await WithConnectionAsync(
471473
Assert.Equal("https://another.domain.url/chat?negotiateVersion=1&id=0rge0d00-0040-0030-0r00-000q00r00e00", testHttpHandler.ReceivedRequests[2].RequestUri.ToString());
472474
Assert.Equal("https://another.domain.url/chat?negotiateVersion=1&id=0rge0d00-0040-0030-0r00-000q00r00e00", testHttpHandler.ReceivedRequests[3].RequestUri.ToString());
473475
Assert.Equal(5, testHttpHandler.ReceivedRequests.Count);
476+
Assert.Equal("https://another.domain.url/chat", connectedEndpoint.ToString());
474477
}
475478

476479
[Fact]

src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.Helpers.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@ private static HubConnection CreateHubConnection(
2121
var builder = new HubConnectionBuilder().WithUrl("http://example.com");
2222

2323
var delegateConnectionFactory = new DelegateConnectionFactory(
24-
endPoint => connection.StartAsync());
24+
async endPoint =>
25+
{
26+
connection.RemoteEndPoint = endPoint;
27+
return await connection.StartAsync();
28+
});
2529

2630
builder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);
2731

src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.Tracing.cs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
using System;
55
using System.Diagnostics;
66
using System.Threading.Tasks;
7+
using Microsoft.AspNetCore.Connections;
8+
using Microsoft.AspNetCore.Internal;
79
using Microsoft.AspNetCore.InternalTesting;
810
using Microsoft.AspNetCore.SignalR.Client.Internal;
911
using Microsoft.AspNetCore.SignalR.Tests;
@@ -42,6 +44,7 @@ public async Task InvokeSendsAnInvocationMessage_SendTraceHeaders()
4244
var traceParent = (string)invokeMessage["headers"]["traceparent"];
4345

4446
Assert.Equal(clientActivity.Id, traceParent);
47+
Assert.Equal("example.com", clientActivity.TagObjects.Single(t => t.Key == "server.address").Value);
4548

4649
Assert.Equal(TaskStatus.WaitingForActivation, invokeTask.Status);
4750
}
@@ -124,5 +127,91 @@ public async Task SendAnInvocationMessage_SendTraceHeaders()
124127
}
125128
}
126129

130+
[Fact]
131+
public async Task InvokeSendsAnInvocationMessage_ConnectionRemoteEndPointChanged_UseRemoteEndpointUrl()
132+
{
133+
var clientSourceContainer = new SignalRClientActivitySource();
134+
Activity clientActivity = null;
135+
136+
using var listener = new ActivityListener
137+
{
138+
ShouldListenTo = activitySource => ReferenceEquals(activitySource, clientSourceContainer.ActivitySource),
139+
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
140+
ActivityStarted = activity => clientActivity = activity
141+
};
142+
ActivitySource.AddActivityListener(listener);
143+
144+
TestConnection connection = null;
145+
connection = new TestConnection(onStart: () =>
146+
{
147+
connection.RemoteEndPoint = new UriEndPoint(new Uri("http://example.net"));
148+
return Task.CompletedTask;
149+
});
150+
var hubConnection = CreateHubConnection(connection, clientActivitySource: clientSourceContainer);
151+
try
152+
{
153+
await hubConnection.StartAsync().DefaultTimeout();
154+
155+
_ = hubConnection.InvokeAsync("Foo");
156+
157+
await connection.ReadSentJsonAsync().DefaultTimeout();
158+
159+
Assert.Equal("example.net", clientActivity.TagObjects.Single(t => t.Key == "server.address").Value);
160+
}
161+
finally
162+
{
163+
await hubConnection.DisposeAsync().DefaultTimeout();
164+
await connection.DisposeAsync().DefaultTimeout();
165+
}
166+
}
167+
168+
[Fact]
169+
public async Task InvokeSendsAnInvocationMessage_ConnectionRemoteEndPointChangedDuringConnect_UseRemoteEndpointUrl()
170+
{
171+
var clientSourceContainer = new SignalRClientActivitySource();
172+
var clientActivityTcs = new TaskCompletionSource<Activity>(TaskCreationOptions.RunContinuationsAsynchronously); ;
173+
174+
using var listener = new ActivityListener
175+
{
176+
ShouldListenTo = activitySource => ReferenceEquals(activitySource, clientSourceContainer.ActivitySource),
177+
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
178+
ActivityStarted = clientActivityTcs.SetResult
179+
};
180+
ActivitySource.AddActivityListener(listener);
181+
182+
var syncPoint = new SyncPoint();
183+
TestConnection connection = null;
184+
connection = new TestConnection(onStart: async () =>
185+
{
186+
await syncPoint.WaitToContinue();
187+
connection.RemoteEndPoint = new UriEndPoint(new Uri("http://example.net"));
188+
});
189+
var hubConnection = CreateHubConnection(connection, clientActivitySource: clientSourceContainer);
190+
try
191+
{
192+
var startTask = hubConnection.StartAsync();
193+
194+
_ = hubConnection.InvokeAsync("Foo");
195+
196+
var clientActivity = await clientActivityTcs.Task.DefaultTimeout();
197+
198+
// Initial server.address uses configured HubConnection URL.
199+
Assert.Equal("example.com", clientActivity.TagObjects.Single(t => t.Key == "server.address").Value);
200+
201+
syncPoint.Continue();
202+
203+
await startTask.DefaultTimeout();
204+
205+
await connection.ReadSentJsonAsync().DefaultTimeout();
206+
207+
// After connection is started, server.address is updated to the connection's remote endpoint.
208+
Assert.Equal("example.net", clientActivity.TagObjects.Single(t => t.Key == "server.address").Value);
209+
}
210+
finally
211+
{
212+
await hubConnection.DisposeAsync().DefaultTimeout();
213+
await connection.DisposeAsync().DefaultTimeout();
214+
}
215+
}
127216
}
128217
}

src/SignalR/clients/csharp/Http.Connections.Client/src/HttpConnection.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,6 @@ public HttpConnection(HttpConnectionOptions httpConnectionOptions, ILoggerFactor
148148

149149
_url = _httpConnectionOptions.Url;
150150

151-
RemoteEndPoint = new UriEndPoint(_url);
152-
153151
if (!httpConnectionOptions.SkipNegotiation || httpConnectionOptions.Transports != HttpTransportType.WebSockets)
154152
{
155153
_httpClient = CreateHttpClient();
@@ -353,6 +351,9 @@ private async Task SelectAndStartTransport(TransferFormat transferFormat, Cancel
353351
throw new InvalidOperationException("Negotiate redirection limit exceeded.");
354352
}
355353

354+
// Set the final negotiated URI as the endpoint.
355+
RemoteEndPoint = new UriEndPoint(Utils.CreateEndPointUri(uri));
356+
356357
// This should only need to happen once
357358
var connectUrl = CreateConnectUrl(uri, negotiationResponse.ConnectionToken);
358359

@@ -405,6 +406,9 @@ private async Task SelectAndStartTransport(TransferFormat transferFormat, Cancel
405406
_httpConnectionOptions.UseStatefulReconnect = transportType == HttpTransportType.WebSockets ? _httpConnectionOptions.UseStatefulReconnect : false;
406407
negotiationResponse = await GetNegotiationResponseAsync(uri, cancellationToken).ConfigureAwait(false);
407408
connectUrl = CreateConnectUrl(uri, negotiationResponse.ConnectionToken);
409+
410+
// Set the final negotiated URI as the endpoint.
411+
RemoteEndPoint = new UriEndPoint(Utils.CreateEndPointUri(uri));
408412
}
409413

410414
Log.StartingTransport(_logger, transportType, uri);

src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/Utils.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,19 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal;
77

88
internal static class Utils
99
{
10+
public static Uri CreateEndPointUri(Uri url)
11+
{
12+
// The EndPoint URI shouldn't have querystring or target.
13+
var uriBuilder = new UriBuilder
14+
{
15+
Scheme = url.Scheme,
16+
Host = url.Host,
17+
Port = url.Port,
18+
Path = url.AbsolutePath
19+
};
20+
return uriBuilder.Uri;
21+
}
22+
1023
public static Uri AppendPath(Uri url, string path)
1124
{
1225
var builder = new UriBuilder(url);

0 commit comments

Comments
 (0)