@@ -22,11 +22,8 @@ internal class MessagePump : IServer
22
22
private readonly ILogger _logger ;
23
23
private readonly HttpSysOptions _options ;
24
24
25
- private IHttpApplication < object > _application ;
26
-
27
25
private int _maxAccepts ;
28
26
private int _acceptorCounts ;
29
- private Action < object > _processRequest ;
30
27
31
28
private volatile int _stopping ;
32
29
private int _outstandingRequests ;
@@ -58,15 +55,16 @@ public MessagePump(IOptions<HttpSysOptions> options, ILoggerFactory loggerFactor
58
55
_serverAddresses = new ServerAddressesFeature ( ) ;
59
56
Features . Set < IServerAddressesFeature > ( _serverAddresses ) ;
60
57
61
- _processRequest = new Action < object > ( ProcessRequestAsync ) ;
62
58
_maxAccepts = _options . MaxAccepts ;
63
59
}
64
60
65
61
internal HttpSysListener Listener { get ; }
66
62
63
+ internal IHttpApplication < object > Application { get ; set ; }
64
+
67
65
public IFeatureCollection Features { get ; }
68
66
69
- private bool Stopping => _stopping == 1 ;
67
+ internal bool Stopping => _stopping == 1 ;
70
68
71
69
public Task StartAsync < TContext > ( IHttpApplication < TContext > application , CancellationToken cancellationToken )
72
70
{
@@ -115,11 +113,11 @@ public Task StartAsync<TContext>(IHttpApplication<TContext> application, Cancell
115
113
// else // Attaching to an existing queue, don't add a default.
116
114
117
115
// Can't call Start twice
118
- Contract . Assert ( _application == null ) ;
116
+ Contract . Assert ( Application == null ) ;
119
117
120
118
Contract . Assert ( application != null ) ;
121
119
122
- _application = new ApplicationWrapper < TContext > ( application ) ;
120
+ Application = new ApplicationWrapper < TContext > ( application ) ;
123
121
124
122
Listener . Start ( ) ;
125
123
@@ -151,6 +149,21 @@ private void UpdateUrlPrefixes(IList<string> serverAddressCopy)
151
149
}
152
150
}
153
151
152
+ internal int IncrementOutstandingRequest ( )
153
+ {
154
+ return Interlocked . Increment ( ref _outstandingRequests ) ;
155
+ }
156
+
157
+ internal int DecrementOutstandingRequest ( )
158
+ {
159
+ return Interlocked . Decrement ( ref _outstandingRequests ) ;
160
+ }
161
+
162
+ internal void SetShutdownSignal ( )
163
+ {
164
+ _shutdownSignal . TrySetResult ( null ) ;
165
+ }
166
+
154
167
// The message pump.
155
168
// When we start listening for the next request on one thread, we may need to be sure that the
156
169
// completion continues on another thread as to not block the current request processing.
@@ -165,6 +178,8 @@ private async void ProcessRequestsWorker()
165
178
try
166
179
{
167
180
requestContext = await Listener . AcceptAsync ( ) . SupressContext ( ) ;
181
+ // Assign the message pump to this request context
182
+ requestContext . MessagePump = this ;
168
183
}
169
184
catch ( Exception exception )
170
185
{
@@ -181,7 +196,7 @@ private async void ProcessRequestsWorker()
181
196
}
182
197
try
183
198
{
184
- Task ignored = Task . Factory . StartNew ( _processRequest , requestContext ) ;
199
+ ThreadPool . UnsafeQueueUserWorkItem ( requestContext , preferLocal : false ) ;
185
200
}
186
201
catch ( Exception ex )
187
202
{
@@ -193,79 +208,6 @@ private async void ProcessRequestsWorker()
193
208
Interlocked . Decrement ( ref _acceptorCounts ) ;
194
209
}
195
210
196
- private async void ProcessRequestAsync ( object requestContextObj )
197
- {
198
- var requestContext = requestContextObj as RequestContext ;
199
- try
200
- {
201
- if ( Stopping )
202
- {
203
- SetFatalResponse ( requestContext , 503 ) ;
204
- return ;
205
- }
206
-
207
- object context = null ;
208
- Interlocked . Increment ( ref _outstandingRequests ) ;
209
- try
210
- {
211
- var featureContext = new FeatureContext ( requestContext ) ;
212
- context = _application . CreateContext ( featureContext . Features ) ;
213
- try
214
- {
215
- await _application . ProcessRequestAsync ( context ) . SupressContext ( ) ;
216
- await featureContext . CompleteAsync ( ) ;
217
- }
218
- finally
219
- {
220
- await featureContext . OnCompleted ( ) ;
221
- }
222
- _application . DisposeContext ( context , null ) ;
223
- requestContext . Dispose ( ) ;
224
- }
225
- catch ( Exception ex )
226
- {
227
- _logger . LogError ( LoggerEventIds . RequestProcessError , ex , "ProcessRequestAsync" ) ;
228
- _application . DisposeContext ( context , ex ) ;
229
- if ( requestContext . Response . HasStarted )
230
- {
231
- // HTTP/2 INTERNAL_ERROR = 0x2 https://tools.ietf.org/html/rfc7540#section-7
232
- // Otherwise the default is Cancel = 0x8.
233
- requestContext . SetResetCode ( 2 ) ;
234
- requestContext . Abort ( ) ;
235
- }
236
- else
237
- {
238
- // We haven't sent a response yet, try to send a 500 Internal Server Error
239
- requestContext . Response . Headers . IsReadOnly = false ;
240
- requestContext . Response . Trailers . IsReadOnly = false ;
241
- requestContext . Response . Headers . Clear ( ) ;
242
- requestContext . Response . Trailers . Clear ( ) ;
243
- SetFatalResponse ( requestContext , 500 ) ;
244
- }
245
- }
246
- finally
247
- {
248
- if ( Interlocked . Decrement ( ref _outstandingRequests ) == 0 && Stopping )
249
- {
250
- _logger . LogInformation ( LoggerEventIds . RequestsDrained , "All requests drained." ) ;
251
- _shutdownSignal . TrySetResult ( 0 ) ;
252
- }
253
- }
254
- }
255
- catch ( Exception ex )
256
- {
257
- _logger . LogError ( LoggerEventIds . RequestError , ex , "ProcessRequestAsync" ) ;
258
- requestContext . Abort ( ) ;
259
- }
260
- }
261
-
262
- private static void SetFatalResponse ( RequestContext context , int status )
263
- {
264
- context . Response . StatusCode = status ;
265
- context . Response . ContentLength = 0 ;
266
- context . Dispose ( ) ;
267
- }
268
-
269
211
public Task StopAsync ( CancellationToken cancellationToken )
270
212
{
271
213
void RegisterCancelation ( )
0 commit comments