Skip to content

Commit a61d535

Browse files
authored
Reduce the per connection overhead in SocketConnection (#31308)
* Reduce the per connection overhead in SocketConnection - Removed 3 state machines (StartAsync, ProcessSends and ProcessReceives) - Use ValueTask to remove delegate allocation on both senders and receivers - Remove field from DoSend and DoReceive state machine
1 parent 020745c commit a61d535

File tree

4 files changed

+172
-153
lines changed

4 files changed

+172
-153
lines changed

src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketAwaitableEventArgs.cs

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,37 +2,49 @@
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
5-
using System.Diagnostics;
65
using System.IO.Pipelines;
76
using System.Net.Sockets;
8-
using System.Runtime.CompilerServices;
97
using System.Threading;
10-
using System.Threading.Tasks;
8+
using System.Threading.Tasks.Sources;
119

1210
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.Internal
1311
{
14-
internal class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion
12+
// A slimmed down version of https://github.com/dotnet/runtime/blob/82ca681cbac89d813a3ce397e0c665e6c051ed67/src/libraries/System.Net.Sockets/src/System/Net/Sockets/Socket.Tasks.cs#L798 that
13+
// 1. Doesn't support any custom scheduling other than the PipeScheduler (no sync context, no task scheduler)
14+
// 2. Doesn't do ValueTask validation using the token
15+
// 3. Doesn't support usage outside of async/await (doesn't try to capture and restore the execution context)
16+
// 4. Doesn't use cancellation tokens
17+
internal class SocketAwaitableEventArgs : SocketAsyncEventArgs, IValueTaskSource<int>
1518
{
16-
private static readonly Action _callbackCompleted = () => { };
19+
private static readonly Action<object?> _continuationCompleted = _ => { };
1720

1821
private readonly PipeScheduler _ioScheduler;
1922

20-
private Action? _callback;
23+
private Action<object?>? _continuation;
2124

2225
public SocketAwaitableEventArgs(PipeScheduler ioScheduler)
2326
: base(unsafeSuppressExecutionContextFlow: true)
2427
{
2528
_ioScheduler = ioScheduler;
2629
}
2730

28-
public SocketAwaitableEventArgs GetAwaiter() => this;
29-
public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);
30-
31-
public int GetResult()
31+
protected override void OnCompleted(SocketAsyncEventArgs _)
3232
{
33-
Debug.Assert(ReferenceEquals(_callback, _callbackCompleted));
33+
var c = _continuation;
34+
35+
if (c != null || (c = Interlocked.CompareExchange(ref _continuation, _continuationCompleted, null)) != null)
36+
{
37+
var continuationState = UserToken;
38+
UserToken = null;
39+
_continuation = _continuationCompleted; // in case someone's polling IsCompleted
3440

35-
_callback = null;
41+
_ioScheduler.Schedule(c, continuationState);
42+
}
43+
}
44+
45+
public int GetResult(short token)
46+
{
47+
_continuation = null;
3648

3749
if (SocketError != SocketError.Success)
3850
{
@@ -43,36 +55,30 @@ public int GetResult()
4355

4456
static void ThrowSocketException(SocketError e)
4557
{
46-
throw new SocketException((int)e);
58+
throw CreateException(e);
4759
}
4860
}
4961

50-
public void OnCompleted(Action continuation)
62+
protected static SocketException CreateException(SocketError e)
5163
{
52-
if (ReferenceEquals(_callback, _callbackCompleted) ||
53-
ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted))
54-
{
55-
Task.Run(continuation);
56-
}
64+
return new SocketException((int)e);
5765
}
5866

59-
public void UnsafeOnCompleted(Action continuation)
67+
public ValueTaskSourceStatus GetStatus(short token)
6068
{
61-
OnCompleted(continuation);
69+
return !ReferenceEquals(_continuation, _continuationCompleted) ? ValueTaskSourceStatus.Pending :
70+
SocketError == SocketError.Success ? ValueTaskSourceStatus.Succeeded :
71+
ValueTaskSourceStatus.Faulted;
6272
}
6373

64-
public void Complete()
74+
public void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
6575
{
66-
OnCompleted(this);
67-
}
68-
69-
protected override void OnCompleted(SocketAsyncEventArgs _)
70-
{
71-
var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
72-
73-
if (continuation != null)
76+
UserToken = state;
77+
var prevContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null);
78+
if (ReferenceEquals(prevContinuation, _continuationCompleted))
7479
{
75-
_ioScheduler.Schedule(state => ((Action)state!)(), continuation);
80+
UserToken = null;
81+
ThreadPool.UnsafeQueueUserWorkItem(continuation, state, preferLocal: true);
7682
}
7783
}
7884
}

src/Servers/Kestrel/Transport.Sockets/src/Internal/SocketConnection.cs

Lines changed: 97 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ internal sealed class SocketConnection : TransportConnection
2828
private readonly object _shutdownLock = new object();
2929
private volatile bool _socketDisposed;
3030
private volatile Exception? _shutdownReason;
31-
private Task? _processingTask;
31+
private Task? _sendingTask;
32+
private Task? _receivingTask;
3233
private readonly TaskCompletionSource _waitForConnectionClosedTcs = new TaskCompletionSource();
3334
private bool _connectionClosed;
3435
private readonly bool _waitForData;
@@ -78,28 +79,16 @@ internal SocketConnection(Socket socket,
7879
public override MemoryPool<byte> MemoryPool { get; }
7980

8081
public void Start()
81-
{
82-
_processingTask = StartAsync();
83-
}
84-
85-
private async Task StartAsync()
8682
{
8783
try
8884
{
8985
// Spawn send and receive logic
90-
var receiveTask = DoReceive();
91-
var sendTask = DoSend();
92-
93-
// Now wait for both to complete
94-
await receiveTask;
95-
await sendTask;
96-
97-
_receiver.Dispose();
98-
_sender?.Dispose();
86+
_receivingTask = DoReceive();
87+
_sendingTask = DoSend();
9988
}
10089
catch (Exception ex)
10190
{
102-
_trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(StartAsync)}.");
91+
_trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(Start)}.");
10392
}
10493
}
10594

@@ -118,9 +107,28 @@ public override async ValueTask DisposeAsync()
118107
_originalTransport.Input.Complete();
119108
_originalTransport.Output.Complete();
120109

121-
if (_processingTask != null)
110+
try
111+
{
112+
// Now wait for both to complete
113+
if (_receivingTask != null)
114+
{
115+
await _receivingTask;
116+
}
117+
118+
if (_sendingTask != null)
119+
{
120+
await _sendingTask;
121+
}
122+
123+
}
124+
catch (Exception ex)
125+
{
126+
_trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(Start)}.");
127+
}
128+
finally
122129
{
123-
await _processingTask;
130+
_receiver.Dispose();
131+
_sender?.Dispose();
124132
}
125133

126134
_connectionClosedTokenSource.Dispose();
@@ -132,7 +140,50 @@ private async Task DoReceive()
132140

133141
try
134142
{
135-
await ProcessReceives();
143+
while (true)
144+
{
145+
if (_waitForData)
146+
{
147+
// Wait for data before allocating a buffer.
148+
await _receiver.WaitForDataAsync(_socket);
149+
}
150+
151+
// Ensure we have some reasonable amount of buffer space
152+
var buffer = Input.GetMemory(MinAllocBufferSize);
153+
154+
var bytesReceived = await _receiver.ReceiveAsync(_socket, buffer);
155+
156+
if (bytesReceived == 0)
157+
{
158+
// FIN
159+
_trace.ConnectionReadFin(ConnectionId);
160+
break;
161+
}
162+
163+
Input.Advance(bytesReceived);
164+
165+
var flushTask = Input.FlushAsync();
166+
167+
var paused = !flushTask.IsCompleted;
168+
169+
if (paused)
170+
{
171+
_trace.ConnectionPause(ConnectionId);
172+
}
173+
174+
var result = await flushTask;
175+
176+
if (paused)
177+
{
178+
_trace.ConnectionResume(ConnectionId);
179+
}
180+
181+
if (result.IsCompleted || result.IsCanceled)
182+
{
183+
// Pipe consumer is shut down, do we stop writing
184+
break;
185+
}
186+
}
136187
}
137188
catch (SocketException ex) when (IsConnectionResetError(ex.SocketErrorCode))
138189
{
@@ -176,64 +227,40 @@ private async Task DoReceive()
176227
}
177228
}
178229

179-
private async Task ProcessReceives()
180-
{
181-
// Resolve `input` PipeWriter via the IDuplexPipe interface prior to loop start for performance.
182-
var input = Input;
183-
while (true)
184-
{
185-
if (_waitForData)
186-
{
187-
// Wait for data before allocating a buffer.
188-
await _receiver.WaitForDataAsync(_socket);
189-
}
190-
191-
// Ensure we have some reasonable amount of buffer space
192-
var buffer = input.GetMemory(MinAllocBufferSize);
193-
194-
var bytesReceived = await _receiver.ReceiveAsync(_socket, buffer);
195-
196-
if (bytesReceived == 0)
197-
{
198-
// FIN
199-
_trace.ConnectionReadFin(ConnectionId);
200-
break;
201-
}
202-
203-
input.Advance(bytesReceived);
204-
205-
var flushTask = input.FlushAsync();
206-
207-
var paused = !flushTask.IsCompleted;
208-
209-
if (paused)
210-
{
211-
_trace.ConnectionPause(ConnectionId);
212-
}
213-
214-
var result = await flushTask;
215-
216-
if (paused)
217-
{
218-
_trace.ConnectionResume(ConnectionId);
219-
}
220-
221-
if (result.IsCompleted || result.IsCanceled)
222-
{
223-
// Pipe consumer is shut down, do we stop writing
224-
break;
225-
}
226-
}
227-
}
228-
229230
private async Task DoSend()
230231
{
231232
Exception? shutdownReason = null;
232233
Exception? unexpectedError = null;
233234

234235
try
235236
{
236-
await ProcessSends();
237+
while (true)
238+
{
239+
var result = await Output.ReadAsync();
240+
241+
if (result.IsCanceled)
242+
{
243+
break;
244+
}
245+
var buffer = result.Buffer;
246+
247+
if (!buffer.IsEmpty)
248+
{
249+
_sender = _socketSenderPool.Rent();
250+
await _sender.SendAsync(_socket, buffer);
251+
// We don't return to the pool if there was an exception, and
252+
// we keep the _sender assigned so that we can dispose it in StartAsync.
253+
_socketSenderPool.Return(_sender);
254+
_sender = null;
255+
}
256+
257+
Output.AdvanceTo(buffer.End);
258+
259+
if (result.IsCompleted)
260+
{
261+
break;
262+
}
263+
}
237264
}
238265
catch (SocketException ex) when (IsConnectionResetError(ex.SocketErrorCode))
239266
{
@@ -265,42 +292,6 @@ private async Task DoSend()
265292
}
266293
}
267294

268-
private async Task ProcessSends()
269-
{
270-
// Resolve `output` PipeReader via the IDuplexPipe interface prior to loop start for performance.
271-
var output = Output;
272-
while (true)
273-
{
274-
var result = await output.ReadAsync();
275-
276-
if (result.IsCanceled)
277-
{
278-
break;
279-
}
280-
281-
var buffer = result.Buffer;
282-
283-
var end = buffer.End;
284-
var isCompleted = result.IsCompleted;
285-
if (!buffer.IsEmpty)
286-
{
287-
_sender = _socketSenderPool.Rent();
288-
await _sender.SendAsync(_socket, buffer);
289-
// We don't return to the pool if there was an exception, and
290-
// we keep the _sender assigned so that we can dispose it in StartAsync.
291-
_socketSenderPool.Return(_sender);
292-
_sender = null;
293-
}
294-
295-
output.AdvanceTo(end);
296-
297-
if (isCompleted)
298-
{
299-
break;
300-
}
301-
}
302-
}
303-
304295
private void FireConnectionClosed()
305296
{
306297
// Guard against scheduling this multiple times

0 commit comments

Comments
 (0)