Skip to content

Commit 689a08f

Browse files
authored
Blazor Streaming Interop | JS to DotNet (#33491)
* Streaming Interop (WIP) * Cancel JS Data Stream (with clearTimeout) * Cancel without clearTimeout * Determine chunk size based on SignalR limit * Add DeserializedJSObjectReferenceValues * Update JSObjectReference.cs * Move RemoteJSDataStream Instances to RemoteJSRuntime * Cancel using SupplyJSDataChunk * Update blazor.server.js * PR Feedback * JSCallResultType JSDataReference * Update PublicAPI.Unshipped.txt * PR Feedback Continued * Unit Tests * @pranavkm feedback * @BrennanConroy PR Feedback * E2E JS To .NET Interop Test * Update blazor.server.js * Update InteropComponent.razor * Make LinkedCTS More Efficient * Update RemoteJSDataStream.cs * Revert SignalR Changes * PR Feedback * E2E tests * JSDataReference->JSStreamReference * PR Feedback * 1 Minutes Inactivity Timeout * Integrate chunk sequence validation for SignalR Disconnect * Update ReceiveDataTimeout * Update RemoteJSDataStream.cs * ReceiveData Timeout UnhandledException * Improve test reliability * PR Feedback * Update RemoteJSDataStreamTest.cs * Update blazor.*.js * Cleanup & Fix Test
1 parent cd5e80f commit 689a08f

32 files changed

+1114
-121
lines changed

src/Components/Server/src/Circuits/CircuitHost.cs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
5+
using System.Buffers;
56
using System.Collections.Generic;
67
using System.Globalization;
78
using System.Security.Claims;
@@ -67,8 +68,11 @@ public CircuitHost(
6768
Circuit = new Circuit(this);
6869
Handle = new CircuitHandle() { CircuitHost = this, };
6970

70-
Renderer.UnhandledException += Renderer_UnhandledException;
71+
// An unhandled exception from the renderer is always fatal because it came from user code.
72+
Renderer.UnhandledException += ReportAndInvoke_UnhandledException;
7173
Renderer.UnhandledSynchronizationException += SynchronizationContext_UnhandledException;
74+
75+
JSRuntime.UnhandledException += ReportAndInvoke_UnhandledException;
7276
}
7377

7478
public CircuitHandle Handle { get; }
@@ -415,6 +419,31 @@ await Renderer.Dispatcher.InvokeAsync(() =>
415419
}
416420
}
417421

422+
// ReceiveJSDataChunk is used in a fire-and-forget context, so it's responsible for its own
423+
// error handling.
424+
internal async Task<bool> ReceiveJSDataChunk(long streamId, long chunkId, byte[] chunk, string error)
425+
{
426+
AssertInitialized();
427+
AssertNotDisposed();
428+
429+
try
430+
{
431+
return await Renderer.Dispatcher.InvokeAsync(() =>
432+
{
433+
return RemoteJSDataStream.ReceiveData(JSRuntime, streamId, chunkId, chunk, error);
434+
});
435+
}
436+
catch (Exception ex)
437+
{
438+
// An error completing JS interop means that the user sent invalid data, a well-behaved
439+
// client won't do this.
440+
Log.ReceiveJSDataChunkException(_logger, streamId, ex);
441+
await TryNotifyClientErrorAsync(Client, GetClientErrorMessage(ex, "Invalid chunk supplied to stream."));
442+
UnhandledException?.Invoke(this, new UnhandledExceptionEventArgs(ex, isTerminating: false));
443+
return false;
444+
}
445+
}
446+
418447
// DispatchEvent is used in a fire-and-forget context, so it's responsible for its own
419448
// error handling.
420449
public async Task DispatchEvent(string eventDescriptorJson, string eventArgsJson)
@@ -548,9 +577,8 @@ private void AssertNotDisposed()
548577
}
549578
}
550579

551-
// An unhandled exception from the renderer is always fatal because it came from user code.
552580
// We want to notify the client if it's still connected, and then tear-down the circuit.
553-
private async void Renderer_UnhandledException(object sender, Exception e)
581+
private async void ReportAndInvoke_UnhandledException(object sender, Exception e)
554582
{
555583
await ReportUnhandledException(e);
556584
UnhandledException?.Invoke(this, new UnhandledExceptionEventArgs(e, isTerminating: false));
@@ -638,6 +666,7 @@ private static class Log
638666
private static readonly Action<ILogger, long, Exception> _endInvokeJSSucceeded;
639667
private static readonly Action<ILogger, long, Exception> _receiveByteArraySuccess;
640668
private static readonly Action<ILogger, long, Exception> _receiveByteArrayException;
669+
private static readonly Action<ILogger, long, Exception> _receiveJSDataChunkException;
641670
private static readonly Action<ILogger, Exception> _dispatchEventFailedToParseEventData;
642671
private static readonly Action<ILogger, string, Exception> _dispatchEventFailedToDispatchEvent;
643672
private static readonly Action<ILogger, string, CircuitId, Exception> _locationChange;
@@ -682,6 +711,7 @@ private static class EventIds
682711
public static readonly EventId OnRenderCompletedFailed = new EventId(212, "OnRenderCompletedFailed");
683712
public static readonly EventId ReceiveByteArraySucceeded = new EventId(213, "ReceiveByteArraySucceeded");
684713
public static readonly EventId ReceiveByteArrayException = new EventId(214, "ReceiveByteArrayException");
714+
public static readonly EventId ReceiveJSDataChunkException = new EventId(215, "ReceiveJSDataChunkException");
685715
}
686716

687717
static Log()
@@ -811,6 +841,11 @@ static Log()
811841
EventIds.ReceiveByteArrayException,
812842
"The ReceiveByteArray call with id '{id}' failed.");
813843

844+
_receiveJSDataChunkException = LoggerMessage.Define<long>(
845+
LogLevel.Debug,
846+
EventIds.ReceiveJSDataChunkException,
847+
"The ReceiveJSDataChunk call with stream id '{streamId}' failed.");
848+
814849
_dispatchEventFailedToParseEventData = LoggerMessage.Define(
815850
LogLevel.Debug,
816851
EventIds.DispatchEventFailedToParseEventData,
@@ -875,6 +910,7 @@ public static void CircuitHandlerFailed(ILogger logger, CircuitHandler handler,
875910
public static void EndInvokeJSSucceeded(ILogger logger, long asyncCall) => _endInvokeJSSucceeded(logger, asyncCall, null);
876911
internal static void ReceiveByteArraySuccess(ILogger logger, long id) => _receiveByteArraySuccess(logger, id, null);
877912
internal static void ReceiveByteArrayException(ILogger logger, long id, Exception ex) => _receiveByteArrayException(logger, id, ex);
913+
internal static void ReceiveJSDataChunkException(ILogger logger, long streamId, Exception ex) => _receiveJSDataChunkException(logger, streamId, ex);
878914
public static void DispatchEventFailedToParseEventData(ILogger logger, Exception ex) => _dispatchEventFailedToParseEventData(logger, ex);
879915
public static void DispatchEventFailedToDispatchEvent(ILogger logger, string eventHandlerId, Exception ex) => _dispatchEventFailedToDispatchEvent(logger, eventHandlerId ?? "", ex);
880916

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.IO;
6+
using System.IO.Pipelines;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using Microsoft.JSInterop;
10+
11+
namespace Microsoft.AspNetCore.Components.Server.Circuits
12+
{
13+
internal sealed class RemoteJSDataStream : Stream
14+
{
15+
private readonly RemoteJSRuntime _runtime;
16+
private readonly long _streamId;
17+
private readonly long _totalLength;
18+
private readonly TimeSpan _jsInteropDefaultCallTimeout;
19+
private readonly CancellationToken _streamCancellationToken;
20+
private readonly Stream _pipeReaderStream;
21+
private readonly Pipe _pipe;
22+
private long _bytesRead;
23+
private long _expectedChunkId;
24+
private DateTimeOffset _lastDataReceivedTime;
25+
private bool _disposed;
26+
27+
public static async Task<bool> ReceiveData(RemoteJSRuntime runtime, long streamId, long chunkId, byte[] chunk, string error)
28+
{
29+
if (!runtime.RemoteJSDataStreamInstances.TryGetValue(streamId, out var instance))
30+
{
31+
// There is no data stream with the given identifier. It may have already been disposed.
32+
// We notify JS that the stream has been cancelled/disposed.
33+
return false;
34+
}
35+
36+
return await instance.ReceiveData(chunkId, chunk, error);
37+
}
38+
39+
public static async ValueTask<RemoteJSDataStream> CreateRemoteJSDataStreamAsync(
40+
RemoteJSRuntime runtime,
41+
IJSStreamReference jsStreamReference,
42+
long totalLength,
43+
long maxBufferSize,
44+
long maximumIncomingBytes,
45+
TimeSpan jsInteropDefaultCallTimeout,
46+
CancellationToken cancellationToken = default)
47+
{
48+
// Enforce minimum 1 kb, maximum 50 kb, SignalR message size.
49+
// We budget 512 bytes overhead for the transfer, thus leaving at least 512 bytes for data
50+
// transfer per chunk with a 1 kb message size.
51+
// Additionally, to maintain interactivity, we put an upper limit of 50 kb on the message size.
52+
var chunkSize = maximumIncomingBytes > 1024 ?
53+
Math.Min(maximumIncomingBytes, 50*1024) - 512 :
54+
throw new ArgumentException($"SignalR MaximumIncomingBytes must be at least 1 kb.");
55+
56+
var streamId = runtime.RemoteJSDataStreamNextInstanceId++;
57+
var remoteJSDataStream = new RemoteJSDataStream(runtime, streamId, totalLength, maxBufferSize, jsInteropDefaultCallTimeout, cancellationToken);
58+
await runtime.InvokeVoidAsync("Blazor._internal.sendJSDataStream", jsStreamReference, streamId, chunkSize);
59+
return remoteJSDataStream;
60+
}
61+
62+
private RemoteJSDataStream(
63+
RemoteJSRuntime runtime,
64+
long streamId,
65+
long totalLength,
66+
long maxBufferSize,
67+
TimeSpan jsInteropDefaultCallTimeout,
68+
CancellationToken cancellationToken)
69+
{
70+
_runtime = runtime;
71+
_streamId = streamId;
72+
_totalLength = totalLength;
73+
_jsInteropDefaultCallTimeout = jsInteropDefaultCallTimeout;
74+
_streamCancellationToken = cancellationToken;
75+
76+
_lastDataReceivedTime = DateTimeOffset.UtcNow;
77+
_ = ThrowOnTimeout();
78+
79+
_runtime.RemoteJSDataStreamInstances.Add(_streamId, this);
80+
81+
_pipe = new Pipe(new PipeOptions(pauseWriterThreshold: maxBufferSize, resumeWriterThreshold: maxBufferSize / 2));
82+
_pipeReaderStream = _pipe.Reader.AsStream();
83+
}
84+
85+
private async Task<bool> ReceiveData(long chunkId, byte[] chunk, string error)
86+
{
87+
try
88+
{
89+
_lastDataReceivedTime = DateTimeOffset.UtcNow;
90+
_ = ThrowOnTimeout();
91+
92+
if (!string.IsNullOrEmpty(error))
93+
{
94+
throw new InvalidOperationException($"An error occurred while reading the remote stream: {error}");
95+
}
96+
97+
if (chunkId != _expectedChunkId)
98+
{
99+
throw new EndOfStreamException($"Out of sequence chunk received, expected {_expectedChunkId}, but received {chunkId}.");
100+
}
101+
102+
++_expectedChunkId;
103+
104+
if (chunk.Length == 0)
105+
{
106+
throw new EndOfStreamException($"The incoming data chunk cannot be empty.");
107+
}
108+
109+
_bytesRead += chunk.Length;
110+
111+
if (_bytesRead > _totalLength)
112+
{
113+
throw new EndOfStreamException($"The incoming data stream declared a length {_totalLength}, but {_bytesRead} bytes were sent.");
114+
}
115+
116+
await _pipe.Writer.WriteAsync(chunk, _streamCancellationToken);
117+
118+
if (_bytesRead == _totalLength)
119+
{
120+
await CompletePipeAndDisposeStream();
121+
}
122+
123+
return true;
124+
}
125+
catch (Exception e)
126+
{
127+
await CompletePipeAndDisposeStream(e);
128+
129+
// Fatal exception, crush the circuit. A well behaved client
130+
// should not result in this type of exception.
131+
if (e is EndOfStreamException)
132+
{
133+
throw;
134+
}
135+
136+
return false;
137+
}
138+
}
139+
140+
public override bool CanRead => true;
141+
142+
public override bool CanSeek => false;
143+
144+
public override bool CanWrite => false;
145+
146+
public override long Length => _totalLength;
147+
148+
public override long Position
149+
{
150+
get => _pipeReaderStream.Position;
151+
set => throw new NotSupportedException();
152+
}
153+
154+
public override void Flush()
155+
=> throw new NotSupportedException();
156+
157+
public override int Read(byte[] buffer, int offset, int count)
158+
=> throw new NotSupportedException("Synchronous reads are not supported.");
159+
160+
public override long Seek(long offset, SeekOrigin origin)
161+
=> throw new NotSupportedException();
162+
163+
public override void SetLength(long value)
164+
=> throw new NotSupportedException();
165+
166+
public override void Write(byte[] buffer, int offset, int count)
167+
=> throw new NotSupportedException();
168+
169+
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
170+
{
171+
var linkedCancellationToken = GetLinkedCancellationToken(_streamCancellationToken, cancellationToken);
172+
return await _pipeReaderStream.ReadAsync(buffer.AsMemory(offset, count), linkedCancellationToken);
173+
}
174+
175+
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
176+
{
177+
var linkedCancellationToken = GetLinkedCancellationToken(_streamCancellationToken, cancellationToken);
178+
return await _pipeReaderStream.ReadAsync(buffer, linkedCancellationToken);
179+
}
180+
181+
private static CancellationToken GetLinkedCancellationToken(CancellationToken a, CancellationToken b)
182+
{
183+
if (a.CanBeCanceled && b.CanBeCanceled)
184+
{
185+
return CancellationTokenSource.CreateLinkedTokenSource(a, b).Token;
186+
}
187+
else if (a.CanBeCanceled)
188+
{
189+
return a;
190+
}
191+
192+
return b;
193+
}
194+
195+
private async Task ThrowOnTimeout()
196+
{
197+
await Task.Delay(_jsInteropDefaultCallTimeout);
198+
199+
if (!_disposed && (DateTimeOffset.UtcNow >= _lastDataReceivedTime.Add(_jsInteropDefaultCallTimeout)))
200+
{
201+
// Dispose of the stream if a chunk isn't received within the jsInteropDefaultCallTimeout.
202+
var timeoutException = new TimeoutException("Did not receive any data in the alloted time.");
203+
await CompletePipeAndDisposeStream(timeoutException);
204+
_runtime.RaiseUnhandledException(timeoutException);
205+
}
206+
}
207+
208+
internal async Task CompletePipeAndDisposeStream(Exception? ex = null)
209+
{
210+
await _pipe.Writer.CompleteAsync(ex);
211+
Dispose(true);
212+
}
213+
214+
protected override void Dispose(bool disposing)
215+
{
216+
if (disposing)
217+
{
218+
_runtime.RemoteJSDataStreamInstances.Remove(_streamId);
219+
}
220+
221+
_disposed = true;
222+
}
223+
}
224+
}

src/Components/Server/src/Circuits/RemoteJSRuntime.cs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
5+
using System.Collections.Generic;
6+
using System.IO;
57
using System.Text.Json;
8+
using System.Threading;
9+
using System.Threading.Tasks;
610
using Microsoft.AspNetCore.SignalR;
711
using Microsoft.Extensions.Logging;
812
using Microsoft.Extensions.Options;
@@ -20,10 +24,18 @@ internal class RemoteJSRuntime : JSRuntime
2024
private readonly long _maximumIncomingBytes;
2125
private int _byteArraysToBeRevivedTotalBytes;
2226

27+
internal int RemoteJSDataStreamNextInstanceId;
28+
internal readonly Dictionary<long, RemoteJSDataStream> RemoteJSDataStreamInstances = new();
29+
2330
public ElementReferenceContext ElementReferenceContext { get; }
2431

2532
public bool IsInitialized => _clientProxy is not null;
2633

34+
/// <summary>
35+
/// Notifies when a runtime exception occurred.
36+
/// </summary>
37+
public event EventHandler<Exception>? UnhandledException;
38+
2739
public RemoteJSRuntime(
2840
IOptions<CircuitOptions> circuitOptions,
2941
IOptions<HubOptions> hubOptions,
@@ -46,6 +58,11 @@ internal void Initialize(CircuitClientProxy clientProxy)
4658
_clientProxy = clientProxy ?? throw new ArgumentNullException(nameof(clientProxy));
4759
}
4860

61+
internal void RaiseUnhandledException(Exception ex)
62+
{
63+
UnhandledException?.Invoke(this, ex);
64+
}
65+
4966
protected override void EndInvokeDotNet(DotNetInvocationInfo invocationInfo, in DotNetInvocationResult invocationResult)
5067
{
5168
if (!invocationResult.Success)
@@ -140,6 +157,9 @@ public void MarkPermanentlyDisconnected()
140157
_clientProxy = null;
141158
}
142159

160+
protected override async Task<Stream> ReadJSDataAsStreamAsync(IJSStreamReference jsStreamReference, long totalLength, long maxBufferSize, CancellationToken cancellationToken)
161+
=> await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(this, jsStreamReference, totalLength, maxBufferSize, _maximumIncomingBytes, _options.JSInteropDefaultCallTimeout, cancellationToken);
162+
143163
public static class Log
144164
{
145165
private static readonly Action<ILogger, long, string, Exception> _beginInvokeJS =
@@ -198,7 +218,6 @@ internal static void InvokeDotNetMethodSuccess(ILogger<RemoteJSRuntime> logger,
198218
{
199219
_invokeInstanceDotNetMethodSuccess(logger, invocationInfo.MethodIdentifier, invocationInfo.DotNetObjectId, invocationInfo.CallId, null);
200220
}
201-
202221
}
203222
}
204223
}

0 commit comments

Comments
 (0)