Skip to content

Commit 40b6542

Browse files
committed
client
1 parent 9c8ee0a commit 40b6542

File tree

4 files changed

+157
-1
lines changed

4 files changed

+157
-1
lines changed

src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,11 @@ private async Task StopAsyncCore(bool disposing)
579579
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted,
580580
TaskScheduler.Default);
581581
}
582+
583+
if (connectionState.Connection.Features.Get<IReconnectFeature>() is IReconnectFeature feature)
584+
{
585+
feature.DisableReconnect();
586+
}
582587
}
583588
else
584589
{
@@ -1088,6 +1093,12 @@ private async Task SendWithLock(ConnectionState expectedConnectionState, HubMess
10881093
{
10891094
Log.ReceivedCloseWithError(_logger, close.Error);
10901095
}
1096+
1097+
if (connectionState.Connection.Features.Get<IReconnectFeature>() is IReconnectFeature feature)
1098+
{
1099+
feature.DisableReconnect();
1100+
}
1101+
10911102
return close;
10921103
case PingMessage _:
10931104
Log.ReceivedPing(_logger);

src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.cs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Net.WebSockets;
88
using System.Threading.Channels;
99
using Microsoft.AspNetCore.Connections;
10+
using Microsoft.AspNetCore.Connections.Abstractions;
1011
using Microsoft.AspNetCore.Connections.Features;
1112
using Microsoft.AspNetCore.Http.Connections.Client;
1213
using Microsoft.AspNetCore.SignalR.Protocol;
@@ -878,6 +879,66 @@ public async Task HubConnectionIsMockable()
878879
mockConnection.Verify(c => c.StopAsync(It.IsAny<CancellationToken>()), Times.Once);
879880
}
880881

882+
[Fact]
883+
public async Task DisableReconnectCalledWhenCloseMessageReceived()
884+
{
885+
var builder = new HubConnectionBuilder().WithUrl("http://example.com");
886+
var innerConnection = new TestConnection();
887+
var reconnectFeature = new TestReconnectFeature();
888+
innerConnection.Features.Set<IReconnectFeature>(reconnectFeature);
889+
890+
var delegateConnectionFactory = new DelegateConnectionFactory(
891+
endPoint => innerConnection.StartAsync());
892+
builder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);
893+
894+
var hubConnection = builder.Build();
895+
var closedEventTcs = new TaskCompletionSource<Exception>();
896+
hubConnection.Closed += e =>
897+
{
898+
closedEventTcs.SetResult(e);
899+
return Task.CompletedTask;
900+
};
901+
902+
await hubConnection.StartAsync().DefaultTimeout();
903+
904+
await innerConnection.ReceiveJsonMessage(new { type = HubProtocolConstants.CloseMessageType });
905+
906+
var exception = await closedEventTcs.Task.DefaultTimeout();
907+
Assert.Null(exception);
908+
909+
await reconnectFeature.DisableReconnectCalled.DefaultTimeout();
910+
}
911+
912+
[Fact]
913+
public async Task DisableReconnectCalledWhenSendingCloseMessage()
914+
{
915+
var builder = new HubConnectionBuilder().WithUrl("http://example.com");
916+
var innerConnection = new TestConnection();
917+
var reconnectFeature = new TestReconnectFeature();
918+
innerConnection.Features.Set<IReconnectFeature>(reconnectFeature);
919+
920+
var delegateConnectionFactory = new DelegateConnectionFactory(
921+
endPoint => innerConnection.StartAsync());
922+
builder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);
923+
924+
var hubConnection = builder.Build();
925+
var closedEventTcs = new TaskCompletionSource<Exception>();
926+
hubConnection.Closed += e =>
927+
{
928+
closedEventTcs.SetResult(e);
929+
return Task.CompletedTask;
930+
};
931+
932+
await hubConnection.StartAsync().DefaultTimeout();
933+
934+
await hubConnection.StopAsync().DefaultTimeout();
935+
936+
var exception = await closedEventTcs.Task.DefaultTimeout();
937+
Assert.Null(exception);
938+
939+
await reconnectFeature.DisableReconnectCalled.DefaultTimeout();
940+
}
941+
881942
private class SampleObject
882943
{
883944
public SampleObject(string foo, int bar)
@@ -962,4 +1023,18 @@ public ReadOnlyMemory<byte> GetMessageBytes(HubMessage message)
9621023
return HubProtocolExtensions.GetMessageBytes(this, message);
9631024
}
9641025
}
1026+
1027+
private sealed class TestReconnectFeature : IReconnectFeature
1028+
{
1029+
private TaskCompletionSource _disableReconnect = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
1030+
1031+
public Task DisableReconnectCalled => _disableReconnect.Task;
1032+
1033+
public Action<PipeWriter> NotifyOnReconnect { get; set; }
1034+
1035+
public void DisableReconnect()
1036+
{
1037+
_disableReconnect.TrySetResult();
1038+
}
1039+
}
9651040
}

src/SignalR/common/Http.Connections/test/HttpConnectionDispatcherTests.cs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2415,6 +2415,77 @@ public async Task ReconnectStopsPreviousConnection()
24152415
}
24162416
}
24172417

2418+
[Fact]
2419+
public async Task DisableReconnectDisallowsReplacementConnection()
2420+
{
2421+
using (StartVerifiableLog())
2422+
{
2423+
var manager = CreateConnectionManager(LoggerFactory);
2424+
var options = new HttpConnectionDispatcherOptions() { AllowAcks = true };
2425+
options.WebSockets.CloseTimeout = TimeSpan.FromMilliseconds(1);
2426+
// pretend negotiate occurred
2427+
var connection = manager.CreateConnection(options, negotiateVersion: 1, useAck: true);
2428+
connection.TransportType = HttpTransportType.WebSockets;
2429+
2430+
var dispatcher = CreateDispatcher(manager, LoggerFactory);
2431+
var services = new ServiceCollection();
2432+
2433+
var context = MakeRequest("/foo", connection, services);
2434+
SetTransport(context, HttpTransportType.WebSockets);
2435+
2436+
var builder = new ConnectionBuilder(services.BuildServiceProvider());
2437+
builder.UseConnectionHandler<ReconnectConnectionHandler>();
2438+
var app = builder.Build();
2439+
2440+
var initialWebSocketTask = dispatcher.ExecuteAsync(context, options, app);
2441+
2442+
var reconnectFeature = connection.Features.Get<IReconnectFeature>();
2443+
Assert.NotNull(reconnectFeature);
2444+
2445+
var firstMsg = new byte[] { 1, 4, 8, 9 };
2446+
await connection.Application.Output.WriteAsync(firstMsg);
2447+
2448+
var websocketFeature = (TestWebSocketConnectionFeature)context.Features.Get<IHttpWebSocketFeature>();
2449+
await websocketFeature.Accepted.DefaultTimeout();
2450+
// Run the client socket
2451+
var webSocketMessage = await websocketFeature.Client.GetNextMessageAsync().DefaultTimeout();
2452+
2453+
Assert.Equal(firstMsg, webSocketMessage.Buffer);
2454+
2455+
var called = false;
2456+
var reconnectCallback = reconnectFeature.NotifyOnReconnect;
2457+
reconnectFeature.NotifyOnReconnect = (writer) =>
2458+
{
2459+
called = true;
2460+
reconnectCallback(writer);
2461+
};
2462+
2463+
// Disable will not allow new connection to override existing
2464+
reconnectFeature.DisableReconnect();
2465+
2466+
// New websocket connection with previous connection token
2467+
context = MakeRequest("/foo", connection, services);
2468+
SetTransport(context, HttpTransportType.WebSockets);
2469+
2470+
await dispatcher.ExecuteAsync(context, options, app).DefaultTimeout();
2471+
2472+
Assert.Equal(409, context.Response.StatusCode);
2473+
2474+
Assert.False(called);
2475+
2476+
// Connection still works
2477+
var secondMsg = new byte[] { 7, 6, 3, 2 };
2478+
await connection.Application.Output.WriteAsync(secondMsg);
2479+
2480+
webSocketMessage = await websocketFeature.Client.GetNextMessageAsync().DefaultTimeout();
2481+
Assert.Equal(secondMsg, webSocketMessage.Buffer);
2482+
2483+
connection.Abort();
2484+
2485+
await initialWebSocketTask.DefaultTimeout();
2486+
}
2487+
}
2488+
24182489
private class ControllableMemoryStream : MemoryStream
24192490
{
24202491
private readonly SyncPoint _syncPoint;

src/SignalR/server/SignalR/test/Internal/MessageBufferTests.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,6 @@ public DuplexPipePair(IDuplexPipe transport, IDuplexPipe application)
320320

321321
public static void UpdateConnectionPair(ref DuplexPipePair duplexPipePair, ConnectionContext connection)
322322
{
323-
var prevPipe = duplexPipePair.Application.Input;
324323
var input = new Pipe();
325324

326325
// Add new pipe for reading from and writing to transport from app code

0 commit comments

Comments
 (0)