4
4
using System ;
5
5
using System . Collections . Concurrent ;
6
6
using System . Collections . Generic ;
7
+ using System . Diagnostics ;
7
8
using System . IO . Pipelines ;
8
9
using System . Security . Claims ;
9
10
using System . Security . Principal ;
12
13
using Microsoft . AspNetCore . Connections ;
13
14
using Microsoft . AspNetCore . Connections . Features ;
14
15
using Microsoft . AspNetCore . Http . Connections . Features ;
16
+ using Microsoft . AspNetCore . Http . Connections . Internal . Transports ;
15
17
using Microsoft . AspNetCore . Http . Features ;
18
+ using Microsoft . AspNetCore . Internal ;
16
19
using Microsoft . Extensions . Logging ;
17
20
18
21
namespace Microsoft . AspNetCore . Http . Connections . Internal
@@ -28,14 +31,14 @@ public class HttpConnectionContext : ConnectionContext,
28
31
IHttpTransportFeature ,
29
32
IConnectionInherentKeepAliveFeature
30
33
{
34
+ private readonly object _stateLock = new object ( ) ;
31
35
private readonly object _itemsLock = new object ( ) ;
32
36
private readonly object _heartbeatLock = new object ( ) ;
33
37
private List < ( Action < object > handler , object state ) > _heartbeatHandlers ;
34
38
private readonly ILogger _logger ;
35
39
private PipeWriterStream _applicationStream ;
36
40
private IDuplexPipe _application ;
37
41
private IDictionary < object , object > _items ;
38
- private int _status = ( int ) HttpConnectionStatus . Inactive ;
39
42
40
43
// This tcs exists so that multiple calls to DisposeAsync all wait asynchronously
41
44
// on the same task
@@ -83,7 +86,6 @@ public HttpConnectionContext(string id, IDuplexPipe transport, IDuplexPipe appli
83
86
public HttpTransportType TransportType { get ; set ; }
84
87
85
88
public SemaphoreSlim WriteLock { get ; } = new SemaphoreSlim ( 1 , 1 ) ;
86
- public SemaphoreSlim StateLock { get ; } = new SemaphoreSlim ( 1 , 1 ) ;
87
89
88
90
// Used for testing only
89
91
internal Task DisposeAndRemoveTask { get ; set ; }
@@ -96,7 +98,18 @@ public HttpConnectionContext(string id, IDuplexPipe transport, IDuplexPipe appli
96
98
97
99
public DateTime LastSeenUtc { get ; set ; }
98
100
99
- public HttpConnectionStatus Status { get => ( HttpConnectionStatus ) _status ; set => Interlocked . Exchange ( ref _status , ( int ) value ) ; }
101
+ public DateTime ? LastSeenUtcIfInactive
102
+ {
103
+ get
104
+ {
105
+ lock ( _stateLock )
106
+ {
107
+ return Status == HttpConnectionStatus . Inactive ? ( DateTime ? ) LastSeenUtc : null ;
108
+ }
109
+ }
110
+ }
111
+
112
+ public HttpConnectionStatus Status { get ; set ; } = HttpConnectionStatus . Inactive ;
100
113
101
114
public override string ConnectionId { get ; set ; }
102
115
@@ -184,29 +197,29 @@ public async Task DisposeAsync(bool closeGracefully = false)
184
197
{
185
198
Task disposeTask ;
186
199
187
- await StateLock . WaitAsync ( ) ;
188
200
try
189
201
{
190
- if ( Status == HttpConnectionStatus . Disposed )
191
- {
192
- disposeTask = _disposeTcs . Task ;
193
- }
194
- else
202
+ lock ( _stateLock )
195
203
{
196
- Status = HttpConnectionStatus . Disposed ;
204
+ if ( Status == HttpConnectionStatus . Disposed )
205
+ {
206
+ disposeTask = _disposeTcs . Task ;
207
+ }
208
+ else
209
+ {
210
+ Status = HttpConnectionStatus . Disposed ;
197
211
198
- Log . DisposingConnection ( _logger , ConnectionId ) ;
212
+ Log . DisposingConnection ( _logger , ConnectionId ) ;
199
213
200
- var applicationTask = ApplicationTask ?? Task . CompletedTask ;
201
- var transportTask = TransportTask ?? Task . CompletedTask ;
214
+ var applicationTask = ApplicationTask ?? Task . CompletedTask ;
215
+ var transportTask = TransportTask ?? Task . CompletedTask ;
202
216
203
- disposeTask = WaitOnTasks ( applicationTask , transportTask , closeGracefully ) ;
217
+ disposeTask = WaitOnTasks ( applicationTask , transportTask , closeGracefully ) ;
218
+ }
204
219
}
205
220
}
206
221
finally
207
222
{
208
- StateLock . Release ( ) ;
209
-
210
223
Cancellation ? . Dispose ( ) ;
211
224
212
225
Cancellation = null ;
@@ -310,9 +323,145 @@ private async Task WaitOnTasks(Task applicationTask, Task transportTask, bool cl
310
323
}
311
324
}
312
325
313
- public bool TryChangeState ( HttpConnectionStatus from , HttpConnectionStatus to )
326
+ public bool TryActivatePersistentConnection (
327
+ ConnectionDelegate connectionDelegate ,
328
+ IHttpTransport transport ,
329
+ ILogger dispatcherLogger )
330
+ {
331
+ lock ( _stateLock )
332
+ {
333
+ if ( Status == HttpConnectionStatus . Inactive )
334
+ {
335
+ Status = HttpConnectionStatus . Active ;
336
+
337
+ // Call into the end point passing the connection
338
+ ApplicationTask = ExecuteApplication ( connectionDelegate ) ;
339
+
340
+ // Start the transport
341
+ TransportTask = transport . ProcessRequestAsync ( HttpContext , HttpContext . RequestAborted ) ;
342
+
343
+ return true ;
344
+ }
345
+ else
346
+ {
347
+ FailActivationUnsynchronized ( HttpContext , dispatcherLogger ) ;
348
+
349
+ return false ;
350
+ }
351
+ }
352
+ }
353
+
354
+ public bool TryActivateLongPollingConnection (
355
+ ConnectionDelegate connectionDelegate ,
356
+ HttpContext nonClonedContext ,
357
+ TimeSpan pollTimeout ,
358
+ Task currentRequestTask ,
359
+ ILoggerFactory loggerFactory ,
360
+ ILogger dispatcherLogger )
314
361
{
315
- return Interlocked . CompareExchange ( ref _status , ( int ) to , ( int ) from ) == ( int ) from ;
362
+ lock ( _stateLock )
363
+ {
364
+ if ( Status == HttpConnectionStatus . Inactive )
365
+ {
366
+ Status = HttpConnectionStatus . Active ;
367
+
368
+ PreviousPollTask = currentRequestTask ;
369
+
370
+ // Raise OnConnected for new connections only since polls happen all the time
371
+ if ( ApplicationTask == null )
372
+ {
373
+ HttpConnectionDispatcher . Log . EstablishedConnection ( dispatcherLogger ) ;
374
+
375
+ ApplicationTask = ExecuteApplication ( connectionDelegate ) ;
376
+
377
+ nonClonedContext . Response . ContentType = "application/octet-stream" ;
378
+
379
+ // This request has no content
380
+ nonClonedContext . Response . ContentLength = 0 ;
381
+
382
+ // On the first poll, we flush the response immediately to mark the poll as "initialized" so future
383
+ // requests can be made safely
384
+ TransportTask = nonClonedContext . Response . Body . FlushAsync ( ) ;
385
+ }
386
+ else
387
+ {
388
+ HttpConnectionDispatcher . Log . ResumingConnection ( dispatcherLogger ) ;
389
+
390
+ // REVIEW: Performance of this isn't great as this does a bunch of per request allocations
391
+ Cancellation = new CancellationTokenSource ( ) ;
392
+
393
+ var timeoutSource = new CancellationTokenSource ( ) ;
394
+ var tokenSource = CancellationTokenSource . CreateLinkedTokenSource ( Cancellation . Token , nonClonedContext . RequestAborted , timeoutSource . Token ) ;
395
+
396
+ // Dispose these tokens when the request is over
397
+ nonClonedContext . Response . RegisterForDispose ( timeoutSource ) ;
398
+ nonClonedContext . Response . RegisterForDispose ( tokenSource ) ;
399
+
400
+ var longPolling = new LongPollingTransport ( timeoutSource . Token , Application . Input , loggerFactory ) ;
401
+
402
+ // Start the transport
403
+ TransportTask = longPolling . ProcessRequestAsync ( nonClonedContext , tokenSource . Token ) ;
404
+
405
+ // Start the timeout after we return from creating the transport task
406
+ timeoutSource . CancelAfter ( pollTimeout ) ;
407
+ }
408
+
409
+ return true ;
410
+ }
411
+ else
412
+ {
413
+ FailActivationUnsynchronized ( nonClonedContext , dispatcherLogger ) ;
414
+
415
+ return false ;
416
+ }
417
+ }
418
+ }
419
+
420
+ private void FailActivationUnsynchronized ( HttpContext nonClonedContext , ILogger dispatcherLogger )
421
+ {
422
+ if ( Status == HttpConnectionStatus . Active )
423
+ {
424
+ HttpConnectionDispatcher . Log . ConnectionAlreadyActive ( dispatcherLogger , ConnectionId , HttpContext . TraceIdentifier ) ;
425
+
426
+ // Reject the request with a 409 conflict
427
+ nonClonedContext . Response . StatusCode = StatusCodes . Status409Conflict ;
428
+ nonClonedContext . Response . ContentType = "text/plain" ;
429
+ }
430
+ else
431
+ {
432
+ Debug . Assert ( Status == HttpConnectionStatus . Disposed ) ;
433
+
434
+ HttpConnectionDispatcher . Log . ConnectionDisposed ( dispatcherLogger , ConnectionId ) ;
435
+
436
+ // Connection was disposed
437
+ nonClonedContext . Response . StatusCode = StatusCodes . Status404NotFound ;
438
+ nonClonedContext . Response . ContentType = "text/plain" ;
439
+ }
440
+ }
441
+
442
+ public void MarkInactive ( )
443
+ {
444
+ lock ( _stateLock )
445
+ {
446
+ if ( Status == HttpConnectionStatus . Active )
447
+ {
448
+ Status = HttpConnectionStatus . Inactive ;
449
+ LastSeenUtc = DateTime . UtcNow ;
450
+ }
451
+ }
452
+ }
453
+
454
+ private async Task ExecuteApplication ( ConnectionDelegate connectionDelegate )
455
+ {
456
+ // Verify some initialization invariants
457
+ Debug . Assert ( TransportType != HttpTransportType . None , "Transport has not been initialized yet" ) ;
458
+
459
+ // Jump onto the thread pool thread so blocking user code doesn't block the setup of the
460
+ // connection and transport
461
+ await AwaitableThreadPool . Yield ( ) ;
462
+
463
+ // Running this in an async method turns sync exceptions into async ones
464
+ await connectionDelegate ( this ) ;
316
465
}
317
466
318
467
private static class Log
0 commit comments