Skip to content

Better handle HttpConnectionContext state transitions #8225

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ public HttpConnectionContext(string id, System.IO.Pipelines.IDuplexPipe transpor
public Microsoft.AspNetCore.Http.HttpContext HttpContext { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } }
public override System.Collections.Generic.IDictionary<object, object> Items { get { throw null; } set { } }
public System.DateTime LastSeenUtc { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } }
public System.DateTime? LastSeenUtcIfInactive { get { throw null; } }
public System.Threading.Tasks.Task PreviousPollTask { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } }
public System.Threading.SemaphoreSlim StateLock { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } }
public Microsoft.AspNetCore.Http.Connections.Internal.HttpConnectionStatus Status { get { throw null; } set { } }
public Microsoft.AspNetCore.Http.Connections.Internal.HttpConnectionStatus Status { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } }
public Microsoft.AspNetCore.Connections.TransferFormat SupportedFormats { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } }
public override System.IO.Pipelines.IDuplexPipe Transport { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } }
public System.Threading.Tasks.Task TransportTask { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } [System.Runtime.CompilerServices.CompilerGeneratedAttribute]set { } }
Expand All @@ -100,9 +100,11 @@ public HttpConnectionContext(string id, System.IO.Pipelines.IDuplexPipe transpor
public System.Threading.SemaphoreSlim WriteLock { [System.Runtime.CompilerServices.CompilerGeneratedAttribute]get { throw null; } }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task DisposeAsync(bool closeGracefully = false) { throw null; }
public void MarkInactive() { }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidfowl What's the deal with reference assemblies for pubternal classes? Do we really have to keep back compat? If so, should we just make everything new internal?

public void OnHeartbeat(System.Action<object> action, object state) { }
public void TickHeartbeat() { }
public bool TryChangeState(Microsoft.AspNetCore.Http.Connections.Internal.HttpConnectionStatus from, Microsoft.AspNetCore.Http.Connections.Internal.HttpConnectionStatus to) { throw null; }
public bool TryActivateLongPollingConnection(Microsoft.AspNetCore.Connections.ConnectionDelegate connectionDelegate, Microsoft.AspNetCore.Http.HttpContext nonClonedContext, System.TimeSpan pollTimeout, System.Threading.Tasks.Task currentRequestTask, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory, Microsoft.Extensions.Logging.ILogger dispatcherLogger) { throw null; }
public bool TryActivatePersistentConnection(Microsoft.AspNetCore.Connections.ConnectionDelegate connectionDelegate, Microsoft.AspNetCore.Http.Connections.Internal.Transports.IHttpTransport transport, Microsoft.Extensions.Logging.ILogger dispatcherLogger) { throw null; }
}
public partial class HttpConnectionDispatcher
{
Expand All @@ -122,8 +124,7 @@ public void CloseConnections() { }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task DisposeAndRemoveAsync(Microsoft.AspNetCore.Http.Connections.Internal.HttpConnectionContext connection, bool closeGracefully) { throw null; }
public void RemoveConnection(string id) { }
[System.Diagnostics.DebuggerStepThroughAttribute]
public System.Threading.Tasks.Task ScanAsync() { throw null; }
public void Scan() { }
public void Start() { }
public bool TryGetConnection(string id, out Microsoft.AspNetCore.Http.Connections.Internal.HttpConnectionContext connection) { throw null; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Security.Claims;
using System.Security.Principal;
Expand All @@ -12,7 +13,9 @@
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Http.Connections.Features;
using Microsoft.AspNetCore.Http.Connections.Internal.Transports;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Internal;
using Microsoft.Extensions.Logging;

namespace Microsoft.AspNetCore.Http.Connections.Internal
Expand All @@ -28,14 +31,14 @@ public class HttpConnectionContext : ConnectionContext,
IHttpTransportFeature,
IConnectionInherentKeepAliveFeature
{
private readonly object _stateLock = new object();
private readonly object _itemsLock = new object();
private readonly object _heartbeatLock = new object();
private List<(Action<object> handler, object state)> _heartbeatHandlers;
private readonly ILogger _logger;
private PipeWriterStream _applicationStream;
private IDuplexPipe _application;
private IDictionary<object, object> _items;
private int _status = (int)HttpConnectionStatus.Inactive;

// This tcs exists so that multiple calls to DisposeAsync all wait asynchronously
// on the same task
Expand Down Expand Up @@ -83,7 +86,6 @@ public HttpConnectionContext(string id, IDuplexPipe transport, IDuplexPipe appli
public HttpTransportType TransportType { get; set; }

public SemaphoreSlim WriteLock { get; } = new SemaphoreSlim(1, 1);
public SemaphoreSlim StateLock { get; } = new SemaphoreSlim(1, 1);

// Used for testing only
internal Task DisposeAndRemoveTask { get; set; }
Expand All @@ -96,7 +98,18 @@ public HttpConnectionContext(string id, IDuplexPipe transport, IDuplexPipe appli

public DateTime LastSeenUtc { get; set; }

public HttpConnectionStatus Status { get => (HttpConnectionStatus)_status; set => Interlocked.Exchange(ref _status, (int)value); }
public DateTime? LastSeenUtcIfInactive
{
get
{
lock (_stateLock)
{
return Status == HttpConnectionStatus.Inactive ? (DateTime?)LastSeenUtc : null;
}
}
}

public HttpConnectionStatus Status { get; set; } = HttpConnectionStatus.Inactive;

public override string ConnectionId { get; set; }

Expand Down Expand Up @@ -184,29 +197,29 @@ public async Task DisposeAsync(bool closeGracefully = false)
{
Task disposeTask;

await StateLock.WaitAsync();
try
{
if (Status == HttpConnectionStatus.Disposed)
{
disposeTask = _disposeTcs.Task;
}
else
lock (_stateLock)
{
Status = HttpConnectionStatus.Disposed;
if (Status == HttpConnectionStatus.Disposed)
{
disposeTask = _disposeTcs.Task;
}
else
{
Status = HttpConnectionStatus.Disposed;

Log.DisposingConnection(_logger, ConnectionId);
Log.DisposingConnection(_logger, ConnectionId);

var applicationTask = ApplicationTask ?? Task.CompletedTask;
var transportTask = TransportTask ?? Task.CompletedTask;
var applicationTask = ApplicationTask ?? Task.CompletedTask;
var transportTask = TransportTask ?? Task.CompletedTask;

disposeTask = WaitOnTasks(applicationTask, transportTask, closeGracefully);
disposeTask = WaitOnTasks(applicationTask, transportTask, closeGracefully);
}
}
}
finally
{
StateLock.Release();

Cancellation?.Dispose();

Cancellation = null;
Expand Down Expand Up @@ -310,9 +323,145 @@ private async Task WaitOnTasks(Task applicationTask, Task transportTask, bool cl
}
}

public bool TryChangeState(HttpConnectionStatus from, HttpConnectionStatus to)
public bool TryActivatePersistentConnection(
ConnectionDelegate connectionDelegate,
IHttpTransport transport,
ILogger dispatcherLogger)
{
lock (_stateLock)
{
if (Status == HttpConnectionStatus.Inactive)
{
Status = HttpConnectionStatus.Active;

// Call into the end point passing the connection
ApplicationTask = ExecuteApplication(connectionDelegate);

// Start the transport
TransportTask = transport.ProcessRequestAsync(HttpContext, HttpContext.RequestAborted);

return true;
}
else
{
FailActivationUnsynchronized(HttpContext, dispatcherLogger);

return false;
}
}
}

public bool TryActivateLongPollingConnection(
ConnectionDelegate connectionDelegate,
HttpContext nonClonedContext,
TimeSpan pollTimeout,
Task currentRequestTask,
ILoggerFactory loggerFactory,
ILogger dispatcherLogger)
{
return Interlocked.CompareExchange(ref _status, (int)to, (int)from) == (int)from;
lock (_stateLock)
{
if (Status == HttpConnectionStatus.Inactive)
{
Status = HttpConnectionStatus.Active;

PreviousPollTask = currentRequestTask;

// Raise OnConnected for new connections only since polls happen all the time
if (ApplicationTask == null)
{
HttpConnectionDispatcher.Log.EstablishedConnection(dispatcherLogger);

ApplicationTask = ExecuteApplication(connectionDelegate);

nonClonedContext.Response.ContentType = "application/octet-stream";

// This request has no content
nonClonedContext.Response.ContentLength = 0;

// On the first poll, we flush the response immediately to mark the poll as "initialized" so future
// requests can be made safely
TransportTask = nonClonedContext.Response.Body.FlushAsync();
}
else
{
HttpConnectionDispatcher.Log.ResumingConnection(dispatcherLogger);

// REVIEW: Performance of this isn't great as this does a bunch of per request allocations
Cancellation = new CancellationTokenSource();

var timeoutSource = new CancellationTokenSource();
var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(Cancellation.Token, nonClonedContext.RequestAborted, timeoutSource.Token);

// Dispose these tokens when the request is over
nonClonedContext.Response.RegisterForDispose(timeoutSource);
nonClonedContext.Response.RegisterForDispose(tokenSource);

var longPolling = new LongPollingTransport(timeoutSource.Token, Application.Input, loggerFactory);

// Start the transport
TransportTask = longPolling.ProcessRequestAsync(nonClonedContext, tokenSource.Token);

// Start the timeout after we return from creating the transport task
timeoutSource.CancelAfter(pollTimeout);
}

return true;
}
else
{
FailActivationUnsynchronized(nonClonedContext, dispatcherLogger);

return false;
}
}
}

private void FailActivationUnsynchronized(HttpContext nonClonedContext, ILogger dispatcherLogger)
{
if (Status == HttpConnectionStatus.Active)
{
HttpConnectionDispatcher.Log.ConnectionAlreadyActive(dispatcherLogger, ConnectionId, HttpContext.TraceIdentifier);

// Reject the request with a 409 conflict
nonClonedContext.Response.StatusCode = StatusCodes.Status409Conflict;
nonClonedContext.Response.ContentType = "text/plain";
}
else
{
Debug.Assert(Status == HttpConnectionStatus.Disposed);

HttpConnectionDispatcher.Log.ConnectionDisposed(dispatcherLogger, ConnectionId);

// Connection was disposed
nonClonedContext.Response.StatusCode = StatusCodes.Status404NotFound;
nonClonedContext.Response.ContentType = "text/plain";
}
}

public void MarkInactive()
{
lock (_stateLock)
{
if (Status == HttpConnectionStatus.Active)
{
Status = HttpConnectionStatus.Inactive;
LastSeenUtc = DateTime.UtcNow;
}
}
}

private async Task ExecuteApplication(ConnectionDelegate connectionDelegate)
{
// Verify some initialization invariants
Debug.Assert(TransportType != HttpTransportType.None, "Transport has not been initialized yet");

// Jump onto the thread pool thread so blocking user code doesn't block the setup of the
// connection and transport
await AwaitableThreadPool.Yield();

// Running this in an async method turns sync exceptions into async ones
await connectionDelegate(this);
}

private static class Log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
{
public partial class HttpConnectionDispatcher
{
private static class Log
internal static class Log
{
private static readonly Action<ILogger, string, Exception> _connectionDisposed =
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(1, "ConnectionDisposed"), "Connection {TransportConnectionId} was disposed.");
Expand Down
Loading