Skip to content

Commit 0fe17bd

Browse files
BrennanConroyAndrew Stanton-Nurse
authored andcommitted
Merged PR 3614: Port 2.1 fixes to 2.2
Placeholder
1 parent facc382 commit 0fe17bd

21 files changed

+965
-109
lines changed

eng/PatchConfig.props

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ Later on, this will be checked using this condition:
8585
</PropertyGroup>
8686
<PropertyGroup Condition=" '$(VersionPrefix)' == '2.2.9' ">
8787
<PackagesInPatch>
88+
Microsoft.AspNetCore.Http.Connections;
89+
Microsoft.AspNetCore.SignalR.Core;
8890
</PackagesInPatch>
8991
</PropertyGroup>
9092
</Project>

src/SignalR/common/Http.Connections/src/Internal/HttpConnectionContext.cs

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public class HttpConnectionContext : ConnectionContext,
2828
IHttpTransportFeature,
2929
IConnectionInherentKeepAliveFeature
3030
{
31+
private static long _tenSeconds = TimeSpan.FromSeconds(10).Ticks;
32+
3133
private readonly object _itemsLock = new object();
3234
private readonly object _heartbeatLock = new object();
3335
private List<(Action<object> handler, object state)> _heartbeatHandlers;
@@ -36,6 +38,12 @@ public class HttpConnectionContext : ConnectionContext,
3638
private IDuplexPipe _application;
3739
private IDictionary<object, object> _items;
3840

41+
private CancellationTokenSource _sendCts;
42+
private bool _activeSend;
43+
private long _startedSendTime;
44+
private readonly object _sendingLock = new object();
45+
internal CancellationToken SendingToken { get; private set; }
46+
3947
// This tcs exists so that multiple calls to DisposeAsync all wait asynchronously
4048
// on the same task
4149
private readonly TaskCompletionSource<object> _disposeTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -240,8 +248,26 @@ private async Task WaitOnTasks(Task applicationTask, Task transportTask, bool cl
240248
}
241249
else
242250
{
243-
// The other transports don't close their own output, so we can do it here safely
244-
Application?.Output.Complete();
251+
// Normally it isn't safe to try and acquire this lock because the Send can hold onto it for a long time if there is backpressure
252+
// It is safe to wait for this lock now because the Send will be in one of 4 states
253+
// 1. In the middle of a write which is in the middle of being canceled by the CancelPendingFlush above, when it throws
254+
// an OperationCanceledException it will complete the PipeWriter which will make any other Send waiting on the lock
255+
// throw an InvalidOperationException if they call Write
256+
// 2. About to write and see that there is a pending cancel from the CancelPendingFlush, go to 1 to see what happens
257+
// 3. Enters the Send and sees the Dispose state from DisposeAndRemoveAsync and releases the lock
258+
// 4. No Send in progress
259+
await WriteLock.WaitAsync();
260+
try
261+
{
262+
// Complete the applications read loop
263+
Application?.Output.Complete();
264+
}
265+
finally
266+
{
267+
WriteLock.Release();
268+
}
269+
270+
Application?.Input.CancelPendingRead();
245271
}
246272
}
247273

@@ -309,6 +335,40 @@ private async Task WaitOnTasks(Task applicationTask, Task transportTask, bool cl
309335
}
310336
}
311337

338+
internal void StartSendCancellation()
339+
{
340+
lock (_sendingLock)
341+
{
342+
if (_sendCts == null || _sendCts.IsCancellationRequested)
343+
{
344+
_sendCts = new CancellationTokenSource();
345+
SendingToken = _sendCts.Token;
346+
}
347+
_startedSendTime = DateTime.UtcNow.Ticks;
348+
_activeSend = true;
349+
}
350+
}
351+
internal void TryCancelSend(long currentTicks)
352+
{
353+
lock (_sendingLock)
354+
{
355+
if (_activeSend)
356+
{
357+
if (currentTicks - _startedSendTime > _tenSeconds)
358+
{
359+
_sendCts.Cancel();
360+
}
361+
}
362+
}
363+
}
364+
internal void StopSendCancellation()
365+
{
366+
lock (_sendingLock)
367+
{
368+
_activeSend = false;
369+
}
370+
}
371+
312372
private static class Log
313373
{
314374
private static readonly Action<ILogger, string, Exception> _disposingConnection =

src/SignalR/common/Http.Connections/src/Internal/HttpConnectionDispatcher.cs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
144144
connection.SupportedFormats = TransferFormat.Text;
145145

146146
// We only need to provide the Input channel since writing to the application is handled through /send.
147-
var sse = new ServerSentEventsTransport(connection.Application.Input, connection.ConnectionId, _loggerFactory);
147+
var sse = new ServerSentEventsTransport(connection.Application.Input, connection.ConnectionId, connection, _loggerFactory);
148148

149149
await DoPersistentConnection(connectionDelegate, sse, context, connection);
150150
}
@@ -267,7 +267,7 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
267267
context.Response.RegisterForDispose(timeoutSource);
268268
context.Response.RegisterForDispose(tokenSource);
269269

270-
var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.Input, _loggerFactory);
270+
var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.Input, _loggerFactory, connection);
271271

272272
// Start the transport
273273
connection.TransportTask = longPolling.ProcessRequestAsync(context, tokenSource.Token);
@@ -294,7 +294,9 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
294294
connection.Transport.Output.Complete(connection.ApplicationTask.Exception);
295295

296296
// Wait for the transport to run
297-
await connection.TransportTask;
297+
// Ignore exceptions, it has been logged if there is one and the application has finished
298+
// So there is no one to give the exception to
299+
await connection.TransportTask.NoThrow();
298300

299301
// If the status code is a 204 it means the connection is done
300302
if (context.Response.StatusCode == StatusCodes.Status204NoContent)
@@ -310,14 +312,15 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
310312
pollAgain = false;
311313
}
312314
}
313-
else if (resultTask.IsFaulted)
315+
else if (resultTask.IsFaulted || resultTask.IsCanceled)
314316
{
315317
// Cancel current request to release any waiting poll and let dispose acquire the lock
316318
currentRequestTcs.TrySetCanceled();
317-
318-
// transport task was faulted, we should remove the connection
319+
// We should be able to safely dispose because there's no more data being written
320+
// We don't need to wait for close here since we've already waited for both sides
319321
await _manager.DisposeAndRemoveAsync(connection, closeGracefully: false);
320322

323+
// Don't poll again if we've removed the connection completely
321324
pollAgain = false;
322325
}
323326
else if (context.Response.StatusCode == StatusCodes.Status204NoContent)
@@ -524,6 +527,14 @@ private async Task ProcessSend(HttpContext context, HttpConnectionDispatcherOpti
524527

525528
context.Response.StatusCode = StatusCodes.Status404NotFound;
526529
context.Response.ContentType = "text/plain";
530+
531+
// There are no writes anymore (since this is the write "loop")
532+
// So it is safe to complete the writer
533+
// We complete the writer here because we already have the WriteLock acquired
534+
// and it's unsafe to complete outside of the lock
535+
// Other code isn't guaranteed to be able to acquire the lock before another write
536+
// even if CancelPendingFlush is called, and the other write could hang if there is backpressure
537+
connection.Application.Output.Complete();
527538
return;
528539
}
529540
catch (IOException ex)
@@ -571,11 +582,8 @@ private async Task ProcessDeleteAsync(HttpContext context)
571582

572583
Log.TerminatingConection(_logger);
573584

574-
// Complete the receiving end of the pipe
575-
connection.Application.Output.Complete();
576-
577-
// Dispose the connection gracefully, but don't wait for it. We assign it here so we can wait in tests
578-
connection.DisposeAndRemoveTask = _manager.DisposeAndRemoveAsync(connection, closeGracefully: true);
585+
// Dispose the connection, but don't wait for it. We assign it here so we can wait in tests
586+
connection.DisposeAndRemoveTask = _manager.DisposeAndRemoveAsync(connection, closeGracefully: false);
579587

580588
context.Response.StatusCode = StatusCodes.Status202Accepted;
581589
context.Response.ContentType = "text/plain";

src/SignalR/common/Http.Connections/src/Internal/HttpConnectionManager.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,19 @@ public partial class HttpConnectionManager
3030
private readonly TimerAwaitable _nextHeartbeat;
3131
private readonly ILogger<HttpConnectionManager> _logger;
3232
private readonly ILogger<HttpConnectionContext> _connectionLogger;
33+
private readonly bool _useSendTimeout = true;
3334

3435
public HttpConnectionManager(ILoggerFactory loggerFactory, IApplicationLifetime appLifetime)
3536
{
3637
_logger = loggerFactory.CreateLogger<HttpConnectionManager>();
3738
_connectionLogger = loggerFactory.CreateLogger<HttpConnectionContext>();
3839
_nextHeartbeat = new TimerAwaitable(_heartbeatTickRate, _heartbeatTickRate);
3940

41+
if (AppContext.TryGetSwitch("Microsoft.AspNetCore.Http.Connections.DoNotUseSendTimeout", out var timeoutDisabled))
42+
{
43+
_useSendTimeout = !timeoutDisabled;
44+
}
45+
4046
// Register these last as the callbacks could run immediately
4147
appLifetime.ApplicationStarted.Register(() => Start());
4248
appLifetime.ApplicationStopping.Register(() => CloseConnections());
@@ -153,20 +159,26 @@ public async Task ScanAsync()
153159
connection.StateLock.Release();
154160
}
155161

162+
var utcNow = DateTimeOffset.UtcNow;
156163
// Once the decision has been made to dispose we don't check the status again
157164
// But don't clean up connections while the debugger is attached.
158-
if (!Debugger.IsAttached && status == HttpConnectionStatus.Inactive && (DateTimeOffset.UtcNow - lastSeenUtc).TotalSeconds > 5)
165+
if (!Debugger.IsAttached && status == HttpConnectionStatus.Inactive && (utcNow - lastSeenUtc).TotalSeconds > 5)
159166
{
160167
Log.ConnectionTimedOut(_logger, connection.ConnectionId);
161168
HttpConnectionsEventSource.Log.ConnectionTimedOut(connection.ConnectionId);
162169

163170
// This is most likely a long polling connection. The transport here ends because
164-
// a poll completed and has been inactive for > 5 seconds so we wait for the
171+
// a poll completed and has been inactive for > 5 seconds so we wait for the
165172
// application to finish gracefully
166173
_ = DisposeAndRemoveAsync(connection, closeGracefully: true);
167174
}
168175
else
169176
{
177+
if (!Debugger.IsAttached && _useSendTimeout)
178+
{
179+
connection.TryCancelSend(utcNow.Ticks);
180+
}
181+
170182
// Tick the heartbeat, if the connection is still active
171183
connection.TickHeartbeat();
172184
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
using System.Runtime.CompilerServices;
4+
namespace System.Threading.Tasks
5+
{
6+
internal static class TaskExtensions
7+
{
8+
public static async Task NoThrow(this Task task)
9+
{
10+
await new NoThrowAwaiter(task);
11+
}
12+
}
13+
internal readonly struct NoThrowAwaiter : ICriticalNotifyCompletion
14+
{
15+
private readonly Task _task;
16+
public NoThrowAwaiter(Task task) { _task = task; }
17+
public NoThrowAwaiter GetAwaiter() => this;
18+
public bool IsCompleted => _task.IsCompleted;
19+
// Observe exception
20+
public void GetResult() { _ = _task.Exception; }
21+
public void OnCompleted(Action continuation) => _task.GetAwaiter().OnCompleted(continuation);
22+
public void UnsafeOnCompleted(Action continuation) => OnCompleted(continuation);
23+
}
24+
}

src/SignalR/common/Http.Connections/src/Internal/Transports/LongPollingTransport.cs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public class LongPollingTransport : IHttpTransport
1616
private readonly PipeReader _application;
1717
private readonly ILogger _logger;
1818
private readonly CancellationToken _timeoutToken;
19+
private readonly HttpConnectionContext _connection;
1920

2021
public LongPollingTransport(CancellationToken timeoutToken, PipeReader application, ILoggerFactory loggerFactory)
2122
{
@@ -24,14 +25,20 @@ public LongPollingTransport(CancellationToken timeoutToken, PipeReader applicati
2425
_logger = loggerFactory.CreateLogger<LongPollingTransport>();
2526
}
2627

28+
internal LongPollingTransport(CancellationToken timeoutToken, PipeReader application, ILoggerFactory loggerFactory, HttpConnectionContext connection)
29+
: this(timeoutToken, application, loggerFactory)
30+
{
31+
_connection = connection;
32+
}
33+
2734
public async Task ProcessRequestAsync(HttpContext context, CancellationToken token)
2835
{
2936
try
3037
{
3138
var result = await _application.ReadAsync(token);
3239
var buffer = result.Buffer;
3340

34-
if (buffer.IsEmpty && result.IsCompleted)
41+
if (buffer.IsEmpty && (result.IsCompleted || result.IsCanceled))
3542
{
3643
Log.LongPolling204(_logger);
3744
context.Response.ContentType = "text/plain";
@@ -49,19 +56,22 @@ public async Task ProcessRequestAsync(HttpContext context, CancellationToken tok
4956

5057
try
5158
{
52-
await context.Response.Body.WriteAsync(buffer);
59+
_connection?.StartSendCancellation();
60+
await context.Response.Body.WriteAsync(buffer, _connection?.SendingToken ?? default);
5361
}
5462
finally
5563
{
64+
_connection?.StopSendCancellation();
5665
_application.AdvanceTo(buffer.End);
5766
}
5867
}
5968
catch (OperationCanceledException)
6069
{
61-
// 3 cases:
70+
// 4 cases:
6271
// 1 - Request aborted, the client disconnected (no response)
6372
// 2 - The poll timeout is hit (204)
64-
// 3 - A new request comes in and cancels this request (204)
73+
// 3 - SendingToken was canceled, abort the connection
74+
// 4 - A new request comes in and cancels this request (204)
6575

6676
// Case 1
6777
if (context.RequestAborted.IsCancellationRequested)
@@ -79,9 +89,16 @@ public async Task ProcessRequestAsync(HttpContext context, CancellationToken tok
7989
context.Response.ContentType = "text/plain";
8090
context.Response.StatusCode = StatusCodes.Status200OK;
8191
}
82-
else
92+
else if (_connection?.SendingToken.IsCancellationRequested == true)
8393
{
8494
// Case 3
95+
context.Response.ContentType = "text/plain";
96+
context.Response.StatusCode = StatusCodes.Status204NoContent;
97+
throw;
98+
}
99+
else
100+
{
101+
// Case 4
85102
Log.LongPolling204(_logger);
86103
context.Response.ContentType = "text/plain";
87104
context.Response.StatusCode = StatusCodes.Status204NoContent;

src/SignalR/common/Http.Connections/src/Internal/Transports/ServerSentEventsMessageFormatter.cs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System;
55
using System.Buffers;
66
using System.IO;
7+
using System.Threading;
78
using System.Threading.Tasks;
89

910
namespace Microsoft.AspNetCore.Http.Connections.Internal
@@ -15,19 +16,24 @@ public static class ServerSentEventsMessageFormatter
1516

1617
private const byte LineFeed = (byte)'\n';
1718

18-
public static async Task WriteMessageAsync(ReadOnlySequence<byte> payload, Stream output)
19+
public static Task WriteMessageAsync(ReadOnlySequence<byte> payload, Stream output)
20+
{
21+
return WriteMessageAsync(payload, output, default);
22+
}
23+
24+
internal static async Task WriteMessageAsync(ReadOnlySequence<byte> payload, Stream output, CancellationToken token)
1925
{
2026
// Payload does not contain a line feed so write it directly to output
2127
if (payload.PositionOf(LineFeed) == null)
2228
{
2329
if (payload.Length > 0)
2430
{
25-
await output.WriteAsync(DataPrefix, 0, DataPrefix.Length);
26-
await output.WriteAsync(payload);
27-
await output.WriteAsync(Newline, 0, Newline.Length);
31+
await output.WriteAsync(DataPrefix, 0, DataPrefix.Length, token);
32+
await output.WriteAsync(payload, token);
33+
await output.WriteAsync(Newline, 0, Newline.Length, token);
2834
}
2935

30-
await output.WriteAsync(Newline, 0, Newline.Length);
36+
await output.WriteAsync(Newline, 0, Newline.Length, token);
3137
return;
3238
}
3339

@@ -37,7 +43,7 @@ public static async Task WriteMessageAsync(ReadOnlySequence<byte> payload, Strea
3743
await WriteMessageToMemory(ms, payload);
3844
ms.Position = 0;
3945

40-
await ms.CopyToAsync(output);
46+
await ms.CopyToAsync(output, bufferSize: 81920, token);
4147
}
4248

4349
/// <summary>

0 commit comments

Comments
 (0)