Skip to content

Use the default pipe options for backpressure #21001

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public async Task CanStartStopSSETransport()
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns(copyToAsyncTcs.Task);
mockStream.Setup(s => s.CanRead).Returns(true);
return new HttpResponseMessage {Content = new StreamContent(mockStream.Object)};
return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
});

try
Expand Down Expand Up @@ -76,15 +76,17 @@ public async Task SSETransportStopsSendAndReceiveLoopsWhenTransportStopped()
{
var mockStream = new Mock<Stream>();
mockStream
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns<Stream, int, CancellationToken>(async (stream, bufferSize, t) =>
.Setup(s => s.ReadAsync(It.IsAny<Memory<byte>>(), It.IsAny<CancellationToken>()))
.Returns<Memory<byte>, CancellationToken>(async (data, t) =>
{
var buffer = Encoding.ASCII.GetBytes("data: 3:abc\r\n\r\n");
while (!t.IsCancellationRequested)
if (t.IsCancellationRequested)
{
await stream.WriteAsync(buffer, 0, buffer.Length).OrTimeout();
await Task.Delay(100);
return 0;
}

int count = Encoding.ASCII.GetBytes("data: 3:abc\r\n\r\n", data.Span);
await Task.Delay(100);
return count;
});
mockStream.Setup(s => s.CanRead).Returns(true);

Expand Down Expand Up @@ -120,6 +122,7 @@ await sseTransport.StartAsync(
public async Task SSETransportStopsWithErrorIfServerSendsIncompleteResults()
{
var mockHttpHandler = new Mock<HttpMessageHandler>();
var calls = 0;
mockHttpHandler.Protected()
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
Expand All @@ -128,11 +131,15 @@ public async Task SSETransportStopsWithErrorIfServerSendsIncompleteResults()

var mockStream = new Mock<Stream>();
mockStream
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns<Stream, int, CancellationToken>(async (stream, bufferSize, t) =>
.Setup(s => s.ReadAsync(It.IsAny<Memory<byte>>(), It.IsAny<CancellationToken>()))
.Returns<Memory<byte>, CancellationToken>((data, t) =>
{
var buffer = Encoding.ASCII.GetBytes("data: 3:a");
await stream.WriteAsync(buffer, 0, buffer.Length);
if (calls == 0)
{
calls++;
return new ValueTask<int>(Encoding.ASCII.GetBytes("data: 3:a", data.Span));
}
return new ValueTask<int>(0);
});
mockStream.Setup(s => s.CanRead).Returns(true);

Expand Down Expand Up @@ -165,7 +172,7 @@ bool ExpectedErrors(WriteContext writeContext)
}

var eventStreamTcs = new TaskCompletionSource<object>();
var copyToAsyncTcs = new TaskCompletionSource<int>();
var readTcs = new TaskCompletionSource<int>();

var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
Expand All @@ -182,8 +189,14 @@ bool ExpectedErrors(WriteContext writeContext)
// returns unfinished task to block pipelines
var mockStream = new Mock<Stream>();
mockStream
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns(copyToAsyncTcs.Task);
.Setup(s => s.ReadAsync(It.IsAny<Memory<byte>>(), It.IsAny<CancellationToken>()))
.Returns<Memory<byte>, CancellationToken>(async (data, ct) =>
{
using (ct.Register(() => readTcs.TrySetCanceled()))
{
return await readTcs.Task;
}
});
mockStream.Setup(s => s.CanRead).Returns(true);
return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
}
Expand Down Expand Up @@ -214,7 +227,7 @@ await sseTransport.StartAsync(
public async Task SSETransportStopsIfChannelClosed()
{
var eventStreamTcs = new TaskCompletionSource<object>();
var copyToAsyncTcs = new TaskCompletionSource<int>();
var readTcs = new TaskCompletionSource<int>();

var mockHttpHandler = new Mock<HttpMessageHandler>();
mockHttpHandler.Protected()
Expand All @@ -229,8 +242,14 @@ public async Task SSETransportStopsIfChannelClosed()
// returns unfinished task to block pipelines
var mockStream = new Mock<Stream>();
mockStream
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns(copyToAsyncTcs.Task);
.Setup(s => s.ReadAsync(It.IsAny<Memory<byte>>(), It.IsAny<CancellationToken>()))
.Returns<Memory<byte>, CancellationToken>(async (data, ct) =>
{
using (ct.Register(() => readTcs.TrySetCanceled()))
{
return await readTcs.Task;
}
});
mockStream.Setup(s => s.CanRead).Returns(true);
return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
});
Expand Down Expand Up @@ -281,7 +300,7 @@ await sseTransport.StartAsync(
public async Task SSETransportCancelsSendOnStop()
{
var eventStreamTcs = new TaskCompletionSource<object>();
var copyToAsyncTcs = new TaskCompletionSource<object>();
var readTcs = new TaskCompletionSource<object>();
var sendSyncPoint = new SyncPoint();

var mockHttpHandler = new Mock<HttpMessageHandler>();
Expand All @@ -299,10 +318,10 @@ public async Task SSETransportCancelsSendOnStop()
// returns unfinished task to block pipelines
var mockStream = new Mock<Stream>();
mockStream
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns<Stream, int, CancellationToken>(async (stream, bufferSize, t) =>
.Setup(s => s.ReadAsync(It.IsAny<Memory<byte>>(), It.IsAny<CancellationToken>()))
.Returns(async () =>
{
await copyToAsyncTcs.Task;
await readTcs.Task;

throw new TaskCanceledException();
});
Expand Down Expand Up @@ -332,7 +351,7 @@ await sseTransport.StartAsync(

var stopTask = sseTransport.StopAsync();

copyToAsyncTcs.SetResult(null);
readTcs.SetResult(null);
sendSyncPoint.Continue();

await stopTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
{
internal static class ClientPipeOptions
{
public static PipeOptions DefaultOptions = new PipeOptions(writerScheduler: PipeScheduler.ThreadPool, readerScheduler: PipeScheduler.ThreadPool, useSynchronizationContext: false, pauseWriterThreshold: 0, resumeWriterThreshold: 0);
public static PipeOptions DefaultOptions = new PipeOptions(writerScheduler: PipeScheduler.ThreadPool, readerScheduler: PipeScheduler.ThreadPool, useSynchronizationContext: false);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,15 @@ private async Task ProcessAsync(Uri url, HttpResponseMessage response)
private async Task ProcessEventStream(HttpResponseMessage response, CancellationToken cancellationToken)
{
Log.StartReceive(_logger);

static void CancelReader(object state) => ((PipeReader)state).CancelPendingRead();

using (response)
using (var stream = await response.Content.ReadAsStreamAsync())
{
var options = new PipeOptions(pauseWriterThreshold: 0, resumeWriterThreshold: 0);
var reader = PipeReaderFactory.CreateFromStream(options, stream, cancellationToken);
var reader = PipeReader.Create(stream);

using var registration = cancellationToken.Register(CancelReader, reader);

try
{
Expand Down
31 changes: 23 additions & 8 deletions src/SignalR/server/SignalR/test/EndToEndTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -329,15 +329,30 @@ public async Task ConnectionCanSendAndReceiveDifferentMessageSizesWebSocketsTran
logger.LogInformation("Started connection to {url}", url);

var bytes = Encoding.UTF8.GetBytes(message);
logger.LogInformation("Sending {length} byte message", bytes.Length);
await connection.Transport.Output.WriteAsync(bytes).OrTimeout();
logger.LogInformation("Sent message");

logger.LogInformation("Receiving message");
// Big timeout here because it can take a while to receive all the bytes
var receivedData = await connection.Transport.Input.ReadAsync(bytes.Length).OrTimeout(TimeSpan.FromMinutes(2));
Assert.Equal(message, Encoding.UTF8.GetString(receivedData));
logger.LogInformation("Completed receive");
async Task SendMessage()
{
logger.LogInformation("Sending {length} byte message", bytes.Length);
await connection.Transport.Output.WriteAsync(bytes).OrTimeout();
logger.LogInformation("Sent message");
}

async Task ReceiveMessage()
{
logger.LogInformation("Receiving message");
// Big timeout here because it can take a while to receive all the bytes
var receivedData = await connection.Transport.Input.ReadAsync(bytes.Length).OrTimeout(TimeSpan.FromMinutes(2));
Assert.Equal(message, Encoding.UTF8.GetString(receivedData));
logger.LogInformation("Completed receive");
}

// Send the receive concurrently so that back pressure is released
// for server -> client sends
var sendingTask = SendMessage();
var receivingTask = ReceiveMessage();

await sendingTask;
await receivingTask;
}
catch (Exception ex)
{
Expand Down