Skip to content

Commit f9a9788

Browse files
authored
Use the default pipe options for backpressure (#21001)
* Use the default pipe options for backpressure - This controls memory so clients aren't easily overwhelmed. With the changes made to Pipe to no longer throw if the pause threshold is exceeded makes this work well. - Remove PipeReaderFactory - Implement cancellation in SSE Contributes to #17797
1 parent e89e8d1 commit f9a9788

File tree

5 files changed

+70
-81
lines changed

5 files changed

+70
-81
lines changed

src/SignalR/clients/csharp/Client/test/UnitTests/ServerSentEventsTransportTests.cs

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public async Task CanStartStopSSETransport()
4343
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
4444
.Returns(copyToAsyncTcs.Task);
4545
mockStream.Setup(s => s.CanRead).Returns(true);
46-
return new HttpResponseMessage {Content = new StreamContent(mockStream.Object)};
46+
return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
4747
});
4848

4949
try
@@ -76,15 +76,17 @@ public async Task SSETransportStopsSendAndReceiveLoopsWhenTransportStopped()
7676
{
7777
var mockStream = new Mock<Stream>();
7878
mockStream
79-
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
80-
.Returns<Stream, int, CancellationToken>(async (stream, bufferSize, t) =>
79+
.Setup(s => s.ReadAsync(It.IsAny<Memory<byte>>(), It.IsAny<CancellationToken>()))
80+
.Returns<Memory<byte>, CancellationToken>(async (data, t) =>
8181
{
82-
var buffer = Encoding.ASCII.GetBytes("data: 3:abc\r\n\r\n");
83-
while (!t.IsCancellationRequested)
82+
if (t.IsCancellationRequested)
8483
{
85-
await stream.WriteAsync(buffer, 0, buffer.Length).OrTimeout();
86-
await Task.Delay(100);
84+
return 0;
8785
}
86+
87+
int count = Encoding.ASCII.GetBytes("data: 3:abc\r\n\r\n", data.Span);
88+
await Task.Delay(100);
89+
return count;
8890
});
8991
mockStream.Setup(s => s.CanRead).Returns(true);
9092

@@ -120,6 +122,7 @@ await sseTransport.StartAsync(
120122
public async Task SSETransportStopsWithErrorIfServerSendsIncompleteResults()
121123
{
122124
var mockHttpHandler = new Mock<HttpMessageHandler>();
125+
var calls = 0;
123126
mockHttpHandler.Protected()
124127
.Setup<Task<HttpResponseMessage>>("SendAsync", ItExpr.IsAny<HttpRequestMessage>(), ItExpr.IsAny<CancellationToken>())
125128
.Returns<HttpRequestMessage, CancellationToken>(async (request, cancellationToken) =>
@@ -128,11 +131,15 @@ public async Task SSETransportStopsWithErrorIfServerSendsIncompleteResults()
128131

129132
var mockStream = new Mock<Stream>();
130133
mockStream
131-
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
132-
.Returns<Stream, int, CancellationToken>(async (stream, bufferSize, t) =>
134+
.Setup(s => s.ReadAsync(It.IsAny<Memory<byte>>(), It.IsAny<CancellationToken>()))
135+
.Returns<Memory<byte>, CancellationToken>((data, t) =>
133136
{
134-
var buffer = Encoding.ASCII.GetBytes("data: 3:a");
135-
await stream.WriteAsync(buffer, 0, buffer.Length);
137+
if (calls == 0)
138+
{
139+
calls++;
140+
return new ValueTask<int>(Encoding.ASCII.GetBytes("data: 3:a", data.Span));
141+
}
142+
return new ValueTask<int>(0);
136143
});
137144
mockStream.Setup(s => s.CanRead).Returns(true);
138145

@@ -165,7 +172,7 @@ bool ExpectedErrors(WriteContext writeContext)
165172
}
166173

167174
var eventStreamTcs = new TaskCompletionSource<object>();
168-
var copyToAsyncTcs = new TaskCompletionSource<int>();
175+
var readTcs = new TaskCompletionSource<int>();
169176

170177
var mockHttpHandler = new Mock<HttpMessageHandler>();
171178
mockHttpHandler.Protected()
@@ -182,8 +189,14 @@ bool ExpectedErrors(WriteContext writeContext)
182189
// returns unfinished task to block pipelines
183190
var mockStream = new Mock<Stream>();
184191
mockStream
185-
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
186-
.Returns(copyToAsyncTcs.Task);
192+
.Setup(s => s.ReadAsync(It.IsAny<Memory<byte>>(), It.IsAny<CancellationToken>()))
193+
.Returns<Memory<byte>, CancellationToken>(async (data, ct) =>
194+
{
195+
using (ct.Register(() => readTcs.TrySetCanceled()))
196+
{
197+
return await readTcs.Task;
198+
}
199+
});
187200
mockStream.Setup(s => s.CanRead).Returns(true);
188201
return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
189202
}
@@ -214,7 +227,7 @@ await sseTransport.StartAsync(
214227
public async Task SSETransportStopsIfChannelClosed()
215228
{
216229
var eventStreamTcs = new TaskCompletionSource<object>();
217-
var copyToAsyncTcs = new TaskCompletionSource<int>();
230+
var readTcs = new TaskCompletionSource<int>();
218231

219232
var mockHttpHandler = new Mock<HttpMessageHandler>();
220233
mockHttpHandler.Protected()
@@ -229,8 +242,14 @@ public async Task SSETransportStopsIfChannelClosed()
229242
// returns unfinished task to block pipelines
230243
var mockStream = new Mock<Stream>();
231244
mockStream
232-
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
233-
.Returns(copyToAsyncTcs.Task);
245+
.Setup(s => s.ReadAsync(It.IsAny<Memory<byte>>(), It.IsAny<CancellationToken>()))
246+
.Returns<Memory<byte>, CancellationToken>(async (data, ct) =>
247+
{
248+
using (ct.Register(() => readTcs.TrySetCanceled()))
249+
{
250+
return await readTcs.Task;
251+
}
252+
});
234253
mockStream.Setup(s => s.CanRead).Returns(true);
235254
return new HttpResponseMessage { Content = new StreamContent(mockStream.Object) };
236255
});
@@ -281,7 +300,7 @@ await sseTransport.StartAsync(
281300
public async Task SSETransportCancelsSendOnStop()
282301
{
283302
var eventStreamTcs = new TaskCompletionSource<object>();
284-
var copyToAsyncTcs = new TaskCompletionSource<object>();
303+
var readTcs = new TaskCompletionSource<object>();
285304
var sendSyncPoint = new SyncPoint();
286305

287306
var mockHttpHandler = new Mock<HttpMessageHandler>();
@@ -299,10 +318,10 @@ public async Task SSETransportCancelsSendOnStop()
299318
// returns unfinished task to block pipelines
300319
var mockStream = new Mock<Stream>();
301320
mockStream
302-
.Setup(s => s.CopyToAsync(It.IsAny<Stream>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
303-
.Returns<Stream, int, CancellationToken>(async (stream, bufferSize, t) =>
321+
.Setup(s => s.ReadAsync(It.IsAny<Memory<byte>>(), It.IsAny<CancellationToken>()))
322+
.Returns(async () =>
304323
{
305-
await copyToAsyncTcs.Task;
324+
await readTcs.Task;
306325

307326
throw new TaskCanceledException();
308327
});
@@ -332,7 +351,7 @@ await sseTransport.StartAsync(
332351

333352
var stopTask = sseTransport.StopAsync();
334353

335-
copyToAsyncTcs.SetResult(null);
354+
readTcs.SetResult(null);
336355
sendSyncPoint.Continue();
337356

338357
await stopTask;

src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ClientPipeOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@ namespace Microsoft.AspNetCore.Http.Connections.Client.Internal
77
{
88
internal static class ClientPipeOptions
99
{
10-
public static PipeOptions DefaultOptions = new PipeOptions(writerScheduler: PipeScheduler.ThreadPool, readerScheduler: PipeScheduler.ThreadPool, useSynchronizationContext: false, pauseWriterThreshold: 0, resumeWriterThreshold: 0);
10+
public static PipeOptions DefaultOptions = new PipeOptions(writerScheduler: PipeScheduler.ThreadPool, readerScheduler: PipeScheduler.ThreadPool, useSynchronizationContext: false);
1111
}
1212
}

src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/PipeReaderFactory.cs

Lines changed: 0 additions & 48 deletions
This file was deleted.

src/SignalR/clients/csharp/Http.Connections.Client/src/Internal/ServerSentEventsTransport.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,12 +129,15 @@ private async Task ProcessAsync(Uri url, HttpResponseMessage response)
129129
private async Task ProcessEventStream(HttpResponseMessage response, CancellationToken cancellationToken)
130130
{
131131
Log.StartReceive(_logger);
132+
133+
static void CancelReader(object state) => ((PipeReader)state).CancelPendingRead();
132134

133135
using (response)
134136
using (var stream = await response.Content.ReadAsStreamAsync())
135137
{
136-
var options = new PipeOptions(pauseWriterThreshold: 0, resumeWriterThreshold: 0);
137-
var reader = PipeReaderFactory.CreateFromStream(options, stream, cancellationToken);
138+
var reader = PipeReader.Create(stream);
139+
140+
using var registration = cancellationToken.Register(CancelReader, reader);
138141

139142
try
140143
{

src/SignalR/server/SignalR/test/EndToEndTests.cs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -329,15 +329,30 @@ public async Task ConnectionCanSendAndReceiveDifferentMessageSizesWebSocketsTran
329329
logger.LogInformation("Started connection to {url}", url);
330330

331331
var bytes = Encoding.UTF8.GetBytes(message);
332-
logger.LogInformation("Sending {length} byte message", bytes.Length);
333-
await connection.Transport.Output.WriteAsync(bytes).OrTimeout();
334-
logger.LogInformation("Sent message");
335332

336-
logger.LogInformation("Receiving message");
337-
// Big timeout here because it can take a while to receive all the bytes
338-
var receivedData = await connection.Transport.Input.ReadAsync(bytes.Length).OrTimeout(TimeSpan.FromMinutes(2));
339-
Assert.Equal(message, Encoding.UTF8.GetString(receivedData));
340-
logger.LogInformation("Completed receive");
333+
async Task SendMessage()
334+
{
335+
logger.LogInformation("Sending {length} byte message", bytes.Length);
336+
await connection.Transport.Output.WriteAsync(bytes).OrTimeout();
337+
logger.LogInformation("Sent message");
338+
}
339+
340+
async Task ReceiveMessage()
341+
{
342+
logger.LogInformation("Receiving message");
343+
// Big timeout here because it can take a while to receive all the bytes
344+
var receivedData = await connection.Transport.Input.ReadAsync(bytes.Length).OrTimeout(TimeSpan.FromMinutes(2));
345+
Assert.Equal(message, Encoding.UTF8.GetString(receivedData));
346+
logger.LogInformation("Completed receive");
347+
}
348+
349+
// Send the receive concurrently so that back pressure is released
350+
// for server -> client sends
351+
var sendingTask = SendMessage();
352+
var receivingTask = ReceiveMessage();
353+
354+
await sendingTask;
355+
await receivingTask;
341356
}
342357
catch (Exception ex)
343358
{

0 commit comments

Comments
 (0)