-
Notifications
You must be signed in to change notification settings - Fork 10.4k
Client to Server Streaming with IAsyncEnumerable #9310
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
aa992a6
4ef13e3
185792b
cb6c4f9
723534d
7935b43
ae11096
1a7f668
2beba22
a8f2fba
ee9780f
4557b79
3e4856d
2093723
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,7 +42,9 @@ public partial class HubConnection | |
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1, 1); | ||
|
||
private static readonly MethodInfo _sendStreamItemsMethod = typeof(HubConnection).GetMethods(BindingFlags.NonPublic | BindingFlags.Instance).Single(m => m.Name.Equals("SendStreamItems")); | ||
|
||
#if NETCOREAPP3_0 | ||
private static readonly MethodInfo _sendIAsyncStreamItemsMethod = typeof(HubConnection).GetMethods(BindingFlags.NonPublic | BindingFlags.Instance).Single(m => m.Name.Equals("SendIAsyncEnumerableStreamItems")); | ||
#endif | ||
// Persistent across all connections | ||
private readonly ILoggerFactory _loggerFactory; | ||
private readonly ILogger _logger; | ||
|
@@ -533,13 +535,11 @@ async Task OnStreamCanceled(InvocationRequest irq) | |
} | ||
|
||
LaunchStreams(readers, cancellationToken); | ||
|
||
return channel; | ||
} | ||
|
||
private Dictionary<string, object> PackageStreamingParams(ref object[] args, out List<string> streamIds) | ||
{ | ||
// lazy initialized, to avoid allocating unecessary dictionaries | ||
Dictionary<string, object> readers = null; | ||
streamIds = null; | ||
var newArgs = new List<object>(args.Length); | ||
|
@@ -572,7 +572,6 @@ private Dictionary<string, object> PackageStreamingParams(ref object[] args, out | |
} | ||
|
||
args = newArgs.ToArray(); | ||
|
||
return readers; | ||
} | ||
|
||
|
@@ -590,31 +589,69 @@ private void LaunchStreams(Dictionary<string, object> readers, CancellationToken | |
// For each stream that needs to be sent, run a "send items" task in the background. | ||
// This reads from the channel, attaches streamId, and sends to server. | ||
// A single background thread here quickly gets messy. | ||
#if NETCOREAPP3_0 | ||
if (ReflectionHelper.IsIAsyncEnumerable(reader.GetType())) | ||
{ | ||
_ = _sendIAsyncStreamItemsMethod | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Long term, we should store the Takss returned by _sendIAsyncStreamItemsMethod and _sendStreamItemsMethod and track them during connection teardown and log warnings if they don't complete within a timeout. |
||
.MakeGenericMethod(reader.GetType().GetInterface("IAsyncEnumerable`1").GetGenericArguments()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mikaelm12 is this right? |
||
.Invoke(this, new object[] { kvp.Key.ToString(), reader, cancellationToken }); | ||
continue; | ||
} | ||
#endif | ||
_ = _sendStreamItemsMethod | ||
.MakeGenericMethod(reader.GetType().GetGenericArguments()) | ||
.Invoke(this, new object[] { kvp.Key.ToString(), reader, cancellationToken }); | ||
} | ||
} | ||
|
||
// this is called via reflection using the `_sendStreamItems` field | ||
private async Task SendStreamItems<T>(string streamId, ChannelReader<T> reader, CancellationToken token) | ||
private Task SendStreamItems<T>(string streamId, ChannelReader<T> reader, CancellationToken token) | ||
{ | ||
Log.StartingStream(_logger, streamId); | ||
|
||
var combinedToken = CancellationTokenSource.CreateLinkedTokenSource(_uploadStreamToken, token).Token; | ||
|
||
string responseError = null; | ||
try | ||
async Task ReadChannelStream(CancellationTokenSource tokenSource) | ||
{ | ||
while (await reader.WaitToReadAsync(combinedToken)) | ||
while (await reader.WaitToReadAsync(token)) | ||
mikaelm12 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
while (!combinedToken.IsCancellationRequested && reader.TryRead(out var item)) | ||
while (!token.IsCancellationRequested && reader.TryRead(out var item)) | ||
{ | ||
await SendWithLock(new StreamItemMessage(streamId, item)); | ||
Log.SendingStreamItem(_logger, streamId); | ||
} | ||
} | ||
} | ||
|
||
return CommonStreaming(streamId, token, ReadChannelStream); | ||
} | ||
|
||
#if NETCOREAPP3_0 | ||
// this is called via reflection using the `_sendIAsyncStreamItemsMethod` field | ||
private Task SendIAsyncEnumerableStreamItems<T>(string streamId, IAsyncEnumerable<T> stream, CancellationToken token) | ||
{ | ||
async Task ReadAsyncEnumerableStream(CancellationTokenSource tokenSource) | ||
{ | ||
var streamValues = AsyncEnumerableAdapters.MakeCancelableTypedAsyncEnumerable(stream, tokenSource); | ||
|
||
await foreach (var streamValue in streamValues) | ||
{ | ||
await SendWithLock(new StreamItemMessage(streamId, streamValue)); | ||
Log.SendingStreamItem(_logger, streamId); | ||
} | ||
} | ||
|
||
return CommonStreaming(streamId, token, ReadAsyncEnumerableStream); | ||
} | ||
#endif | ||
|
||
private async Task CommonStreaming(string streamId, CancellationToken token, Func<CancellationTokenSource, Task> createAndConsumeStream) | ||
{ | ||
var cts = CancellationTokenSource.CreateLinkedTokenSource(_uploadStreamToken, token); | ||
|
||
Log.StartingStream(_logger, streamId); | ||
string responseError = null; | ||
try | ||
{ | ||
await createAndConsumeStream(cts); | ||
} | ||
catch (OperationCanceledException) | ||
{ | ||
Log.CancelingStream(_logger, streamId); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -661,6 +661,106 @@ public async Task CanStreamToAndFromClientInSameInvocation(string protocolName, | |
} | ||
} | ||
|
||
[Theory] | ||
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))] | ||
[LogLevel(LogLevel.Trace)] | ||
public async Task CanStreamToServerWithIAsyncEnumerable(string protocolName, HttpTransportType transportType, string path) | ||
{ | ||
var protocol = HubProtocols[protocolName]; | ||
using (StartServer<Startup>(out var server)) | ||
{ | ||
var connection = CreateHubConnection(server.Url, path, transportType, protocol, LoggerFactory); | ||
try | ||
{ | ||
async IAsyncEnumerable<string> clientStreamData() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. dude, casing, 😄 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the case settled on local functions? I could see the argument for using camelCase like you would for anything else with a local scope. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes it settled it’s always pascal case because this is c# not java or JavaScript There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 😅 |
||
{ | ||
var items = new string[] { "A", "B", "C", "D" }; | ||
foreach (var item in items) | ||
{ | ||
await Task.Delay(10); | ||
yield return item; | ||
} | ||
} | ||
|
||
await connection.StartAsync().OrTimeout(); | ||
|
||
var stream = clientStreamData(); | ||
|
||
var channel = await connection.StreamAsChannelAsync<string>("StreamEcho", stream).OrTimeout(); | ||
|
||
Assert.Equal("A", await channel.ReadAsync().AsTask().OrTimeout()); | ||
Assert.Equal("B", await channel.ReadAsync().AsTask().OrTimeout()); | ||
Assert.Equal("C", await channel.ReadAsync().AsTask().OrTimeout()); | ||
Assert.Equal("D", await channel.ReadAsync().AsTask().OrTimeout()); | ||
|
||
var results = await channel.ReadAndCollectAllAsync().OrTimeout(); | ||
Assert.Empty(results); | ||
} | ||
catch (Exception ex) | ||
{ | ||
LoggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} from test", ex.GetType().FullName); | ||
throw; | ||
} | ||
finally | ||
{ | ||
await connection.DisposeAsync().OrTimeout(); | ||
} | ||
} | ||
} | ||
|
||
[Theory] | ||
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))] | ||
[LogLevel(LogLevel.Trace)] | ||
public async Task CanCancelIAsyncEnumerableClientToServerUpload(string protocolName, HttpTransportType transportType, string path) | ||
{ | ||
var protocol = HubProtocols[protocolName]; | ||
using (StartServer<Startup>(out var server)) | ||
{ | ||
var connection = CreateHubConnection(server.Url, path, transportType, protocol, LoggerFactory); | ||
try | ||
{ | ||
async IAsyncEnumerable<int> clientStreamData() | ||
{ | ||
for (var i = 0; i < 1000; i++) | ||
{ | ||
yield return i; | ||
await Task.Delay(10); | ||
} | ||
} | ||
|
||
await connection.StartAsync().OrTimeout(); | ||
var results = new List<int>(); | ||
var stream = clientStreamData(); | ||
var cts = new CancellationTokenSource(); | ||
var ex = await Assert.ThrowsAsync<OperationCanceledException>(async () => | ||
{ | ||
var channel = await connection.StreamAsChannelAsync<int>("StreamEchoInt", stream, cts.Token).OrTimeout(); | ||
|
||
while (await channel.WaitToReadAsync()) | ||
{ | ||
while (channel.TryRead(out var item)) | ||
{ | ||
results.Add(item); | ||
cts.Cancel(); | ||
} | ||
} | ||
}); | ||
|
||
Assert.True(results.Count > 0 && results.Count < 1000); | ||
Assert.True(cts.IsCancellationRequested); | ||
} | ||
catch (Exception ex) | ||
{ | ||
LoggerFactory.CreateLogger<HubConnectionTests>().LogError(ex, "{ExceptionType} from test", ex.GetType().FullName); | ||
throw; | ||
} | ||
finally | ||
{ | ||
await connection.DisposeAsync().OrTimeout(); | ||
} | ||
} | ||
} | ||
|
||
[Theory] | ||
[MemberData(nameof(HubProtocolsAndTransportsAndHubPaths))] | ||
[LogLevel(LogLevel.Trace)] | ||
|
@@ -673,7 +773,7 @@ public async Task StreamAsyncCanBeCanceledThroughGetAsyncEnumerator(string proto | |
try | ||
{ | ||
await connection.StartAsync().OrTimeout(); | ||
var stream = connection.StreamAsync<int>("Stream", 1000 ); | ||
var stream = connection.StreamAsync<int>("Stream", 1000); | ||
var results = new List<int>(); | ||
|
||
var cts = new CancellationTokenSource(); | ||
|
Uh oh!
There was an error while loading. Please reload this page.