Skip to content

Streaming CI Debugging #33917

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/Components/Server/src/Circuits/RemoteJSDataStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,22 @@ private async Task ThrowOnTimeout()
}
}

internal async Task CompletePipeAndDisposeStream(Exception? ex = null)
private async Task CompletePipeAndDisposeStream(Exception? ex = null)
{
await _pipe.Writer.CompleteAsync(ex);
Dispose(true);
}

/// <summary>
/// For testing purposes only.
///
/// Triggers the timeout on the next check.
/// </summary>
internal void InvalidateLastDataReceivedTimeForTimeout()
{
_lastDataReceivedTime = _lastDataReceivedTime.Subtract(_jsInteropDefaultCallTimeout);
}

protected override void Dispose(bool disposing)
{
if (disposing)
Expand Down
94 changes: 94 additions & 0 deletions src/Components/Server/test/Circuits/RemoteJSDataStreamTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,100 @@ public async void ReceiveData_ProvidedWithOutOfOrderChunk_SimulatesSignalRDiscon
Assert.Equal("Out of sequence chunk received, expected 5, but received 7.", ex.Message);
}

[Fact]
public async void ReceiveData_NoDataProvidedBeforeTimeout_StreamDisposed()
{
// Arrange
var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
var timeoutExceptionRaisedSemaphore = new SemaphoreSlim(initialCount: 0, maxCount: 1);
jsRuntime.UnhandledException += (_, ex) =>
{
Assert.Equal("Did not receive any data in the alloted time.", ex.Message);
Assert.IsType<TimeoutException>(ex);
timeoutExceptionRaisedSemaphore.Release();
};

var jsStreamReference = Mock.Of<IJSStreamReference>();
var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(
jsRuntime,
jsStreamReference,
totalLength: 15,
maximumIncomingBytes: 10_000,
jsInteropDefaultCallTimeout: TimeSpan.FromSeconds(10), // Note we're using a 10 second timeout for this test
pauseIncomingBytesThreshold: 50,
resumeIncomingBytesThreshold: 25,
cancellationToken: CancellationToken.None);
var streamId = GetStreamId(remoteJSDataStream, jsRuntime);
var chunk = new byte[] { 3, 5, 7 };

// Act & Assert 1
// Trigger timeout and ensure unhandled exception raised to crush circuit
remoteJSDataStream.InvalidateLastDataReceivedTimeForTimeout();
await timeoutExceptionRaisedSemaphore.WaitAsync();

// Act & Assert 2
// Confirm exception also raised on pipe reader
using var mem = new MemoryStream();
var ex = await Assert.ThrowsAsync<TimeoutException>(async () => await remoteJSDataStream.CopyToAsync(mem));
Assert.Equal("Did not receive any data in the alloted time.", ex.Message);

// Act & Assert 3
// Ensures stream is disposed after the timeout and any additional chunks aren't accepted
var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null);
Assert.False(success);
}

[Fact]
public async void ReceiveData_ReceivesDataThenTimesout_StreamDisposed()
{
// Arrange
var jsRuntime = new TestRemoteJSRuntime(Options.Create(new CircuitOptions()), Options.Create(new HubOptions()), Mock.Of<ILogger<RemoteJSRuntime>>());
var timeoutExceptionRaisedSemaphore = new SemaphoreSlim(initialCount: 0, maxCount: 1);
jsRuntime.UnhandledException += (_, ex) =>
{
Assert.Equal("Did not receive any data in the alloted time.", ex.Message);
Assert.IsType<TimeoutException>(ex);
timeoutExceptionRaisedSemaphore.Release();
};

var jsStreamReference = Mock.Of<IJSStreamReference>();
var remoteJSDataStream = await RemoteJSDataStream.CreateRemoteJSDataStreamAsync(
jsRuntime,
jsStreamReference,
totalLength: 15,
maximumIncomingBytes: 10_000,
jsInteropDefaultCallTimeout: TimeSpan.FromSeconds(30), // Note we're using a 30 second timeout for this test
pauseIncomingBytesThreshold: 50,
resumeIncomingBytesThreshold: 25,
cancellationToken: CancellationToken.None);
var streamId = GetStreamId(remoteJSDataStream, jsRuntime);
var chunk = new byte[] { 3, 5, 7 };

// Act & Assert 1
var success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 0, chunk, error: null);
Assert.True(success);

// Act & Assert 2
success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 1, chunk, error: null);
Assert.True(success);

// Act & Assert 3
// Trigger timeout and ensure unhandled exception raised to crush circuit
remoteJSDataStream.InvalidateLastDataReceivedTimeForTimeout();
await timeoutExceptionRaisedSemaphore.WaitAsync();

// Act & Assert 4
// Confirm exception also raised on pipe reader
using var mem = new MemoryStream();
var ex = await Assert.ThrowsAsync<TimeoutException>(async () => await remoteJSDataStream.CopyToAsync(mem));
Assert.Equal("Did not receive any data in the alloted time.", ex.Message);

// Act & Assert 5
// Ensures stream is disposed after the timeout and any additional chunks aren't accepted
success = await RemoteJSDataStream.ReceiveData(jsRuntime, streamId, chunkId: 2, chunk, error: null);
Assert.False(success);
}

private static async Task<RemoteJSDataStream> CreateRemoteJSDataStreamAsync(TestRemoteJSRuntime jsRuntime = null)
{
var jsStreamReference = Mock.Of<IJSStreamReference>();
Expand Down