Skip to content

Commit d168ea7

Browse files
committed
Fully transition to active status with lock
1 parent 93b6a9c commit d168ea7

File tree

4 files changed

+133
-109
lines changed

4 files changed

+133
-109
lines changed

src/SignalR/common/Http.Connections/ref/Microsoft.AspNetCore.Http.Connections.netcoreapp3.0.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ public HttpConnectionContext(string id, System.IO.Pipelines.IDuplexPipe transpor
103103
public void MarkInactive() { }
104104
public void OnHeartbeat(System.Action<object> action, object state) { }
105105
public void TickHeartbeat() { }
106-
public Microsoft.AspNetCore.Http.Connections.Internal.HttpConnectionStatus TryChangeState(Microsoft.AspNetCore.Http.Connections.Internal.HttpConnectionStatus from, Microsoft.AspNetCore.Http.Connections.Internal.HttpConnectionStatus to) { throw null; }
106+
public bool TryActivateLongPollingConnection(Microsoft.AspNetCore.Connections.ConnectionDelegate connectionDelegate, Microsoft.AspNetCore.Http.HttpContext nonClonedContext, System.TimeSpan pollTimeout, System.Threading.Tasks.Task currentRequestTask, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory, Microsoft.Extensions.Logging.ILogger dispatcherLogger) { throw null; }
107+
public bool TryActivatePersistentConnection(Microsoft.AspNetCore.Connections.ConnectionDelegate connectionDelegate, Microsoft.AspNetCore.Http.Connections.Internal.Transports.IHttpTransport transport, Microsoft.Extensions.Logging.ILogger dispatcherLogger) { throw null; }
107108
}
108109
public partial class HttpConnectionDispatcher
109110
{

src/SignalR/common/Http.Connections/src/Internal/HttpConnectionContext.cs

Lines changed: 123 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System;
55
using System.Collections.Concurrent;
66
using System.Collections.Generic;
7+
using System.Diagnostics;
78
using System.IO.Pipelines;
89
using System.Security.Claims;
910
using System.Security.Principal;
@@ -12,7 +13,9 @@
1213
using Microsoft.AspNetCore.Connections;
1314
using Microsoft.AspNetCore.Connections.Features;
1415
using Microsoft.AspNetCore.Http.Connections.Features;
16+
using Microsoft.AspNetCore.Http.Connections.Internal.Transports;
1517
using Microsoft.AspNetCore.Http.Features;
18+
using Microsoft.AspNetCore.Internal;
1619
using Microsoft.Extensions.Logging;
1720

1821
namespace Microsoft.AspNetCore.Http.Connections.Internal
@@ -320,17 +323,119 @@ private async Task WaitOnTasks(Task applicationTask, Task transportTask, bool cl
320323
}
321324
}
322325

323-
public HttpConnectionStatus TryChangeState(HttpConnectionStatus from, HttpConnectionStatus to)
326+
public bool TryActivatePersistentConnection(
327+
ConnectionDelegate connectionDelegate,
328+
IHttpTransport transport,
329+
ILogger dispatcherLogger)
324330
{
325331
lock (_stateLock)
326332
{
327-
if (Status == from)
333+
if (Status == HttpConnectionStatus.Inactive)
328334
{
329-
Status = to;
330-
return from;
335+
Status = HttpConnectionStatus.Active;
336+
337+
// Call into the end point passing the connection
338+
ApplicationTask = ExecuteApplication(connectionDelegate);
339+
340+
// Start the transport
341+
TransportTask = transport.ProcessRequestAsync(HttpContext, HttpContext.RequestAborted);
342+
343+
return true;
331344
}
345+
else
346+
{
347+
FailActivationUnsynchronized(HttpContext, dispatcherLogger);
332348

333-
return Status;
349+
return false;
350+
}
351+
}
352+
}
353+
354+
public bool TryActivateLongPollingConnection(
355+
ConnectionDelegate connectionDelegate,
356+
HttpContext nonClonedContext,
357+
TimeSpan pollTimeout,
358+
Task currentRequestTask,
359+
ILoggerFactory loggerFactory,
360+
ILogger dispatcherLogger)
361+
{
362+
lock (_stateLock)
363+
{
364+
if (Status == HttpConnectionStatus.Inactive)
365+
{
366+
Status = HttpConnectionStatus.Active;
367+
368+
PreviousPollTask = currentRequestTask;
369+
370+
// Raise OnConnected for new connections only since polls happen all the time
371+
if (ApplicationTask == null)
372+
{
373+
HttpConnectionDispatcher.Log.EstablishedConnection(dispatcherLogger);
374+
375+
ApplicationTask = ExecuteApplication(connectionDelegate);
376+
377+
nonClonedContext.Response.ContentType = "application/octet-stream";
378+
379+
// This request has no content
380+
nonClonedContext.Response.ContentLength = 0;
381+
382+
// On the first poll, we flush the response immediately to mark the poll as "initialized" so future
383+
// requests can be made safely
384+
TransportTask = nonClonedContext.Response.Body.FlushAsync();
385+
}
386+
else
387+
{
388+
HttpConnectionDispatcher.Log.ResumingConnection(dispatcherLogger);
389+
390+
// REVIEW: Performance of this isn't great as this does a bunch of per request allocations
391+
Cancellation = new CancellationTokenSource();
392+
393+
var timeoutSource = new CancellationTokenSource();
394+
var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(Cancellation.Token, nonClonedContext.RequestAborted, timeoutSource.Token);
395+
396+
// Dispose these tokens when the request is over
397+
nonClonedContext.Response.RegisterForDispose(timeoutSource);
398+
nonClonedContext.Response.RegisterForDispose(tokenSource);
399+
400+
var longPolling = new LongPollingTransport(timeoutSource.Token, Application.Input, loggerFactory);
401+
402+
// Start the transport
403+
TransportTask = longPolling.ProcessRequestAsync(nonClonedContext, tokenSource.Token);
404+
405+
// Start the timeout after we return from creating the transport task
406+
timeoutSource.CancelAfter(pollTimeout);
407+
}
408+
409+
return true;
410+
}
411+
else
412+
{
413+
FailActivationUnsynchronized(nonClonedContext, dispatcherLogger);
414+
415+
return false;
416+
}
417+
}
418+
}
419+
420+
private void FailActivationUnsynchronized(HttpContext nonClonedContext, ILogger dispatcherLogger)
421+
{
422+
if (Status == HttpConnectionStatus.Active)
423+
{
424+
HttpConnectionDispatcher.Log.ConnectionAlreadyActive(dispatcherLogger, ConnectionId, HttpContext.TraceIdentifier);
425+
426+
// Reject the request with a 409 conflict
427+
nonClonedContext.Response.StatusCode = StatusCodes.Status409Conflict;
428+
nonClonedContext.Response.ContentType = "text/plain";
429+
}
430+
else
431+
{
432+
Debug.Assert(Status == HttpConnectionStatus.Disposed);
433+
434+
HttpConnectionDispatcher.Log.ConnectionDisposed(dispatcherLogger, ConnectionId);
435+
436+
// Connection was disposed
437+
nonClonedContext.Response.StatusCode = StatusCodes.Status404NotFound;
438+
nonClonedContext.Response.ContentType = "text/plain";
334439
}
335440
}
336441

@@ -347,6 +452,19 @@ public void MarkInactive()
347452
}
348453
}
349454

455+
private async Task ExecuteApplication(ConnectionDelegate connectionDelegate)
456+
{
457+
// Verify some initialization invariants
458+
Debug.Assert(TransportType != HttpTransportType.None, "Transport has not been initialized yet");
459+
460+
// Jump onto the thread pool thread so blocking user code doesn't block the setup of the
461+
// connection and transport
462+
await AwaitableThreadPool.Yield();
463+
464+
// Running this in an async method turns sync exceptions into async ones
465+
await connectionDelegate(this);
466+
}
467+
350468
private static class Log
351469
{
352470
private static readonly Action<ILogger, string, Exception> _disposingConnection =

src/SignalR/common/Http.Connections/src/Internal/HttpConnectionDispatcher.Log.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace Microsoft.AspNetCore.Http.Connections.Internal
88
{
99
public partial class HttpConnectionDispatcher
1010
{
11-
private static class Log
11+
internal static class Log
1212
{
1313
private static readonly Action<ILogger, string, Exception> _connectionDisposed =
1414
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(1, "ConnectionDisposed"), "Connection {TransportConnectionId} was disposed.");

src/SignalR/common/Http.Connections/src/Internal/HttpConnectionDispatcher.cs

Lines changed: 7 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
using System;
55
using System.Buffers;
66
using System.Collections.Generic;
7-
using System.Diagnostics;
87
using System.IO;
98
using System.IO.Pipelines;
109
using System.Security.Claims;
@@ -213,69 +212,13 @@ private async Task ExecuteAsync(HttpContext context, ConnectionDelegate connecti
213212
}
214213
}
215214

216-
// Mark the connection as active
217-
var oldState = connection.TryChangeState(HttpConnectionStatus.Inactive, HttpConnectionStatus.Active);
218-
219-
if (oldState == HttpConnectionStatus.Disposed)
220-
{
221-
Log.ConnectionDisposed(_logger, connection.ConnectionId);
222-
223-
// The connection was disposed
224-
context.Response.StatusCode = StatusCodes.Status404NotFound;
225-
context.Response.ContentType = "text/plain";
226-
return;
227-
}
228-
229-
if (oldState == HttpConnectionStatus.Active)
215+
if (!connection.TryActivateLongPollingConnection(
216+
connectionDelegate, context, options.LongPolling.PollTimeout,
217+
currentRequestTcs.Task, _loggerFactory, _logger))
230218
{
231-
var existing = connection.GetHttpContext();
232-
Log.ConnectionAlreadyActive(_logger, connection.ConnectionId, existing.TraceIdentifier);
233-
context.Response.StatusCode = StatusCodes.Status409Conflict;
234-
context.Response.ContentType = "text/plain";
235219
return;
236220
}
237221

238-
connection.PreviousPollTask = currentRequestTcs.Task;
239-
240-
// Raise OnConnected for new connections only since polls happen all the time
241-
if (connection.ApplicationTask == null)
242-
{
243-
Log.EstablishedConnection(_logger);
244-
245-
connection.ApplicationTask = ExecuteApplication(connectionDelegate, connection);
246-
247-
context.Response.ContentType = "application/octet-stream";
248-
249-
// This request has no content
250-
context.Response.ContentLength = 0;
251-
252-
// On the first poll, we flush the response immediately to mark the poll as "initialized" so future
253-
// requests can be made safely
254-
connection.TransportTask = context.Response.Body.FlushAsync();
255-
}
256-
else
257-
{
258-
Log.ResumingConnection(_logger);
259-
260-
// REVIEW: Performance of this isn't great as this does a bunch of per request allocations
261-
connection.Cancellation = new CancellationTokenSource();
262-
263-
var timeoutSource = new CancellationTokenSource();
264-
var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(connection.Cancellation.Token, context.RequestAborted, timeoutSource.Token);
265-
266-
// Dispose these tokens when the request is over
267-
context.Response.RegisterForDispose(timeoutSource);
268-
context.Response.RegisterForDispose(tokenSource);
269-
270-
var longPolling = new LongPollingTransport(timeoutSource.Token, connection.Application.Input, _loggerFactory);
271-
272-
// Start the transport
273-
connection.TransportTask = longPolling.ProcessRequestAsync(context, tokenSource.Token);
274-
275-
// Start the timeout after we return from creating the transport task
276-
timeoutSource.CancelAfter(options.LongPolling.PollTimeout);
277-
}
278-
279222
var resultTask = await Task.WhenAny(connection.ApplicationTask, connection.TransportTask);
280223

281224
try
@@ -340,51 +283,13 @@ private async Task DoPersistentConnection(ConnectionDelegate connectionDelegate,
340283
HttpContext context,
341284
HttpConnectionContext connection)
342285
{
343-
// Mark the connection as active
344-
var oldState = connection.TryChangeState(HttpConnectionStatus.Inactive, HttpConnectionStatus.Active);
345-
346-
if (oldState == HttpConnectionStatus.Disposed)
347-
{
348-
Log.ConnectionDisposed(_logger, connection.ConnectionId);
349-
350-
// Connection was disposed
351-
context.Response.StatusCode = StatusCodes.Status404NotFound;
352-
return;
353-
}
354-
355-
// There's already an active request
356-
if (oldState == HttpConnectionStatus.Active)
286+
if (connection.TryActivatePersistentConnection(connectionDelegate, transport, _logger))
357287
{
358-
Log.ConnectionAlreadyActive(_logger, connection.ConnectionId, connection.GetHttpContext().TraceIdentifier);
288+
// Wait for any of them to end
289+
await Task.WhenAny(connection.ApplicationTask, connection.TransportTask);
359290

360-
// Reject the request with a 409 conflict
361-
context.Response.StatusCode = StatusCodes.Status409Conflict;
362-
return;
291+
await _manager.DisposeAndRemoveAsync(connection, closeGracefully: true);
363292
}
364-
365-
// Call into the end point passing the connection
366-
connection.ApplicationTask = ExecuteApplication(connectionDelegate, connection);
367-
368-
// Start the transport
369-
connection.TransportTask = transport.ProcessRequestAsync(context, context.RequestAborted);
370-
371-
// Wait for any of them to end
372-
await Task.WhenAny(connection.ApplicationTask, connection.TransportTask);
373-
374-
await _manager.DisposeAndRemoveAsync(connection, closeGracefully: true);
375-
}
376-
377-
private async Task ExecuteApplication(ConnectionDelegate connectionDelegate, HttpConnectionContext connection)
378-
{
379-
// Verify some initialization invariants
380-
Debug.Assert(connection.TransportType != HttpTransportType.None, "Transport has not been initialized yet");
381-
382-
// Jump onto the thread pool thread so blocking user code doesn't block the setup of the
383-
// connection and transport
384-
await AwaitableThreadPool.Yield();
385-
386-
// Running this in an async method turns sync exceptions into async ones
387-
await connectionDelegate(connection);
388293
}
389294

390295
private async Task ProcessNegotiate(HttpContext context, HttpConnectionDispatcherOptions options, ConnectionLogScope logScope)

0 commit comments

Comments
 (0)