Skip to content

Commit 9783b1e

Browse files
committed
client
1 parent 98b0f14 commit 9783b1e

File tree

4 files changed

+157
-1
lines changed

4 files changed

+157
-1
lines changed

src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,11 @@ private async Task StopAsyncCore(bool disposing)
576576
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted,
577577
TaskScheduler.Default);
578578
}
579+
580+
if (connectionState.Connection.Features.Get<IReconnectFeature>() is IReconnectFeature feature)
581+
{
582+
feature.DisableReconnect();
583+
}
579584
}
580585
else
581586
{
@@ -1085,6 +1090,12 @@ private async Task SendWithLock(ConnectionState expectedConnectionState, HubMess
10851090
{
10861091
Log.ReceivedCloseWithError(_logger, close.Error);
10871092
}
1093+
1094+
if (connectionState.Connection.Features.Get<IReconnectFeature>() is IReconnectFeature feature)
1095+
{
1096+
feature.DisableReconnect();
1097+
}
1098+
10881099
return close;
10891100
case PingMessage _:
10901101
Log.ReceivedPing(_logger);

src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.cs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Net.WebSockets;
88
using System.Threading.Channels;
99
using Microsoft.AspNetCore.Connections;
10+
using Microsoft.AspNetCore.Connections.Abstractions;
1011
using Microsoft.AspNetCore.Connections.Features;
1112
using Microsoft.AspNetCore.Http.Connections.Client;
1213
using Microsoft.AspNetCore.SignalR.Protocol;
@@ -878,6 +879,66 @@ public async Task HubConnectionIsMockable()
878879
mockConnection.Verify(c => c.StopAsync(It.IsAny<CancellationToken>()), Times.Once);
879880
}
880881

882+
[Fact]
883+
public async Task DisableReconnectCalledWhenCloseMessageReceived()
884+
{
885+
var builder = new HubConnectionBuilder().WithUrl("http://example.com");
886+
var innerConnection = new TestConnection();
887+
var reconnectFeature = new TestReconnectFeature();
888+
innerConnection.Features.Set<IReconnectFeature>(reconnectFeature);
889+
890+
var delegateConnectionFactory = new DelegateConnectionFactory(
891+
endPoint => innerConnection.StartAsync());
892+
builder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);
893+
894+
var hubConnection = builder.Build();
895+
var closedEventTcs = new TaskCompletionSource<Exception>();
896+
hubConnection.Closed += e =>
897+
{
898+
closedEventTcs.SetResult(e);
899+
return Task.CompletedTask;
900+
};
901+
902+
await hubConnection.StartAsync().DefaultTimeout();
903+
904+
await innerConnection.ReceiveJsonMessage(new { type = HubProtocolConstants.CloseMessageType });
905+
906+
var exception = await closedEventTcs.Task.DefaultTimeout();
907+
Assert.Null(exception);
908+
909+
await reconnectFeature.DisableReconnectCalled.DefaultTimeout();
910+
}
911+
912+
[Fact]
913+
public async Task DisableReconnectCalledWhenSendingCloseMessage()
914+
{
915+
var builder = new HubConnectionBuilder().WithUrl("http://example.com");
916+
var innerConnection = new TestConnection();
917+
var reconnectFeature = new TestReconnectFeature();
918+
innerConnection.Features.Set<IReconnectFeature>(reconnectFeature);
919+
920+
var delegateConnectionFactory = new DelegateConnectionFactory(
921+
endPoint => innerConnection.StartAsync());
922+
builder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);
923+
924+
var hubConnection = builder.Build();
925+
var closedEventTcs = new TaskCompletionSource<Exception>();
926+
hubConnection.Closed += e =>
927+
{
928+
closedEventTcs.SetResult(e);
929+
return Task.CompletedTask;
930+
};
931+
932+
await hubConnection.StartAsync().DefaultTimeout();
933+
934+
await hubConnection.StopAsync().DefaultTimeout();
935+
936+
var exception = await closedEventTcs.Task.DefaultTimeout();
937+
Assert.Null(exception);
938+
939+
await reconnectFeature.DisableReconnectCalled.DefaultTimeout();
940+
}
941+
881942
private class SampleObject
882943
{
883944
public SampleObject(string foo, int bar)
@@ -962,4 +1023,18 @@ public ReadOnlyMemory<byte> GetMessageBytes(HubMessage message)
9621023
return HubProtocolExtensions.GetMessageBytes(this, message);
9631024
}
9641025
}
1026+
1027+
private sealed class TestReconnectFeature : IReconnectFeature
1028+
{
1029+
private TaskCompletionSource _disableReconnect = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
1030+
1031+
public Task DisableReconnectCalled => _disableReconnect.Task;
1032+
1033+
public Action<PipeWriter> NotifyOnReconnect { get; set; }
1034+
1035+
public void DisableReconnect()
1036+
{
1037+
_disableReconnect.TrySetResult();
1038+
}
1039+
}
9651040
}

src/SignalR/common/Http.Connections/test/HttpConnectionDispatcherTests.cs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2414,6 +2414,77 @@ public async Task ReconnectStopsPreviousConnection()
24142414
}
24152415
}
24162416

2417+
[Fact]
2418+
public async Task DisableReconnectDisallowsReplacementConnection()
2419+
{
2420+
using (StartVerifiableLog())
2421+
{
2422+
var manager = CreateConnectionManager(LoggerFactory);
2423+
var options = new HttpConnectionDispatcherOptions() { AllowAcks = true };
2424+
options.WebSockets.CloseTimeout = TimeSpan.FromMilliseconds(1);
2425+
// pretend negotiate occurred
2426+
var connection = manager.CreateConnection(options, negotiateVersion: 1, useAck: true);
2427+
connection.TransportType = HttpTransportType.WebSockets;
2428+
2429+
var dispatcher = CreateDispatcher(manager, LoggerFactory);
2430+
var services = new ServiceCollection();
2431+
2432+
var context = MakeRequest("/foo", connection, services);
2433+
SetTransport(context, HttpTransportType.WebSockets);
2434+
2435+
var builder = new ConnectionBuilder(services.BuildServiceProvider());
2436+
builder.UseConnectionHandler<ReconnectConnectionHandler>();
2437+
var app = builder.Build();
2438+
2439+
var initialWebSocketTask = dispatcher.ExecuteAsync(context, options, app);
2440+
2441+
var reconnectFeature = connection.Features.Get<IReconnectFeature>();
2442+
Assert.NotNull(reconnectFeature);
2443+
2444+
var firstMsg = new byte[] { 1, 4, 8, 9 };
2445+
await connection.Application.Output.WriteAsync(firstMsg);
2446+
2447+
var websocketFeature = (TestWebSocketConnectionFeature)context.Features.Get<IHttpWebSocketFeature>();
2448+
await websocketFeature.Accepted.DefaultTimeout();
2449+
// Run the client socket
2450+
var webSocketMessage = await websocketFeature.Client.GetNextMessageAsync().DefaultTimeout();
2451+
2452+
Assert.Equal(firstMsg, webSocketMessage.Buffer);
2453+
2454+
var called = false;
2455+
var reconnectCallback = reconnectFeature.NotifyOnReconnect;
2456+
reconnectFeature.NotifyOnReconnect = (writer) =>
2457+
{
2458+
called = true;
2459+
reconnectCallback(writer);
2460+
};
2461+
2462+
// Disable will not allow new connection to override existing
2463+
reconnectFeature.DisableReconnect();
2464+
2465+
// New websocket connection with previous connection token
2466+
context = MakeRequest("/foo", connection, services);
2467+
SetTransport(context, HttpTransportType.WebSockets);
2468+
2469+
await dispatcher.ExecuteAsync(context, options, app).DefaultTimeout();
2470+
2471+
Assert.Equal(409, context.Response.StatusCode);
2472+
2473+
Assert.False(called);
2474+
2475+
// Connection still works
2476+
var secondMsg = new byte[] { 7, 6, 3, 2 };
2477+
await connection.Application.Output.WriteAsync(secondMsg);
2478+
2479+
webSocketMessage = await websocketFeature.Client.GetNextMessageAsync().DefaultTimeout();
2480+
Assert.Equal(secondMsg, webSocketMessage.Buffer);
2481+
2482+
connection.Abort();
2483+
2484+
await initialWebSocketTask.DefaultTimeout();
2485+
}
2486+
}
2487+
24172488
private class ControllableMemoryStream : MemoryStream
24182489
{
24192490
private readonly SyncPoint _syncPoint;

src/SignalR/server/SignalR/test/Internal/MessageBufferTests.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,6 @@ public DuplexPipePair(IDuplexPipe transport, IDuplexPipe application)
283283

284284
public static void UpdateConnectionPair(ref DuplexPipePair duplexPipePair, ConnectionContext connection)
285285
{
286-
var prevPipe = duplexPipePair.Application.Input;
287286
var input = new Pipe();
288287

289288
// Add new pipe for reading from and writing to transport from app code

0 commit comments

Comments
 (0)