Skip to content

Commit 6e91d84

Browse files
committed
Cleanup & Fix Test
1 parent c1e0873 commit 6e91d84

File tree

4 files changed

+63
-19
lines changed

4 files changed

+63
-19
lines changed

src/Components/Server/src/Circuits/RemoteJSDataStream.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,10 @@ public static async ValueTask<RemoteJSDataStream> CreateRemoteJSDataStreamAsync(
4545
TimeSpan jsInteropDefaultCallTimeout,
4646
CancellationToken cancellationToken = default)
4747
{
48-
// Enforce minimum 1 kb SignalR message size as we budget 512 bytes
49-
// overhead for the transfer, thus leaving at least 512 bytes for data
50-
// transfer per chunk.
48+
// Enforce minimum 1 kb, maximum 50 kb, SignalR message size.
49+
// We budget 512 bytes overhead for the transfer, thus leaving at least 512 bytes for data
50+
// transfer per chunk with a 1 kb message size.
51+
// Additionally, to maintain interactivity, we put an upper limit of 50 kb on the message size.
5152
var chunkSize = maximumIncomingBytes > 1024 ?
5253
Math.Min(maximumIncomingBytes, 50*1024) - 512 :
5354
throw new ArgumentException($"SignalR MaximumIncomingBytes must be at least 1 kb.");
@@ -72,7 +73,7 @@ private RemoteJSDataStream(
7273
_jsInteropDefaultCallTimeout = jsInteropDefaultCallTimeout;
7374
_streamCancellationToken = cancellationToken;
7475

75-
_lastDataReceivedTime = DateTime.UtcNow;
76+
_lastDataReceivedTime = DateTimeOffset.UtcNow;
7677
_ = ThrowOnTimeout();
7778

7879
_runtime.RemoteJSDataStreamInstances.Add(_streamId, this);
@@ -85,7 +86,7 @@ private async Task<bool> ReceiveData(long chunkId, byte[] chunk, string error)
8586
{
8687
try
8788
{
88-
_lastDataReceivedTime = DateTime.UtcNow;
89+
_lastDataReceivedTime = DateTimeOffset.UtcNow;
8990
_ = ThrowOnTimeout();
9091

9192
if (!string.IsNullOrEmpty(error))
@@ -195,7 +196,7 @@ private async Task ThrowOnTimeout()
195196
{
196197
await Task.Delay(_jsInteropDefaultCallTimeout);
197198

198-
if (!_disposed && (DateTime.UtcNow >= _lastDataReceivedTime.Add(_jsInteropDefaultCallTimeout)))
199+
if (!_disposed && (DateTimeOffset.UtcNow >= _lastDataReceivedTime.Add(_jsInteropDefaultCallTimeout)))
199200
{
200201
// Dispose of the stream if a chunk isn't received within the jsInteropDefaultCallTimeout.
201202
var timeoutException = new TimeoutException("Did not receive any data in the alloted time.");

src/Components/Server/src/ComponentHub.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,11 @@ public async ValueTask<bool> ReceiveJSDataChunk(long streamId, long chunkId, byt
232232
return false;
233233
}
234234

235+
// Note: this await will block the circuit. This is intentional.
236+
// The call into the circuitHost.ReceiveJSDataChunk will block regardless as we call into Renderer.Dispatcher.InvokeAsync
237+
// which ensures we're running on the main circuit thread so that the server/client remain in the same
238+
// synchronization context. Additionally, we're utilizing the return value as a heartbeat for the transfer
239+
// process, and without it would likely need to setup a separate endpoint to handle that functionality.
235240
return await circuitHost.ReceiveJSDataChunk(streamId, chunkId, chunk, error);
236241
}
237242

src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,38 +74,78 @@ public async void ReceiveData_SuccessReadsBackStream()
7474
}
7575

7676
[Fact]
77-
public async void ReceiveData_ReceiveDataTimeout_StreamDisposed()
77+
public async void ReceiveData_NoDataProvidedBeforeTimeout_StreamDisposed()
7878
{
7979
// Arrange
8080
var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
81+
var timeoutExceptionRaised = false;
82+
jsRuntime.UnhandledException += (_, ex) =>
83+
{
84+
Assert.Equal("Did not receive any data in the alloted time.", ex.Message);
85+
timeoutExceptionRaised = ex is TimeoutException;
86+
};
87+
8188
var jsStreamReference = Mock.Of<IJSStreamReference>();
8289
var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(
83-
jsRuntime ?? _jsRuntime,
90+
jsRuntime,
8491
jsStreamReference,
8592
totalLength: 9,
8693
maxBufferSize: 50,
8794
maximumIncomingBytes: 10_000,
88-
jsInteropDefaultCallTimeout: TimeSpan.FromSeconds(40)); // Note we're using a 40 second timeout for this test
95+
jsInteropDefaultCallTimeout: TimeSpan.FromSeconds(10)); // Note we're using a 10 second timeout for this test
96+
var streamId = GetStreamId(remoteJSDataStream, jsRuntime);
97+
var chunk = new byte[] { 3, 5, 7 };
98+
99+
// Act & Assert 1
100+
// Wait past the initial timeout + 15 sec buffer room and then
101+
// confirm unhandled exception raised to crush circuit
102+
await Task.Delay(TimeSpan.FromSeconds(25));
103+
Assert.True(timeoutExceptionRaised);
104+
105+
// Act & Assert 2
106+
// Ensures stream is disposed after the timeout and any additional chunks aren't accepted
107+
var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null);
108+
Assert.False(success);
109+
}
110+
111+
[Fact]
112+
public async void ReceiveData_ReceivesDataThenTimesout_StreamDisposed()
113+
{
114+
// Arrange
115+
var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
116+
var timeoutExceptionRaised = false;
117+
jsRuntime.UnhandledException += (_, ex) =>
118+
{
119+
Assert.Equal("Did not receive any data in the alloted time.", ex.Message);
120+
timeoutExceptionRaised = ex is TimeoutException;
121+
};
122+
123+
var jsStreamReference = Mock.Of<IJSStreamReference>();
124+
var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(
125+
jsRuntime,
126+
jsStreamReference,
127+
totalLength: 9,
128+
maxBufferSize: 50,
129+
maximumIncomingBytes: 10_000,
130+
jsInteropDefaultCallTimeout: TimeSpan.FromSeconds(30)); // Note we're using a 30 second timeout for this test
89131
var streamId = GetStreamId(remoteJSDataStream, jsRuntime);
90132
var chunk = new byte[] { 3, 5, 7 };
91133

92134
// Act & Assert 1
93135
var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null);
94136
Assert.True(success);
95137

96-
await Task.Delay(TimeSpan.FromSeconds(20));
138+
await Task.Delay(TimeSpan.FromSeconds(15));
97139

98140
// Act & Assert 2
99141
success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 1, chunk, error: null);
100142
Assert.True(success);
101143

102-
// Wait 80 seconds (20 sec between first two calls + 40 sec timeout + 20 sec buffer room)
103-
await Task.Delay(TimeSpan.FromSeconds(80));
104-
105144
// Act & Assert 3
106-
using var mem = new MemoryStream();
107-
var ex = await Assert.ThrowsAsync<TimeoutException>(async() => await remoteJSDataStream.CopyToAsync(mem));
108-
Assert.Equal("Did not receive any data in the alloted time.", ex.Message);
145+
// Wait past the 30 sec timeout + 15 sec buffer room
146+
// confirm unhandled exception raised to crush circuit
147+
await Task.Delay(TimeSpan.FromSeconds(45));
148+
Assert.True(timeoutExceptionRaised);
109149

110150
// Act & Assert 4
111151
// Ensures stream is disposed after the timeout and any additional chunks aren't accepted
@@ -210,7 +250,7 @@ public TestRemoteJSRuntime(IOptions<CircuitOptions> circuitOptions, IOptions<Hub
210250
{
211251
}
212252

213-
new public ValueTask<TValue> InvokeAsync<TValue>(string identifier, object[] args)
253+
public new ValueTask<TValue> InvokeAsync<TValue>(string identifier, object[] args)
214254
{
215255
Assert.Equal("Blazor._internal.sendJSDataStream", identifier);
216256
return ValueTask.FromResult<TValue>(default);

src/Components/Web.JS/src/Platform/Circuits/CircuitStreamingInterop.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ export function sendJSDataStream(connection: HubConnection, data: ArrayBufferVie
1212
let position = 0;
1313
let chunkId = 0;
1414

15-
// Note: The server-side `StreamBufferCapacity` option (defaults to 10) can be configured to limit how many
16-
// stream items from the client (per stream) will be stored before reading any more stream items (thus applying backpressure).
1715
while (position < data.byteLength) {
1816
const nextChunkSize = Math.min(chunkSize, data.byteLength - position);
1917
const nextChunkData = new Uint8Array(data.buffer, data.byteOffset + position, nextChunkSize);

0 commit comments

Comments
 (0)