Skip to content

Commit 1ea5be4

Browse files
authored
Streaming CI Debugging (#33917)
* CI Debugging * CiData message * CiData message * Update RemoteJSDataStream.cs * Remove Task.Delay * Update RemoteJSDataStream.cs * Update RemoteJSDataStream.cs
1 parent 3836301 commit 1ea5be4

File tree

2 files changed

+105
-1
lines changed

2 files changed

+105
-1
lines changed

src/Components/Server/src/Circuits/RemoteJSDataStream.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,12 +207,22 @@ private async Task ThrowOnTimeout()
207207
}
208208
}
209209

210-
internal async Task CompletePipeAndDisposeStream(Exception? ex = null)
210+
private async Task CompletePipeAndDisposeStream(Exception? ex = null)
211211
{
212212
await _pipe.Writer.CompleteAsync(ex);
213213
Dispose(true);
214214
}
215215

216+
/// <summary>
217+
/// For testing purposes only.
218+
///
219+
/// Triggers the timeout on the next check.
220+
/// </summary>
221+
internal void InvalidateLastDataReceivedTimeForTimeout()
222+
{
223+
_lastDataReceivedTime = _lastDataReceivedTime.Subtract(_jsInteropDefaultCallTimeout);
224+
}
225+
216226
protected override void Dispose(bool disposing)
217227
{
218228
if (disposing)

src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,100 @@ public async void ReceiveData_ProvidedWithOutOfOrderChunk_SimulatesSignalRDiscon
155155
Assert.Equal("Out of sequence chunk received, expected 5, but received 7.", ex.Message);
156156
}
157157

158+
[Fact]
159+
public async void ReceiveData_NoDataProvidedBeforeTimeout_StreamDisposed()
160+
{
161+
// Arrange
162+
var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
163+
var timeoutExceptionRaisedSemaphore = new SemaphoreSlim(initialCount: 0, maxCount: 1);
164+
jsRuntime.UnhandledException += (_, ex) =>
165+
{
166+
Assert.Equal("Did not receive any data in the alloted time.", ex.Message);
167+
Assert.IsType<TimeoutException>(ex);
168+
timeoutExceptionRaisedSemaphore.Release();
169+
};
170+
171+
var jsStreamReference = Mock.Of<IJSStreamReference>();
172+
var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(
173+
jsRuntime,
174+
jsStreamReference,
175+
totalLength: 15,
176+
maximumIncomingBytes: 10_000,
177+
jsInteropDefaultCallTimeout: TimeSpan.FromSeconds(10), // Note we're using a 10 second timeout for this test
178+
pauseIncomingBytesThreshold: 50,
179+
resumeIncomingBytesThreshold: 25,
180+
cancellationToken: CancellationToken.None);
181+
var streamId = GetStreamId(remoteJSDataStream, jsRuntime);
182+
var chunk = new byte[] { 3, 5, 7 };
183+
184+
// Act & Assert 1
185+
// Trigger timeout and ensure unhandled exception raised to crush circuit
186+
remoteJSDataStream.InvalidateLastDataReceivedTimeForTimeout();
187+
await timeoutExceptionRaisedSemaphore.WaitAsync();
188+
189+
// Act & Assert 2
190+
// Confirm exception also raised on pipe reader
191+
using var mem = new MemoryStream();
192+
var ex = await Assert.ThrowsAsync<TimeoutException>(async () => await remoteJSDataStream.CopyToAsync(mem));
193+
Assert.Equal("Did not receive any data in the alloted time.", ex.Message);
194+
195+
// Act & Assert 3
196+
// Ensures stream is disposed after the timeout and any additional chunks aren't accepted
197+
var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null);
198+
Assert.False(success);
199+
}
200+
201+
[Fact]
202+
public async void ReceiveData_ReceivesDataThenTimesout_StreamDisposed()
203+
{
204+
// Arrange
205+
var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
206+
var timeoutExceptionRaisedSemaphore = new SemaphoreSlim(initialCount: 0, maxCount: 1);
207+
jsRuntime.UnhandledException += (_, ex) =>
208+
{
209+
Assert.Equal("Did not receive any data in the alloted time.", ex.Message);
210+
Assert.IsType<TimeoutException>(ex);
211+
timeoutExceptionRaisedSemaphore.Release();
212+
};
213+
214+
var jsStreamReference = Mock.Of<IJSStreamReference>();
215+
var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(
216+
jsRuntime,
217+
jsStreamReference,
218+
totalLength: 15,
219+
maximumIncomingBytes: 10_000,
220+
jsInteropDefaultCallTimeout: TimeSpan.FromSeconds(30), // Note we're using a 30 second timeout for this test
221+
pauseIncomingBytesThreshold: 50,
222+
resumeIncomingBytesThreshold: 25,
223+
cancellationToken: CancellationToken.None);
224+
var streamId = GetStreamId(remoteJSDataStream, jsRuntime);
225+
var chunk = new byte[] { 3, 5, 7 };
226+
227+
// Act & Assert 1
228+
var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null);
229+
Assert.True(success);
230+
231+
// Act & Assert 2
232+
success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 1, chunk, error: null);
233+
Assert.True(success);
234+
235+
// Act & Assert 3
236+
// Trigger timeout and ensure unhandled exception raised to crush circuit
237+
remoteJSDataStream.InvalidateLastDataReceivedTimeForTimeout();
238+
await timeoutExceptionRaisedSemaphore.WaitAsync();
239+
240+
// Act & Assert 4
241+
// Confirm exception also raised on pipe reader
242+
using var mem = new MemoryStream();
243+
var ex = await Assert.ThrowsAsync<TimeoutException>(async () => await remoteJSDataStream.CopyToAsync(mem));
244+
Assert.Equal("Did not receive any data in the alloted time.", ex.Message);
245+
246+
// Act & Assert 5
247+
// Ensures stream is disposed after the timeout and any additional chunks aren't accepted
248+
success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 2, chunk, error: null);
249+
Assert.False(success);
250+
}
251+
158252
private static async Task<RemoteJSDataStream> CreateRemoteJSDataStreamAsync(TestRemoteJSRuntime jsRuntime = null)
159253
{
160254
var jsStreamReference = Mock.Of<IJSStreamReference>();

0 commit comments

Comments
 (0)