Skip to content

HTTP/3: Fix flakey request headers timeout tests #32131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 38 additions & 19 deletions src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http3
{
internal class Http3Connection : ITimeoutHandler, IHttp3StreamLifetimeHandler
{
// Internal for unit testing
internal readonly Dictionary<long, IHttp3Stream> _streams = new Dictionary<long, IHttp3Stream>();
internal IHttp3StreamLifetimeHandler _streamLifetimeHandler;

private long _highestOpenedStreamId;
private readonly object _sync = new object();
Expand All @@ -49,6 +51,7 @@ public Http3Connection(Http3ConnectionContext context)
_systemClock = context.ServiceContext.SystemClock;
_timeoutControl = new TimeoutControl(this);
_context.TimeoutControl ??= _timeoutControl;
_streamLifetimeHandler = this;

_errorCodeFeature = context.ConnectionFeatures.Get<IProtocolErrorCodeFeature>()!;

Expand Down Expand Up @@ -303,7 +306,7 @@ internal async Task InnerProcessStreamsAsync<TContext>(IHttpApplication<TContext
streamContext.LocalEndPoint as IPEndPoint,
streamContext.RemoteEndPoint as IPEndPoint,
streamContext.Transport,
this,
_streamLifetimeHandler,
streamContext,
_serverSettings);
httpConnectionContext.TimeoutControl = _context.TimeoutControl;
Expand All @@ -314,10 +317,7 @@ internal async Task InnerProcessStreamsAsync<TContext>(IHttpApplication<TContext
{
// Unidirectional stream
var stream = new Http3ControlStream<TContext>(application, httpConnectionContext);
lock (_streams)
{
_streams[streamId] = stream;
}
_streamLifetimeHandler.OnStreamCreated(stream);

ThreadPool.UnsafeQueueUserWorkItem(stream, preferLocal: false);
}
Expand All @@ -327,11 +327,7 @@ internal async Task InnerProcessStreamsAsync<TContext>(IHttpApplication<TContext
UpdateHighestStreamId(streamId);

var stream = new Http3Stream<TContext>(application, httpConnectionContext);
lock (_streams)
{
_activeRequestCount++;
_streams[streamId] = stream;
}
_streamLifetimeHandler.OnStreamCreated(stream);

KestrelEventSource.Log.RequestQueuedStart(stream, AspNetCore.Http.HttpProtocol.Http3);
ThreadPool.UnsafeQueueUserWorkItem(stream, preferLocal: false);
Expand Down Expand Up @@ -470,7 +466,7 @@ private async ValueTask<Http3ControlStream> CreateNewUnidirectionalStreamAsync<T
streamContext.LocalEndPoint as IPEndPoint,
streamContext.RemoteEndPoint as IPEndPoint,
streamContext.Transport,
this,
_streamLifetimeHandler,
streamContext,
_serverSettings);
httpConnectionContext.TimeoutControl = _context.TimeoutControl;
Expand All @@ -490,7 +486,7 @@ private ValueTask<FlushResult> SendGoAway(long id)
return default;
}

public bool OnInboundControlStream(Http3ControlStream stream)
bool IHttp3StreamLifetimeHandler.OnInboundControlStream(Http3ControlStream stream)
{
lock (_sync)
{
Expand All @@ -503,7 +499,7 @@ public bool OnInboundControlStream(Http3ControlStream stream)
}
}

public bool OnInboundEncoderStream(Http3ControlStream stream)
bool IHttp3StreamLifetimeHandler.OnInboundEncoderStream(Http3ControlStream stream)
{
lock (_sync)
{
Expand All @@ -516,7 +512,7 @@ public bool OnInboundEncoderStream(Http3ControlStream stream)
}
}

public bool OnInboundDecoderStream(Http3ControlStream stream)
bool IHttp3StreamLifetimeHandler.OnInboundDecoderStream(Http3ControlStream stream)
{
lock (_sync)
{
Expand All @@ -529,24 +525,42 @@ public bool OnInboundDecoderStream(Http3ControlStream stream)
}
}

public void OnStreamCompleted(IHttp3Stream stream)
void IHttp3StreamLifetimeHandler.OnStreamCreated(IHttp3Stream stream)
{
lock (_streams)
{
_activeRequestCount--;
if (stream.IsRequestStream)
{
_activeRequestCount++;
}
_streams[stream.StreamId] = stream;
}
}

void IHttp3StreamLifetimeHandler.OnStreamCompleted(IHttp3Stream stream)
{
lock (_streams)
{
if (stream.IsRequestStream)
{
_activeRequestCount--;
}
_streams.Remove(stream.StreamId);
}

_streamCompletionAwaitable.Complete();
if (stream.IsRequestStream)
{
_streamCompletionAwaitable.Complete();
}
}

public void OnStreamConnectionError(Http3ConnectionErrorException ex)
void IHttp3StreamLifetimeHandler.OnStreamConnectionError(Http3ConnectionErrorException ex)
{
Log.Http3ConnectionError(ConnectionId, ex);
Abort(new ConnectionAbortedException(ex.Message, ex), ex.ErrorCode);
}

public void OnInboundControlStreamSetting(Http3SettingType type, long value)
void IHttp3StreamLifetimeHandler.OnInboundControlStreamSetting(Http3SettingType type, long value)
{
switch (type)
{
Expand All @@ -561,6 +575,11 @@ public void OnInboundControlStreamSetting(Http3SettingType type, long value)
}
}

void IHttp3StreamLifetimeHandler.OnStreamHeaderReceived(IHttp3Stream stream)
{
Debug.Assert(stream.ReceivedHeader);
}

private static class GracefulCloseInitiator
{
public const int None = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public async Task ProcessRequestAsync<TContext>(IHttpApplication<TContext> appli
try
{
_headerType = await TryReadStreamHeaderAsync();
_context.StreamLifetimeHandler.OnStreamHeaderReceived(this);

switch (_headerType)
{
Expand Down Expand Up @@ -195,6 +196,10 @@ public async Task ProcessRequestAsync<TContext>(IHttpApplication<TContext> appli
_errorCodeFeature.Error = (long)ex.ErrorCode;
_context.StreamLifetimeHandler.OnStreamConnectionError(ex);
}
finally
{
_context.StreamLifetimeHandler.OnStreamCompleted(this);
}
}

private async Task HandleControlStream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ private async Task ProcessHeadersFrameAsync<TContext>(IHttpApplication<TContext>
}

_appCompleted = new TaskCompletionSource();
_context.StreamLifetimeHandler.OnStreamHeaderReceived(this);

ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http3
{
internal interface IHttp3StreamLifetimeHandler
{
void OnStreamCreated(IHttp3Stream stream);
void OnStreamHeaderReceived(IHttp3Stream stream);
void OnStreamCompleted(IHttp3Stream stream);
void OnStreamConnectionError(Http3ConnectionErrorException ex);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
Expand Down Expand Up @@ -47,6 +48,7 @@ public abstract class Http3TestBase : TestApplicationErrorLoggerLoggedTest, IDis
protected Task _connectionTask;
protected readonly TaskCompletionSource _closedStateReached = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

internal readonly ConcurrentDictionary<long, Http3StreamBase> _runningStreams = new ConcurrentDictionary<long, Http3StreamBase>();

protected readonly RequestDelegate _noopApplication;
protected readonly RequestDelegate _echoApplication;
Expand Down Expand Up @@ -242,10 +244,74 @@ protected void CreateConnection()
httpConnectionContext.TimeoutControl = _mockTimeoutControl.Object;

Connection = new Http3Connection(httpConnectionContext);
Connection._streamLifetimeHandler = new LifetimeHandlerInterceptor(Connection, this);

_mockTimeoutHandler.Setup(h => h.OnTimeout(It.IsAny<TimeoutReason>()))
.Callback<TimeoutReason>(r => Connection.OnTimeout(r));
}

private class LifetimeHandlerInterceptor : IHttp3StreamLifetimeHandler
{
private readonly IHttp3StreamLifetimeHandler _inner;
private readonly Http3TestBase _http3TestBase;

public LifetimeHandlerInterceptor(IHttp3StreamLifetimeHandler inner, Http3TestBase http3TestBase)
{
_inner = inner;
_http3TestBase = http3TestBase;
}

public bool OnInboundControlStream(Internal.Http3.Http3ControlStream stream)
{
return _inner.OnInboundControlStream(stream);
}

public void OnInboundControlStreamSetting(Internal.Http3.Http3SettingType type, long value)
{
_inner.OnInboundControlStreamSetting(type, value);
}

public bool OnInboundDecoderStream(Internal.Http3.Http3ControlStream stream)
{
return _inner.OnInboundDecoderStream(stream);
}

public bool OnInboundEncoderStream(Internal.Http3.Http3ControlStream stream)
{
return _inner.OnInboundEncoderStream(stream);
}

public void OnStreamCompleted(IHttp3Stream stream)
{
_inner.OnStreamCompleted(stream);
}

public void OnStreamConnectionError(Http3ConnectionErrorException ex)
{
_inner.OnStreamConnectionError(ex);
}

public void OnStreamCreated(IHttp3Stream stream)
{
_inner.OnStreamCreated(stream);

if (_http3TestBase._runningStreams.TryGetValue(stream.StreamId, out var testStream))
{
testStream._onStreamCreatedTcs.TrySetResult();
}
}

public void OnStreamHeaderReceived(IHttp3Stream stream)
{
_inner.OnStreamHeaderReceived(stream);

if (_http3TestBase._runningStreams.TryGetValue(stream.StreamId, out var testStream))
{
testStream._onHeaderReceivedTcs.TrySetResult();
}
}
}

protected void ConnectionClosed()
{

Expand Down Expand Up @@ -294,6 +360,8 @@ public ValueTask<Http3ControlStream> CreateControlStream()
public async ValueTask<Http3ControlStream> CreateControlStream(int? id)
{
var stream = new Http3ControlStream(this, StreamInitiator.Client);
_runningStreams[stream.StreamId] = stream;

MultiplexedConnectionContext.ToServerAcceptQueue.Writer.TryWrite(stream.StreamContext);
if (id != null)
{
Expand All @@ -305,6 +373,8 @@ public async ValueTask<Http3ControlStream> CreateControlStream(int? id)
internal ValueTask<Http3RequestStream> CreateRequestStream()
{
var stream = new Http3RequestStream(this, Connection);
_runningStreams[stream.StreamId] = stream;

MultiplexedConnectionContext.ToServerAcceptQueue.Writer.TryWrite(stream.StreamContext);
return new ValueTask<Http3RequestStream>(stream);
}
Expand All @@ -318,12 +388,18 @@ public ValueTask<ConnectionContext> StartBidirectionalStreamAsync()

public class Http3StreamBase : IProtocolErrorCodeFeature
{
internal TaskCompletionSource _onStreamCreatedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
internal TaskCompletionSource _onHeaderReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

internal DuplexPipe.DuplexPipePair _pair;
internal Http3TestBase _testBase;
internal Http3Connection _connection;
private long _bytesReceived;
public long Error { get; set; }

public Task OnStreamCreatedTask => _onStreamCreatedTcs.Task;
public Task OnHeaderReceivedTask => _onHeaderReceivedTcs.Task;

protected Task SendAsync(ReadOnlySpan<byte> span)
{
var writableBuffer = _pair.Application.Output;
Expand Down
Loading