Skip to content

Commit 3c9dc75

Browse files
committed
Expose response PipeWriter in Kestrel
1 parent 4539b0d commit 3c9dc75

35 files changed

+2236
-438
lines changed

src/Http/Http.Abstractions/src/Extensions/HttpResponseWritingExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public static class HttpResponseWritingExtensions
6161
}
6262

6363
byte[] data = encoding.GetBytes(text);
64-
return response.Body.WriteAsync(data, 0, data.Length, cancellationToken);
64+
return response.BodyPipe.WriteAsync(new Memory<byte>(data, 0, data.Length), cancellationToken).AsTask();
6565
}
6666
}
67-
}
67+
}

src/Http/Http.Features/src/FeatureReferences.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,4 @@ private TFeature UpdateCached<TFeature, TState>(ref TFeature cached, TState stat
9595
public TFeature Fetch<TFeature>(ref TFeature cached, Func<IFeatureCollection, TFeature> factory)
9696
where TFeature : class => Fetch(ref cached, Collection, factory);
9797
}
98-
}
98+
}

src/Http/Http/src/StreamPipeWriter.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public override Memory<byte> GetMemory(int sizeHint = 0)
9393
{
9494
EnsureCapacity(sizeHint);
9595

96-
return _currentSegment;
96+
return _currentSegment.Slice(_position);
9797
}
9898

9999
/// <inheritdoc />
@@ -265,7 +265,7 @@ private void AddSegment(int sizeHint = 0)
265265
}
266266

267267
// Get a new buffer using the minimum segment size, unless the size hint is larger than a single segment.
268-
_currentSegmentOwner = _pool.Rent(Math.Max(_minimumSegmentSize, sizeHint));
268+
_currentSegmentOwner = _pool.Rent(Math.Min(Math.Max(_minimumSegmentSize, sizeHint), _pool.MaxBufferSize));
269269
_currentSegment = _currentSegmentOwner.Memory;
270270
_position = 0;
271271
}

src/Http/Http/src/WriteOnlyPipeStream.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public override int ReadTimeout
6060
set => throw new NotSupportedException();
6161
}
6262

63+
public PipeWriter InnerPipeWriter => _pipeWriter;
64+
6365
/// <inheritdoc />
6466
public override int Read(byte[] buffer, int offset, int count)
6567
=> throw new NotSupportedException();

src/Servers/Kestrel/Core/src/Internal/Http/ChunkWriter.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,14 @@ public static int BeginChunkBytes(int dataCount, Span<byte> span)
4444
return count;
4545
}
4646

47-
internal static void WriteBeginChunkBytes(this ref BufferWriter<PipeWriter> start, int dataCount)
47+
internal static int WriteBeginChunkBytes(this ref BufferWriter<PipeWriter> start, int dataCount)
4848
{
4949
// 10 bytes is max length + \r\n
5050
start.Ensure(10);
5151

5252
var count = BeginChunkBytes(dataCount, start.Span);
5353
start.Advance(count);
54+
return count;
5455
}
5556

5657
internal static void WriteEndChunkBytes(this ref BufferWriter<PipeWriter> start)

src/Servers/Kestrel/Core/src/Internal/Http/Http1OutputProducer.cs

Lines changed: 151 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Buffers;
6+
using System.Diagnostics;
67
using System.IO.Pipelines;
78
using System.Text;
89
using System.Threading;
@@ -26,15 +27,26 @@ public class Http1OutputProducer : IHttpOutputProducer, IHttpOutputAborter, IDis
2627
private readonly IHttpMinResponseDataRateFeature _minResponseDataRateFeature;
2728
private readonly TimingPipeFlusher _flusher;
2829

29-
// This locks access to to all of the below fields
30+
// This locks access to all of the below fields
3031
private readonly object _contextLock = new object();
3132

3233
private bool _completed = false;
3334
private bool _aborted;
3435
private long _unflushedBytes;
35-
36+
private bool _autoChunk;
3637
private readonly PipeWriter _pipeWriter;
3738

39+
private const int BeginChunkLengthMax = 5;
40+
private const int EndChunkLength = 2;
41+
42+
// Chunked responses need to be treated uniquely when using GetMemory + Advance.
43+
// We need to know the size of the data written to the chunk before calling Advance on the
44+
// PipeWriter, meaning we internally track how far we have advanced through a current chunk (_advancedBytesForChunk).
45+
// Once write or flush is called, we modify the _currentChunkMemory to prepend the size of data written
46+
// and append the end terminator.
47+
private int _advancedBytesForChunk;
48+
private Memory<byte> _currentChunkMemory;
49+
3850
public Http1OutputProducer(
3951
PipeWriter pipeWriter,
4052
string connectionId,
@@ -58,28 +70,92 @@ public Task WriteDataAsync(ReadOnlySpan<byte> buffer, CancellationToken cancella
5870
return Task.FromCanceled(cancellationToken);
5971
}
6072

73+
return WriteAsync(buffer, cancellationToken).AsTask();
74+
}
75+
76+
public ValueTask<FlushResult> WriteDataToPipeAsync(ReadOnlySpan<byte> buffer, CancellationToken cancellationToken = default)
77+
{
78+
if (cancellationToken.IsCancellationRequested)
79+
{
80+
return new ValueTask<FlushResult>(Task.FromCanceled<FlushResult>(cancellationToken));
81+
}
82+
6183
return WriteAsync(buffer, cancellationToken);
6284
}
6385

64-
public Task WriteStreamSuffixAsync()
86+
public ValueTask<FlushResult> WriteStreamSuffixAsync()
6587
{
6688
return WriteAsync(_endChunkedResponseBytes.Span);
6789
}
6890

69-
public Task FlushAsync(CancellationToken cancellationToken = default)
91+
public ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
7092
{
7193
return WriteAsync(Constants.EmptyData, cancellationToken);
7294
}
7395

74-
public Task WriteChunkAsync(ReadOnlySpan<byte> buffer, CancellationToken cancellationToken)
96+
public Memory<byte> GetMemory(int sizeHint = 0)
97+
{
98+
if (_autoChunk)
99+
{
100+
return GetChunkedMemory(sizeHint);
101+
}
102+
else
103+
{
104+
return _pipeWriter.GetMemory(sizeHint);
105+
}
106+
}
107+
108+
public Span<byte> GetSpan(int sizeHint = 0)
109+
{
110+
if (_autoChunk)
111+
{
112+
return GetChunkedMemory(sizeHint).Span;
113+
}
114+
else
115+
{
116+
return _pipeWriter.GetMemory(sizeHint).Span;
117+
}
118+
}
119+
120+
public void Advance(int bytes)
121+
{
122+
if (_autoChunk)
123+
{
124+
if (bytes < 0)
125+
{
126+
throw new ArgumentOutOfRangeException(nameof(bytes));
127+
}
128+
129+
if (bytes + _advancedBytesForChunk > _currentChunkMemory.Length - BeginChunkLengthMax - EndChunkLength)
130+
{
131+
throw new InvalidOperationException("Can't advance past buffer size.");
132+
}
133+
_advancedBytesForChunk += bytes;
134+
}
135+
else
136+
{
137+
_pipeWriter.Advance(bytes);
138+
}
139+
}
140+
141+
public void CancelPendingFlush()
142+
{
143+
// TODO we may not want to support this.
144+
_pipeWriter.CancelPendingFlush();
145+
}
146+
147+
// This method is for chunked http responses
148+
public ValueTask<FlushResult> WriteChunkAsync(ReadOnlySpan<byte> buffer, CancellationToken cancellationToken)
75149
{
76150
lock (_contextLock)
77151
{
78152
if (_completed)
79153
{
80-
return Task.CompletedTask;
154+
return default;
81155
}
82156

157+
CommitChunkToPipe();
158+
83159
if (buffer.Length > 0)
84160
{
85161
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
@@ -96,7 +172,7 @@ public Task WriteChunkAsync(ReadOnlySpan<byte> buffer, CancellationToken cancell
96172
return FlushAsync(cancellationToken);
97173
}
98174

99-
public void WriteResponseHeaders(int statusCode, string reasonPhrase, HttpResponseHeaders responseHeaders)
175+
public void WriteResponseHeaders(int statusCode, string reasonPhrase, HttpResponseHeaders responseHeaders, bool autoChunk)
100176
{
101177
lock (_contextLock)
102178
{
@@ -117,6 +193,7 @@ public void WriteResponseHeaders(int statusCode, string reasonPhrase, HttpRespon
117193
writer.Commit();
118194

119195
_unflushedBytes += writer.BytesCommitted;
196+
_autoChunk = autoChunk;
120197
}
121198
}
122199

@@ -139,7 +216,6 @@ public void Abort(ConnectionAbortedException error)
139216
{
140217
// Abort can be called after Dispose if there's a flush timeout.
141218
// It's important to still call _lifetimeFeature.Abort() in this case.
142-
143219
lock (_contextLock)
144220
{
145221
if (_aborted)
@@ -153,20 +229,33 @@ public void Abort(ConnectionAbortedException error)
153229
}
154230
}
155231

156-
public Task Write100ContinueAsync()
232+
public ValueTask<FlushResult> Write100ContinueAsync()
157233
{
158234
return WriteAsync(_continueBytes.Span);
159235
}
160236

161-
private Task WriteAsync(
237+
public void Complete(Exception exception = null)
238+
{
239+
// TODO What to do with exception.
240+
// and how to handle writing to response here.
241+
}
242+
243+
private ValueTask<FlushResult> WriteAsync(
162244
ReadOnlySpan<byte> buffer,
163245
CancellationToken cancellationToken = default)
164246
{
165247
lock (_contextLock)
166248
{
167249
if (_completed)
168250
{
169-
return Task.CompletedTask;
251+
return default;
252+
}
253+
254+
if (_autoChunk)
255+
{
256+
// If there is data that was chunked before writing (ex someone did GetMemory->Advance->WriteAsync)
257+
// make sure to write whatever was advanced first
258+
CommitChunkToPipe();
170259
}
171260

172261
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
@@ -188,5 +277,56 @@ private Task WriteAsync(
188277
cancellationToken);
189278
}
190279
}
280+
281+
private Memory<byte> GetChunkedMemory(int sizeHint)
282+
{
283+
// The max size of a chunk will be the size of memory returned from the PipeWriter (today 4096)
284+
// minus 5 for the max chunked prefix size and minus 2 for the chunked ending, leaving a total of
285+
// 4089.
286+
287+
if (_currentChunkMemory.Length == 0)
288+
{
289+
// First time calling GetMemory
290+
_currentChunkMemory = _pipeWriter.GetMemory(sizeHint);
291+
}
292+
293+
var memoryMaxLength = _currentChunkMemory.Length - BeginChunkLengthMax - EndChunkLength;
294+
if (_advancedBytesForChunk == memoryMaxLength)
295+
{
296+
// Chunk is completely written, commit it to the pipe so GetMemory will return a new chunk of memory.
297+
CommitChunkToPipe();
298+
_currentChunkMemory = _pipeWriter.GetMemory(sizeHint);
299+
}
300+
301+
var actualMemory = _currentChunkMemory.Slice(
302+
BeginChunkLengthMax + _advancedBytesForChunk,
303+
memoryMaxLength - _advancedBytesForChunk);
304+
305+
return actualMemory;
306+
}
307+
308+
private void CommitChunkToPipe()
309+
{
310+
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
311+
312+
Debug.Assert(_advancedBytesForChunk <= _currentChunkMemory.Length);
313+
314+
if (_advancedBytesForChunk > 0)
315+
{
316+
var bytesWritten = writer.WriteBeginChunkBytes(_advancedBytesForChunk);
317+
if (bytesWritten < BeginChunkLengthMax)
318+
{
319+
// If the current chunk of memory isn't completely utilized, we need to copy the contents forwards.
320+
// This occurs if someone uses less than 255 bytes of the current Memory segment.
321+
// Therefore, we need to copy it forwards by either 1 or 2 bytes (depending on number of bytes)
322+
_currentChunkMemory.Slice(BeginChunkLengthMax, _advancedBytesForChunk).CopyTo(_currentChunkMemory.Slice(bytesWritten));
323+
}
324+
325+
writer.Write(_currentChunkMemory.Slice(bytesWritten, _advancedBytesForChunk).Span);
326+
writer.WriteEndChunkBytes();
327+
writer.Commit();
328+
_advancedBytesForChunk = 0;
329+
}
330+
}
191331
}
192332
}

0 commit comments

Comments
 (0)