-
Notifications
You must be signed in to change notification settings - Fork 10.4k
Blazor Streaming Interop | JS
to DotNet
#33491
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1d32e1e
66dbaf5
2509ce5
50bcb31
5b69105
519f854
b35027b
46f4ddf
c77fd3e
e1635e6
1b76862
c97360c
b266fba
7133bd6
e3956fe
639b34f
7f89312
115d935
1e01c62
80a2171
6a931ad
e799997
9296a36
e07686d
62f2338
e7f44a2
12f7b89
e37413b
eafd3f8
903b16d
0bb3e6c
381c53d
6e09532
350f4de
b5615c7
53f0392
3cd7619
d549784
7589cd0
9e63bb6
f565366
1b8ed19
c1e0873
6e91d84
20e8489
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,224 @@ | ||
// Copyright (c) .NET Foundation. All rights reserved. | ||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. | ||
|
||
using System; | ||
using System.IO; | ||
using System.IO.Pipelines; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Microsoft.JSInterop; | ||
|
||
namespace Microsoft.AspNetCore.Components.Server.Circuits | ||
{ | ||
internal sealed class RemoteJSDataStream : Stream | ||
{ | ||
private readonly RemoteJSRuntime _runtime; | ||
private readonly long _streamId; | ||
private readonly long _totalLength; | ||
private readonly TimeSpan _jsInteropDefaultCallTimeout; | ||
private readonly CancellationToken _streamCancellationToken; | ||
private readonly Stream _pipeReaderStream; | ||
private readonly Pipe _pipe; | ||
private long _bytesRead; | ||
private long _expectedChunkId; | ||
private DateTimeOffset _lastDataReceivedTime; | ||
private bool _disposed; | ||
|
||
public static async Task<bool> ReceiveData(RemoteJSRuntime runtime, long streamId, long chunkId, byte[] chunk, string error) | ||
{ | ||
if (!runtime.RemoteJSDataStreamInstances.TryGetValue(streamId, out var instance)) | ||
{ | ||
// There is no data stream with the given identifier. It may have already been disposed. | ||
// We notify JS that the stream has been cancelled/disposed. | ||
return false; | ||
} | ||
|
||
return await instance.ReceiveData(chunkId, chunk, error); | ||
} | ||
|
||
public static async ValueTask<RemoteJSDataStream> CreateRemoteJSDataStreamAsync( | ||
RemoteJSRuntime runtime, | ||
IJSStreamReference jsStreamReference, | ||
long totalLength, | ||
long maxBufferSize, | ||
long maximumIncomingBytes, | ||
TimeSpan jsInteropDefaultCallTimeout, | ||
CancellationToken cancellationToken = default) | ||
{ | ||
// Enforce minimum 1 kb, maximum 50 kb, SignalR message size. | ||
// We budget 512 bytes overhead for the transfer, thus leaving at least 512 bytes for data | ||
// transfer per chunk with a 1 kb message size. | ||
// Additionally, to maintain interactivity, we put an upper limit of 50 kb on the message size. | ||
var chunkSize = maximumIncomingBytes > 1024 ? | ||
Math.Min(maximumIncomingBytes, 50*1024) - 512 : | ||
throw new ArgumentException($"SignalR MaximumIncomingBytes must be at least 1 kb."); | ||
|
||
var streamId = runtime.RemoteJSDataStreamNextInstanceId++; | ||
TanayParikh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
var remoteJSDataStream = new RemoteJSDataStream(runtime, streamId, totalLength, maxBufferSize, jsInteropDefaultCallTimeout, cancellationToken); | ||
await runtime.InvokeVoidAsync("Blazor._internal.sendJSDataStream", jsStreamReference, streamId, chunkSize); | ||
return remoteJSDataStream; | ||
} | ||
|
||
private RemoteJSDataStream( | ||
RemoteJSRuntime runtime, | ||
long streamId, | ||
long totalLength, | ||
long maxBufferSize, | ||
TimeSpan jsInteropDefaultCallTimeout, | ||
CancellationToken cancellationToken) | ||
{ | ||
_runtime = runtime; | ||
_streamId = streamId; | ||
_totalLength = totalLength; | ||
_jsInteropDefaultCallTimeout = jsInteropDefaultCallTimeout; | ||
_streamCancellationToken = cancellationToken; | ||
|
||
_lastDataReceivedTime = DateTimeOffset.UtcNow; | ||
_ = ThrowOnTimeout(); | ||
|
||
_runtime.RemoteJSDataStreamInstances.Add(_streamId, this); | ||
|
||
_pipe = new Pipe(new PipeOptions(pauseWriterThreshold: maxBufferSize, resumeWriterThreshold: maxBufferSize / 2)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @davidfowl since you are the pipelines expert, can you review our usage in this PR? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From @davidfowl
|
||
_pipeReaderStream = _pipe.Reader.AsStream(); | ||
} | ||
|
||
private async Task<bool> ReceiveData(long chunkId, byte[] chunk, string error) | ||
{ | ||
try | ||
{ | ||
_lastDataReceivedTime = DateTimeOffset.UtcNow; | ||
_ = ThrowOnTimeout(); | ||
|
||
if (!string.IsNullOrEmpty(error)) | ||
{ | ||
throw new InvalidOperationException($"An error occurred while reading the remote stream: {error}"); | ||
} | ||
|
||
if (chunkId != _expectedChunkId) | ||
{ | ||
throw new EndOfStreamException($"Out of sequence chunk received, expected {_expectedChunkId}, but received {chunkId}."); | ||
} | ||
|
||
++_expectedChunkId; | ||
|
||
if (chunk.Length == 0) | ||
{ | ||
throw new EndOfStreamException($"The incoming data chunk cannot be empty."); | ||
} | ||
|
||
_bytesRead += chunk.Length; | ||
|
||
if (_bytesRead > _totalLength) | ||
{ | ||
throw new EndOfStreamException($"The incoming data stream declared a length {_totalLength}, but {_bytesRead} bytes were sent."); | ||
} | ||
|
||
await _pipe.Writer.WriteAsync(chunk, _streamCancellationToken); | ||
|
||
if (_bytesRead == _totalLength) | ||
{ | ||
await CompletePipeAndDisposeStream(); | ||
} | ||
|
||
return true; | ||
TanayParikh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
catch (Exception e) | ||
{ | ||
await CompletePipeAndDisposeStream(e); | ||
|
||
// Fatal exception, crush the circuit. A well behaved client | ||
// should not result in this type of exception. | ||
if (e is EndOfStreamException) | ||
{ | ||
throw; | ||
} | ||
|
||
return false; | ||
} | ||
} | ||
|
||
public override bool CanRead => true; | ||
|
||
public override bool CanSeek => false; | ||
|
||
public override bool CanWrite => false; | ||
|
||
public override long Length => _totalLength; | ||
|
||
public override long Position | ||
{ | ||
get => _pipeReaderStream.Position; | ||
set => throw new NotSupportedException(); | ||
} | ||
|
||
public override void Flush() | ||
=> throw new NotSupportedException(); | ||
|
||
public override int Read(byte[] buffer, int offset, int count) | ||
=> throw new NotSupportedException("Synchronous reads are not supported."); | ||
|
||
public override long Seek(long offset, SeekOrigin origin) | ||
=> throw new NotSupportedException(); | ||
|
||
public override void SetLength(long value) | ||
=> throw new NotSupportedException(); | ||
|
||
public override void Write(byte[] buffer, int offset, int count) | ||
=> throw new NotSupportedException(); | ||
|
||
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) | ||
{ | ||
var linkedCancellationToken = GetLinkedCancellationToken(_streamCancellationToken, cancellationToken); | ||
return await _pipeReaderStream.ReadAsync(buffer.AsMemory(offset, count), linkedCancellationToken); | ||
} | ||
|
||
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) | ||
{ | ||
var linkedCancellationToken = GetLinkedCancellationToken(_streamCancellationToken, cancellationToken); | ||
return await _pipeReaderStream.ReadAsync(buffer, linkedCancellationToken); | ||
} | ||
|
||
private static CancellationToken GetLinkedCancellationToken(CancellationToken a, CancellationToken b) | ||
{ | ||
if (a.CanBeCanceled && b.CanBeCanceled) | ||
{ | ||
return CancellationTokenSource.CreateLinkedTokenSource(a, b).Token; | ||
} | ||
else if (a.CanBeCanceled) | ||
{ | ||
return a; | ||
} | ||
|
||
return b; | ||
} | ||
|
||
private async Task ThrowOnTimeout() | ||
{ | ||
await Task.Delay(_jsInteropDefaultCallTimeout); | ||
|
||
if (!_disposed && (DateTimeOffset.UtcNow >= _lastDataReceivedTime.Add(_jsInteropDefaultCallTimeout))) | ||
{ | ||
// Dispose of the stream if a chunk isn't received within the jsInteropDefaultCallTimeout. | ||
var timeoutException = new TimeoutException("Did not receive any data in the alloted time."); | ||
await CompletePipeAndDisposeStream(timeoutException); | ||
_runtime.RaiseUnhandledException(timeoutException); | ||
} | ||
} | ||
|
||
internal async Task CompletePipeAndDisposeStream(Exception? ex = null) | ||
{ | ||
await _pipe.Writer.CompleteAsync(ex); | ||
Dispose(true); | ||
} | ||
|
||
protected override void Dispose(bool disposing) | ||
{ | ||
if (disposing) | ||
{ | ||
_runtime.RemoteJSDataStreamInstances.Remove(_streamId); | ||
} | ||
|
||
_disposed = true; | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.