Skip to content

Commit 0757955

Browse files
BrennanConroyAndrew Stanton-Nurse
authored and
Andrew Stanton-Nurse
committed
Merged PR 2918: Cancel Sends if they take too long
1 parent 147f950 commit 0757955

23 files changed

+1013
-107
lines changed

eng/Baseline.Designer.props

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,7 @@
463463
<BaselinePackageReference Include="Microsoft.AspNetCore.Routing" Version="[2.1.1, )" />
464464
<BaselinePackageReference Include="Microsoft.AspNetCore.WebSockets" Version="[2.1.1, )" />
465465
<BaselinePackageReference Include="Newtonsoft.Json" Version="[11.0.2, )" />
466+
<BaselinePackageReference Include="System.Net.WebSockets.WebSocketProtocol" Version="[4.5.3, )" />
466467
</ItemGroup>
467468
<ItemGroup Condition=" '$(PackageId)' == 'Microsoft.AspNetCore.Http.Connections' AND '$(TargetFramework)' == 'netstandard2.0' ">
468469
<BaselinePackageReference Include="Microsoft.AspNetCore.Http.Connections.Common" Version="[1.0.4, )" />
@@ -472,6 +473,7 @@
472473
<BaselinePackageReference Include="Microsoft.AspNetCore.Routing" Version="[2.1.1, )" />
473474
<BaselinePackageReference Include="Microsoft.AspNetCore.WebSockets" Version="[2.1.1, )" />
474475
<BaselinePackageReference Include="Newtonsoft.Json" Version="[11.0.2, )" />
476+
<BaselinePackageReference Include="System.Net.WebSockets.WebSocketProtocol" Version="[4.5.3, )" />
475477
</ItemGroup>
476478
<!-- Package: Microsoft.AspNetCore.Http.Extensions-->
477479
<PropertyGroup Condition=" '$(PackageId)' == 'Microsoft.AspNetCore.Http.Extensions' ">

eng/PatchConfig.props

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ Later on, this will be checked using this condition:
4646
</PropertyGroup>
4747
<PropertyGroup Condition=" '$(VersionPrefix)' == '2.1.15' ">
4848
<PackagesInPatch>
49+
Microsoft.AspNetCore.Http.Connections;
50+
Microsoft.AspNetCore.SignalR.Core;
4951
</PackagesInPatch>
5052
</PropertyGroup>
5153
</Project>

src/SignalR/clients/ts/FunctionalTests/selenium/run-tests.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,25 @@
11
import { ChildProcess, spawn } from "child_process";
2-
import * as fs from "fs";
2+
import * as _fs from "fs";
33
import { EOL } from "os";
44
import * as path from "path";
5+
import { promisify } from "util";
56
import { PassThrough, Readable } from "stream";
67

78
import { run } from "../../webdriver-tap-runner/lib";
89

910
import * as _debug from "debug";
1011
const debug = _debug("signalr-functional-tests:run");
1112

13+
const ARTIFACTS_DIR = path.resolve(__dirname, "..", "..", "..", "..", "artifacts");
14+
const LOGS_DIR = path.resolve(ARTIFACTS_DIR, "logs");
15+
16+
// Promisify things from fs we want to use.
17+
const fs = {
18+
createWriteStream: _fs.createWriteStream,
19+
exists: promisify(_fs.exists),
20+
mkdir: promisify(_fs.mkdir),
21+
};
22+
1223
process.on("unhandledRejection", (reason) => {
1324
console.error(`Unhandled promise rejection: ${reason}`);
1425
process.exit(1);
@@ -102,6 +113,13 @@ if (chromePath) {
102113
try {
103114
const serverPath = path.resolve(__dirname, "..", "bin", configuration, "netcoreapp2.1", "FunctionalTests.dll");
104115

116+
if (!await fs.exists(ARTIFACTS_DIR)) {
117+
await fs.mkdir(ARTIFACTS_DIR);
118+
}
119+
if (!await fs.exists(LOGS_DIR)) {
120+
await fs.mkdir(LOGS_DIR);
121+
}
122+
105123
debug(`Launching Functional Test Server: ${serverPath}`);
106124
const dotnet = spawn("dotnet", [serverPath], {
107125
env: {
@@ -117,6 +135,9 @@ if (chromePath) {
117135
}
118136
}
119137

138+
const logStream = fs.createWriteStream(path.resolve(LOGS_DIR, "ts.functionaltests.dotnet.log"));
139+
dotnet.stdout.pipe(logStream);
140+
120141
process.on("SIGINT", cleanup);
121142
process.on("exit", cleanup);
122143

src/SignalR/common/Http.Connections/src/Internal/HttpConnectionContext.cs

Lines changed: 79 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ public class HttpConnectionContext : ConnectionContext,
2727
IHttpTransportFeature,
2828
IConnectionInherentKeepAliveFeature
2929
{
30+
private static long _tenSeconds = TimeSpan.FromSeconds(10).Ticks;
31+
3032
private readonly object _itemsLock = new object();
3133
private readonly object _heartbeatLock = new object();
3234
private List<(Action<object> handler, object state)> _heartbeatHandlers;
@@ -35,6 +37,13 @@ public class HttpConnectionContext : ConnectionContext,
3537
private IDuplexPipe _application;
3638
private IDictionary<object, object> _items;
3739

40+
private CancellationTokenSource _sendCts;
41+
private bool _activeSend;
42+
private long _startedSendTime;
43+
private readonly object _sendingLock = new object();
44+
45+
internal CancellationToken SendingToken { get; private set; }
46+
3847
// This tcs exists so that multiple calls to DisposeAsync all wait asynchronously
3948
// on the same task
4049
private readonly TaskCompletionSource<object> _disposeTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -274,24 +283,45 @@ private async Task WaitOnTasks(Task applicationTask, Task transportTask, bool cl
274283
// Cancel any pending flushes from back pressure
275284
Application?.Output.CancelPendingFlush();
276285

277-
// Shutdown both sides and wait for nothing
278-
Transport?.Output.Complete(applicationTask.Exception?.InnerException);
279-
Application?.Output.Complete(transportTask.Exception?.InnerException);
280-
286+
// Normally it isn't safe to try and acquire this lock because the Send can hold onto it for a long time if there is backpressure
287+
// It is safe to wait for this lock now because the Send will be in one of 4 states
288+
// 1. In the middle of a write which is in the middle of being canceled by the CancelPendingFlush above, when it throws
289+
// an OperationCanceledException it will complete the PipeWriter which will make any other Send waiting on the lock
290+
// throw an InvalidOperationException if they call Write
291+
// 2. About to write and see that there is a pending cancel from the CancelPendingFlush, go to 1 to see what happens
292+
// 3. Enters the Send and sees the Dispose state from DisposeAndRemoveAsync and releases the lock
293+
// 4. No Send in progress
294+
await WriteLock.WaitAsync();
281295
try
282296
{
283-
Log.WaitingForTransportAndApplication(_logger, TransportType);
284-
// A poorly written application *could* in theory get stuck forever and it'll show up as a memory leak
285-
await Task.WhenAll(applicationTask, transportTask);
297+
// Complete the applications read loop
298+
Application?.Output.Complete(transportTask.Exception?.InnerException);
286299
}
287300
finally
288301
{
289-
Log.TransportAndApplicationComplete(_logger, TransportType);
290-
291-
// Close the reading side after both sides run
292-
Application?.Input.Complete();
293-
Transport?.Input.Complete();
302+
WriteLock.Release();
294303
}
304+
305+
Application?.Input.CancelPendingRead();
306+
307+
await transportTask.NoThrow();
308+
Application?.Input.Complete();
309+
310+
Log.WaitingForTransportAndApplication(_logger, TransportType);
311+
312+
// A poorly written application *could* in theory get stuck forever and it'll show up as a memory leak
313+
// Wait for application so we can complete the writer safely
314+
await applicationTask.NoThrow();
315+
Log.TransportAndApplicationComplete(_logger, TransportType);
316+
317+
// Shutdown application side now that it's finished
318+
Transport?.Output.Complete(applicationTask.Exception?.InnerException);
319+
320+
// Close the reading side after both sides run
321+
Transport?.Input.Complete();
322+
323+
// Observe exceptions
324+
await Task.WhenAll(transportTask, applicationTask);
295325
}
296326

297327
// Notify all waiters that we're done disposing
@@ -311,6 +341,43 @@ private async Task WaitOnTasks(Task applicationTask, Task transportTask, bool cl
311341
}
312342
}
313343

344+
internal void StartSendCancellation()
345+
{
346+
lock (_sendingLock)
347+
{
348+
if (_sendCts == null || _sendCts.IsCancellationRequested)
349+
{
350+
_sendCts = new CancellationTokenSource();
351+
SendingToken = _sendCts.Token;
352+
}
353+
354+
_startedSendTime = DateTime.UtcNow.Ticks;
355+
_activeSend = true;
356+
}
357+
}
358+
359+
internal void TryCancelSend(long currentTicks)
360+
{
361+
lock (_sendingLock)
362+
{
363+
if (_activeSend)
364+
{
365+
if (currentTicks - _startedSendTime > _tenSeconds)
366+
{
367+
_sendCts.Cancel();
368+
}
369+
}
370+
}
371+
}
372+
373+
internal void StopSendCancellation()
374+
{
375+
lock (_sendingLock)
376+
{
377+
_activeSend = false;
378+
}
379+
}
380+
314381
private static class Log
315382
{
316383
private static readonly Action<ILogger, string, Exception> _disposingConnection =

src/SignalR/common/Http.Connections/src/Internal/HttpConnectionDispatcher.cs

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
144144
connection.SupportedFormats = TransferFormat.Text;
145145

146146
// We only need to provide the Input channel since writing to the application is handled through /send.
147-
var sse = new ServerSentEventsTransport(connection.Application.Input, connection.ConnectionId, _loggerFactory);
147+
var sse = new ServerSentEventsTransport(connection.Application.Input, connection.ConnectionId, connection, _loggerFactory);
148148

149149
await DoPersistentConnection(connectionDelegate, sse, context, connection);
150150
}
@@ -264,7 +264,7 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
264264
context.Response.RegisterForDispose(timeoutSource);
265265
context.Response.RegisterForDispose(tokenSource);
266266

267-
var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.Input, _loggerFactory);
267+
var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.Input, _loggerFactory, connection);
268268

269269
// Start the transport
270270
connection.TransportTask = longPolling.ProcessRequestAsync(context, tokenSource.Token);
@@ -291,7 +291,9 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
291291
connection.Transport.Output.Complete(connection.ApplicationTask.Exception);
292292

293293
// Wait for the transport to run
294-
await connection.TransportTask;
294+
// Ignore exceptions, it has been logged if there is one and the application has finished
295+
// So there is no one to give the exception to
296+
await connection.TransportTask.NoThrow();
295297

296298
// If the status code is a 204 it means the connection is done
297299
if (context.Response.StatusCode == StatusCodes.Status204NoContent)
@@ -307,6 +309,18 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
307309
pollAgain = false;
308310
}
309311
}
312+
else if (connection.TransportTask.IsFaulted || connection.TransportTask.IsCanceled)
313+
{
314+
// Cancel current request to release any waiting poll and let dispose aquire the lock
315+
currentRequestTcs.TrySetCanceled();
316+
317+
// We should be able to safely dispose because there's no more data being written
318+
// We don't need to wait for close here since we've already waited for both sides
319+
await _manager.DisposeAndRemoveAsync(connection, closeGracefully: false);
320+
321+
// Don't poll again if we've removed the connection completely
322+
pollAgain = false;
323+
}
310324
else if (context.Response.StatusCode == StatusCodes.Status204NoContent)
311325
{
312326
// Don't poll if the transport task was canceled
@@ -511,6 +525,14 @@ private async Task ProcessSend(HttpContext context, HttpConnectionDispatcherOpti
511525

512526
context.Response.StatusCode = StatusCodes.Status404NotFound;
513527
context.Response.ContentType = "text/plain";
528+
529+
// There are no writes anymore (since this is the write "loop")
530+
// So it is safe to complete the writer
531+
// We complete the writer here because we already have the WriteLock acquired
532+
// and it's unsafe to complete outside of the lock
533+
// Other code isn't guaranteed to be able to acquire the lock before another write
534+
// even if CancelPendingFlush is called, and the other write could hang if there is backpressure
535+
connection.Application.Output.Complete();
514536
return;
515537
}
516538

@@ -549,11 +571,8 @@ private async Task ProcessDeleteAsync(HttpContext context)
549571

550572
Log.TerminatingConection(_logger);
551573

552-
// Complete the receiving end of the pipe
553-
connection.Application.Output.Complete();
554-
555-
// Dispose the connection gracefully, but don't wait for it. We assign it here so we can wait in tests
556-
connection.DisposeAndRemoveTask = _manager.DisposeAndRemoveAsync(connection, closeGracefully: true);
574+
// Dispose the connection, but don't wait for it. We assign it here so we can wait in tests
575+
connection.DisposeAndRemoveTask = _manager.DisposeAndRemoveAsync(connection, closeGracefully: false);
557576

558577
context.Response.StatusCode = StatusCodes.Status202Accepted;
559578
context.Response.ContentType = "text/plain";

src/SignalR/common/Http.Connections/src/Internal/HttpConnectionManager.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public partial class HttpConnectionManager
3030
private readonly TimerAwaitable _nextHeartbeat;
3131
private readonly ILogger<HttpConnectionManager> _logger;
3232
private readonly ILogger<HttpConnectionContext> _connectionLogger;
33+
private readonly bool _useSendTimeout = true;
3334

3435
public HttpConnectionManager(ILoggerFactory loggerFactory, IApplicationLifetime appLifetime)
3536
{
@@ -38,6 +39,11 @@ public HttpConnectionManager(ILoggerFactory loggerFactory, IApplicationLifetime
3839
appLifetime.ApplicationStarted.Register(() => Start());
3940
appLifetime.ApplicationStopping.Register(() => CloseConnections());
4041
_nextHeartbeat = new TimerAwaitable(_heartbeatTickRate, _heartbeatTickRate);
42+
43+
if (AppContext.TryGetSwitch("Microsoft.AspNetCore.Http.Connections.DoNotUseSendTimeout", out var timeoutDisabled))
44+
{
45+
_useSendTimeout = !timeoutDisabled;
46+
}
4147
}
4248

4349
public void Start()
@@ -156,9 +162,10 @@ public async Task ScanAsync()
156162
connection.StateLock.Release();
157163
}
158164

165+
var utcNow = DateTimeOffset.UtcNow;
159166
// Once the decision has been made to dispose we don't check the status again
160167
// But don't clean up connections while the debugger is attached.
161-
if (!Debugger.IsAttached && status == HttpConnectionStatus.Inactive && (DateTimeOffset.UtcNow - lastSeenUtc).TotalSeconds > 5)
168+
if (!Debugger.IsAttached && status == HttpConnectionStatus.Inactive && (utcNow - lastSeenUtc).TotalSeconds > 5)
162169
{
163170
Log.ConnectionTimedOut(_logger, connection.ConnectionId);
164171
HttpConnectionsEventSource.Log.ConnectionTimedOut(connection.ConnectionId);
@@ -170,6 +177,11 @@ public async Task ScanAsync()
170177
}
171178
else
172179
{
180+
if (!Debugger.IsAttached && _useSendTimeout)
181+
{
182+
connection.TryCancelSend(utcNow.Ticks);
183+
}
184+
173185
// Tick the heartbeat, if the connection is still active
174186
connection.TickHeartbeat();
175187
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System.Runtime.CompilerServices;
5+
6+
namespace System.Threading.Tasks
7+
{
8+
internal static class TaskExtensions
9+
{
10+
public static async Task NoThrow(this Task task)
11+
{
12+
await new NoThrowAwaiter(task);
13+
}
14+
}
15+
16+
internal readonly struct NoThrowAwaiter : ICriticalNotifyCompletion
17+
{
18+
private readonly Task _task;
19+
public NoThrowAwaiter(Task task) { _task = task; }
20+
public NoThrowAwaiter GetAwaiter() => this;
21+
public bool IsCompleted => _task.IsCompleted;
22+
// Observe exception
23+
public void GetResult() { _ = _task.Exception; }
24+
public void OnCompleted(Action continuation) => _task.GetAwaiter().OnCompleted(continuation);
25+
public void UnsafeOnCompleted(Action continuation) => OnCompleted(continuation);
26+
}
27+
}

0 commit comments

Comments
 (0)