Skip to content

Commit 027821f

Browse files
committed
fb
1 parent c0fd980 commit 027821f

File tree

9 files changed

+93
-56
lines changed

9 files changed

+93
-56
lines changed

src/Servers/Connections.Abstractions/src/Features/IStatefulReconnectFeature.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ namespace Microsoft.AspNetCore.Connections.Abstractions;
1818
/// </summary>
1919
/// <remarks>This feature is experimental.</remarks>
2020
#if NET8_0_OR_GREATER
21-
[RequiresPreviewFeatures("IReconnectFeature is a preview interface")]
21+
[RequiresPreviewFeatures("IStatefulReconnectFeature is a preview interface")]
2222
#endif
2323
public interface IStatefulReconnectFeature
2424
{

src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1922,7 +1922,7 @@ public ConnectionState(ConnectionContext connection, HubConnection hubConnection
19221922
_hubConnection._serviceProvider.GetService<IOptions<HubConnectionOptions>>()?.Value.StatefulReconnectBufferSize
19231923
?? DefaultStatefulReconnectBufferSize);
19241924

1925-
feature.OnReconnected(_messageBuffer.Resend);
1925+
feature.OnReconnected(_messageBuffer.ResendAsync);
19261926
}
19271927
#pragma warning restore CA2252 // This API requires opting into preview features
19281928
}

src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/WebSocketsTransport.cs

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ public void OnReconnected(Func<PipeWriter, Task> notifyOnReconnect)
6969
};
7070
}
7171
}
72-
//public Action<PipeWriter> NotifyOnReconnect { get => _notifyOnReconnect is not null ? _notifyOnReconnect : (_) => { }; set => _notifyOnReconnect = value; }
7372
#pragma warning restore CA2252 // This API requires opting into preview features
7473

7574
public WebSocketsTransport(HttpConnectionOptions httpConnectionOptions, ILoggerFactory loggerFactory, Func<Task<string?>> accessTokenProvider, HttpClient? httpClient,
@@ -396,7 +395,11 @@ private async Task ProcessSocketAsync(WebSocket socket, Uri url, bool isReconnec
396395

397396
if (_useAck && !_gracefulClose)
398397
{
399-
UpdateConnectionPair();
398+
if (!UpdateConnectionPair())
399+
{
400+
return;
401+
}
402+
400403
await StartAsync(url, _webSocketMessageType == WebSocketMessageType.Binary ? TransferFormat.Binary : TransferFormat.Text,
401404
cancellationToken: default).ConfigureAwait(false);
402405
}
@@ -663,22 +666,39 @@ public async Task StopAsync()
663666
Log.TransportStopped(_logger, null);
664667
}
665668

666-
private void UpdateConnectionPair()
669+
private bool UpdateConnectionPair()
667670
{
668-
var input = new Pipe(_httpConnectionOptions.TransportPipeOptions);
671+
lock (this)
672+
{
673+
// Lock and check _useAck, we want to swap the Pipe completely before DisableReconnect returns if there is contention there.
674+
// The calling code will start completing the transport after DisableReconnect
675+
// so we want to avoid any possibility of the new Pipe staying alive or even worse a new WebSocket connection being open when the transport
676+
// might think it's closed.
677+
if (_useAck == false)
678+
{
679+
return false;
680+
}
669681

670-
// Add new pipe for reading from and writing to transport from app code
671-
var transportToApplication = new DuplexPipe(_transport!.Input, input.Writer);
672-
var applicationToTransport = new DuplexPipe(input.Reader, _application!.Output);
682+
var input = new Pipe(_httpConnectionOptions.TransportPipeOptions);
673683

674-
_application = applicationToTransport;
675-
_transport = transportToApplication;
684+
// Add new pipe for reading from and writing to transport from app code
685+
var transportToApplication = new DuplexPipe(_transport!.Input, input.Writer);
686+
var applicationToTransport = new DuplexPipe(input.Reader, _application!.Output);
687+
688+
_application = applicationToTransport;
689+
_transport = transportToApplication;
690+
}
691+
692+
return true;
676693
}
677694

678695
#pragma warning disable CA2252 // This API requires opting into preview features
679696
public void DisableReconnect()
680697
#pragma warning restore CA2252 // This API requires opting into preview features
681698
{
682-
_useAck = false;
699+
lock (this)
700+
{
701+
_useAck = false;
702+
}
683703
}
684704
}

src/SignalR/common/Http.Connections/src/Internal/HttpConnectionContext.cs

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ internal sealed partial class HttpConnectionContext : ConnectionContext,
5151
private CancellationTokenSource? _sendCts;
5252
private bool _activeSend;
5353
private long _startedSendTime;
54-
private bool _useAcks;
54+
private bool _useStatefulReconnect;
5555
private readonly object _sendingLock = new object();
5656
internal CancellationToken SendingToken { get; private set; }
5757

@@ -66,7 +66,7 @@ internal sealed partial class HttpConnectionContext : ConnectionContext,
6666
/// The caller is expected to set the <see cref="Transport"/> and <see cref="Application"/> pipes manually.
6767
/// </summary>
6868
public HttpConnectionContext(string connectionId, string connectionToken, ILogger logger, MetricsContext metricsContext,
69-
IDuplexPipe transport, IDuplexPipe application, HttpConnectionDispatcherOptions options, bool useAcks)
69+
IDuplexPipe transport, IDuplexPipe application, HttpConnectionDispatcherOptions options, bool useStatefulReconnect)
7070
{
7171
Transport = transport;
7272
_applicationStream = new PipeWriterStream(application.Output);
@@ -98,7 +98,7 @@ public HttpConnectionContext(string connectionId, string connectionToken, ILogge
9898
Features.Set<IConnectionLifetimeFeature>(this);
9999
Features.Set<IConnectionLifetimeNotificationFeature>(this);
100100

101-
if (useAcks)
101+
if (useStatefulReconnect)
102102
{
103103
#pragma warning disable CA2252 // This API requires opting into preview features
104104
Features.Set<IStatefulReconnectFeature>(this);
@@ -111,10 +111,10 @@ public HttpConnectionContext(string connectionId, string connectionToken, ILogge
111111
_connectionCloseRequested = new CancellationTokenSource();
112112
ConnectionClosedRequested = _connectionCloseRequested.Token;
113113
AuthenticationExpiration = DateTimeOffset.MaxValue;
114-
_useAcks = useAcks;
114+
_useStatefulReconnect = useStatefulReconnect;
115115
}
116116

117-
public bool UseAcks => _useAcks;
117+
public bool UseStatefulReconnect => _useStatefulReconnect;
118118

119119
public CancellationTokenSource? Cancellation { get; set; }
120120

@@ -210,9 +210,6 @@ public IDuplexPipe Application
210210

211211
public CancellationToken ConnectionClosedRequested { get; set; }
212212

213-
#pragma warning disable CA2252 // This API requires opting into preview features
214-
#pragma warning restore CA2252 // This API requires opting into preview features
215-
216213
public override void Abort()
217214
{
218215
ThreadPool.UnsafeQueueUserWorkItem(cts => ((CancellationTokenSource)cts!).Cancel(), _connectionClosedTokenSource);
@@ -554,12 +551,17 @@ internal async Task<bool> CancelPreviousPoll(HttpContext context)
554551
// Cancel the previous request
555552
cts?.Cancel();
556553

557-
// TODO: remove transport check once other transports support acks
558-
if (UseAcks && TransportType == HttpTransportType.WebSockets)
554+
// TODO: remove transport check once other transports support Stateful Reconnect
555+
if (UseStatefulReconnect && TransportType == HttpTransportType.WebSockets)
559556
{
560557
// Break transport send loop in case it's still waiting on reading from the application
561558
Application.Input.CancelPendingRead();
562-
UpdateConnectionPair();
559+
if (!UpdateConnectionPair())
560+
{
561+
context.Response.ContentType = "text/plain";
562+
context.Response.StatusCode = StatusCodes.Status204NoContent;
563+
return false;
564+
}
563565
}
564566

565567
try
@@ -662,23 +664,39 @@ public void RequestClose()
662664
ThreadPool.UnsafeQueueUserWorkItem(static cts => ((CancellationTokenSource)cts!).Cancel(), _connectionCloseRequested);
663665
}
664666

665-
private void UpdateConnectionPair()
667+
private bool UpdateConnectionPair()
666668
{
667-
var input = new Pipe(_options.TransportPipeOptions);
669+
lock (_stateLock)
670+
{
671+
// Lock and check _useStatefulReconnect, we want to swap the Pipe completely before DisableReconnect returns if there is contention there.
672+
// The calling code will start completing the transport after DisableReconnect
673+
// so we want to avoid any possibility of the new Pipe staying alive or even worse a new WebSocket connection being open when the transport
674+
// might think it's closed.
675+
if (!_useStatefulReconnect)
676+
{
677+
return false;
678+
}
679+
var input = new Pipe(_options.TransportPipeOptions);
668680

669-
// Add new pipe for reading from and writing to transport from app code
670-
var transportToApplication = new DuplexPipe(Transport.Input, input.Writer);
671-
var applicationToTransport = new DuplexPipe(input.Reader, Application.Output);
681+
// Add new pipe for reading from and writing to transport from app code
682+
var transportToApplication = new DuplexPipe(Transport.Input, input.Writer);
683+
var applicationToTransport = new DuplexPipe(input.Reader, Application.Output);
684+
685+
Application = applicationToTransport;
686+
Transport = transportToApplication;
687+
}
672688

673-
Application = applicationToTransport;
674-
Transport = transportToApplication;
689+
return true;
675690
}
676691

677692
#pragma warning disable CA2252 // This API requires opting into preview features
678693
public void DisableReconnect()
679694
#pragma warning restore CA2252 // This API requires opting into preview features
680695
{
681-
_useAcks = false;
696+
lock (_stateLock)
697+
{
698+
_useStatefulReconnect = false;
699+
}
682700
}
683701

684702
#pragma warning disable CA2252 // This API requires opting into preview features

src/SignalR/common/Http.Connections/src/Internal/HttpConnectionDispatcher.cs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
189189
return;
190190
}
191191

192-
if (connection.TransportType != HttpTransportType.WebSockets || connection.UseAcks)
192+
if (connection.TransportType != HttpTransportType.WebSockets || connection.UseStatefulReconnect)
193193
{
194194
if (!await connection.CancelPreviousPoll(context))
195195
{
@@ -201,6 +201,8 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
201201
// Create a new Tcs every poll to keep track of the poll finishing, so we can properly wait on previous polls
202202
var currentRequestTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
203203

204+
var reconnectTask = Task.CompletedTask;
205+
204206
switch (transport)
205207
{
206208
case HttpTransportType.None:
@@ -213,20 +215,10 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
213215
return;
214216
}
215217

216-
if (connection.UseAcks && isReconnect)
218+
if (connection.UseStatefulReconnect && isReconnect)
217219
{
218220
// Should call this after the transport has started, otherwise we'll be writing to a Pipe that isn't being read from
219-
try
220-
{
221-
var reconnectTask = connection.NotifyOnReconnect?.Invoke(connection.Transport.Output) ?? Task.CompletedTask;
222-
await reconnectTask;
223-
}
224-
catch (Exception ex)
225-
{
226-
// MessageBuffer shouldn't throw from the callback
227-
// But users can technically add a callback, we don't want to trust them not to throw
228-
Log.NotifyOnReconnectError(_logger, ex);
229-
}
221+
reconnectTask = connection.NotifyOnReconnect?.Invoke(connection.Transport.Output) ?? Task.CompletedTask;
230222
}
231223
break;
232224
case HttpTransportType.LongPolling:
@@ -242,6 +234,18 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
242234
}
243235

244236
context.Features.Get<IHttpRequestTimeoutFeature>()?.DisableTimeout();
237+
238+
try
239+
{
240+
await reconnectTask;
241+
}
242+
catch (Exception ex)
243+
{
244+
// MessageBuffer shouldn't throw from the callback
245+
// But users can technically add a callback, we don't want to trust them not to throw
246+
Log.NotifyOnReconnectError(_logger, ex);
247+
}
248+
245249
var resultTask = await Task.WhenAny(connection.ApplicationTask!, connection.TransportTask!);
246250

247251
try

src/SignalR/common/Http.Connections/test/WebSocketsTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public async Task WebSocketTransportSetsMessageTypeBasedOnTransferFormatFeature(
111111
private HttpConnectionContext CreateHttpConnectionContext(DuplexPipe.DuplexPipePair pair, string loggerName = null)
112112
{
113113
return new HttpConnectionContext("foo", connectionToken: null, LoggerFactory.CreateLogger(loggerName ?? nameof(HttpConnectionContext)),
114-
metricsContext: default, pair.Transport, pair.Application, new(), useAcks: false);
114+
metricsContext: default, pair.Transport, pair.Application, new(), useStatefulReconnect: false);
115115
}
116116

117117
[Fact]

src/SignalR/common/Shared/MessageBuffer.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -226,18 +226,13 @@ internal void ResetSequence(SequenceMessage sequenceMessage)
226226
_currentReceivingSequenceId = sequenceMessage.SequenceId;
227227
}
228228

229-
internal Task Resend(PipeWriter writer)
229+
internal async Task ResendAsync(PipeWriter writer)
230230
{
231231
_waitForSequenceMessage = true;
232232

233233
var tcs = new TaskCompletionSource<FlushResult>(TaskCreationOptions.RunContinuationsAsynchronously);
234234
_resend = tcs;
235235

236-
return DoResendAsync(tcs, writer);
237-
}
238-
239-
private async Task DoResendAsync(TaskCompletionSource<FlushResult> tcs, PipeWriter writer)
240-
{
241236
FlushResult finalResult = new();
242237
await _writeLock.WaitAsync().ConfigureAwait(false);
243238
try

src/SignalR/server/Core/src/HubConnectionContext.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ await WriteHandshakeResponseAsync(new HandshakeResponseMessage(
588588
{
589589
_useAcks = true;
590590
_messageBuffer = new MessageBuffer(_connectionContext, Protocol, _statefulReconnectBufferSize);
591-
feature.OnReconnected(_messageBuffer.Resend);
591+
feature.OnReconnected(_messageBuffer.ResendAsync);
592592
}
593593
#pragma warning restore CA2252 // This API requires opting into preview features
594594
return true;

src/SignalR/server/SignalR/test/Internal/MessageBufferTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public async Task UnAckedMessageResentOnReconnect()
9797
pipes.Application.Input.AdvanceTo(buffer.Start);
9898

9999
DuplexPipe.UpdateConnectionPair(ref pipes, connection);
100-
await messageBuffer.Resend(pipes.Transport.Output);
100+
await messageBuffer.ResendAsync(pipes.Transport.Output);
101101

102102
// Any message except SequenceMessage will be ignored until a SequenceMessage is received
103103
Assert.False(messageBuffer.ShouldProcessMessage(PingMessage.Instance));
@@ -149,7 +149,7 @@ public async Task AckedMessageNotResentOnReconnect()
149149
messageBuffer.Ack(new AckMessage(1));
150150

151151
DuplexPipe.UpdateConnectionPair(ref pipes, connection);
152-
await messageBuffer.Resend(pipes.Transport.Output);
152+
await messageBuffer.ResendAsync(pipes.Transport.Output);
153153

154154
res = await pipes.Application.Input.ReadAsync();
155155

@@ -181,7 +181,7 @@ public async Task ReceiveSequenceMessageWithLargerIDThanMessagesReceived()
181181
using var messageBuffer = new MessageBuffer(connection, protocol, bufferLimit: 1000);
182182

183183
DuplexPipe.UpdateConnectionPair(ref pipes, connection);
184-
await messageBuffer.Resend(pipes.Transport.Output);
184+
await messageBuffer.ResendAsync(pipes.Transport.Output);
185185

186186
var res = await pipes.Application.Input.ReadAsync();
187187

@@ -213,7 +213,7 @@ public async Task WriteManyMessagesAckSomeProperlyBuffers()
213213
messageBuffer.Ack(new AckMessage(ackNum));
214214

215215
DuplexPipe.UpdateConnectionPair(ref pipes, connection);
216-
await messageBuffer.Resend(pipes.Transport.Output);
216+
await messageBuffer.ResendAsync(pipes.Transport.Output);
217217

218218
var res = await pipes.Application.Input.ReadAsync();
219219

0 commit comments

Comments
 (0)