Skip to content

Commit 7bb3130

Browse files
authored
Merge pull request #20384 from dotnet-maestro-bot/merge/release/5.0-preview3-to-master
[automated] Merge branch 'release/5.0-preview3' => 'master'
2 parents a5cf518 + b8743fc commit 7bb3130

File tree

5 files changed

+141
-24
lines changed

5 files changed

+141
-24
lines changed

src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ public async Task StartAsync(CancellationToken cancellationToken = default)
235235

236236
private async Task StartAsyncInner(CancellationToken cancellationToken = default)
237237
{
238-
await _state.WaitConnectionLockAsync();
238+
await _state.WaitConnectionLockAsync(token: cancellationToken);
239239
try
240240
{
241241
if (!_state.TryChangeState(HubConnectionState.Disconnected, HubConnectionState.Connecting))
@@ -465,7 +465,7 @@ private async Task StopAsyncCore(bool disposing)
465465

466466
// Potentially wait for StartAsync to finish, and block a new StartAsync from
467467
// starting until we've finished stopping.
468-
await _state.WaitConnectionLockAsync();
468+
await _state.WaitConnectionLockAsync(token: default);
469469

470470
// Ensure that ReconnectingState.ReconnectTask is not accessed outside of the lock.
471471
var reconnectTask = _state.ReconnectTask;
@@ -478,7 +478,7 @@ private async Task StopAsyncCore(bool disposing)
478478
// The StopCts should prevent the HubConnection from restarting until it is reset.
479479
_state.ReleaseConnectionLock();
480480
await reconnectTask;
481-
await _state.WaitConnectionLockAsync();
481+
await _state.WaitConnectionLockAsync(token: default);
482482
}
483483

484484
ConnectionState connectionState;
@@ -574,7 +574,7 @@ private async Task<ChannelReader<object>> StreamAsChannelCoreAsyncCore(string me
574574
async Task OnStreamCanceled(InvocationRequest irq)
575575
{
576576
// We need to take the connection lock in order to ensure we a) have a connection and b) are the only one accessing the write end of the pipe.
577-
await _state.WaitConnectionLockAsync();
577+
await _state.WaitConnectionLockAsync(token: default);
578578
try
579579
{
580580
if (_state.CurrentConnectionStateUnsynchronized != null)
@@ -601,7 +601,7 @@ async Task OnStreamCanceled(InvocationRequest irq)
601601
var readers = default(Dictionary<string, object>);
602602

603603
CheckDisposed();
604-
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(StreamAsChannelCoreAsync));
604+
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(StreamAsChannelCoreAsync), token: cancellationToken);
605605

606606
ChannelReader<object> channel;
607607
try
@@ -704,7 +704,7 @@ async Task ReadChannelStream(CancellationTokenSource tokenSource)
704704
{
705705
while (!tokenSource.Token.IsCancellationRequested && reader.TryRead(out var item))
706706
{
707-
await SendWithLock(connectionState, new StreamItemMessage(streamId, item));
707+
await SendWithLock(connectionState, new StreamItemMessage(streamId, item), tokenSource.Token);
708708
Log.SendingStreamItem(_logger, streamId);
709709
}
710710
}
@@ -722,7 +722,7 @@ async Task ReadAsyncEnumerableStream(CancellationTokenSource tokenSource)
722722

723723
await foreach (var streamValue in streamValues)
724724
{
725-
await SendWithLock(connectionState, new StreamItemMessage(streamId, streamValue));
725+
await SendWithLock(connectionState, new StreamItemMessage(streamId, streamValue), tokenSource.Token);
726726
Log.SendingStreamItem(_logger, streamId);
727727
}
728728
}
@@ -750,15 +750,17 @@ private async Task CommonStreaming(ConnectionState connectionState, string strea
750750

751751
Log.CompletingStream(_logger, streamId);
752752

753-
await SendWithLock(connectionState, CompletionMessage.WithError(streamId, responseError), cts.Token);
753+
// Don't use cancellation token here
754+
// this is triggered by a cancellation token to tell the server that the client is done streaming
755+
await SendWithLock(connectionState, CompletionMessage.WithError(streamId, responseError), cancellationToken: default);
754756
}
755757

756758
private async Task<object> InvokeCoreAsyncCore(string methodName, Type returnType, object[] args, CancellationToken cancellationToken)
757759
{
758760
var readers = default(Dictionary<string, object>);
759761

760762
CheckDisposed();
761-
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(InvokeCoreAsync));
763+
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(InvokeCoreAsync), token: cancellationToken);
762764

763765
Task<object> invocationTask;
764766
try
@@ -853,7 +855,7 @@ private async Task SendCoreAsyncCore(string methodName, object[] args, Cancellat
853855
var readers = default(Dictionary<string, object>);
854856

855857
CheckDisposed();
856-
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(SendCoreAsync));
858+
var connectionState = await _state.WaitForActiveConnectionAsync(nameof(SendCoreAsync), token: cancellationToken);
857859
try
858860
{
859861
CheckDisposed();
@@ -872,10 +874,10 @@ private async Task SendCoreAsyncCore(string methodName, object[] args, Cancellat
872874
}
873875
}
874876

875-
private async Task SendWithLock(ConnectionState expectedConnectionState, HubMessage message, CancellationToken cancellationToken = default, [CallerMemberName] string callerName = "")
877+
private async Task SendWithLock(ConnectionState expectedConnectionState, HubMessage message, CancellationToken cancellationToken, [CallerMemberName] string callerName = "")
876878
{
877879
CheckDisposed();
878-
var connectionState = await _state.WaitForActiveConnectionAsync(callerName);
880+
var connectionState = await _state.WaitForActiveConnectionAsync(callerName, token: cancellationToken);
879881
try
880882
{
881883
CheckDisposed();
@@ -1245,7 +1247,7 @@ internal void OnServerTimeout()
12451247
private async Task HandleConnectionClose(ConnectionState connectionState)
12461248
{
12471249
// Clear the connectionState field
1248-
await _state.WaitConnectionLockAsync();
1250+
await _state.WaitConnectionLockAsync(token: default);
12491251
try
12501252
{
12511253
SafeAssert(ReferenceEquals(_state.CurrentConnectionStateUnsynchronized, connectionState),
@@ -1363,7 +1365,7 @@ private async Task ReconnectAsync(Exception closeException)
13631365
{
13641366
Log.ReconnectingStoppedDuringRetryDelay(_logger);
13651367

1366-
await _state.WaitConnectionLockAsync();
1368+
await _state.WaitConnectionLockAsync(token: default);
13671369
try
13681370
{
13691371
_state.ChangeState(HubConnectionState.Reconnecting, HubConnectionState.Disconnected);
@@ -1378,7 +1380,7 @@ private async Task ReconnectAsync(Exception closeException)
13781380
return;
13791381
}
13801382

1381-
await _state.WaitConnectionLockAsync();
1383+
await _state.WaitConnectionLockAsync(token: default);
13821384
try
13831385
{
13841386
SafeAssert(ReferenceEquals(_state.CurrentConnectionStateUnsynchronized, null),
@@ -1417,7 +1419,7 @@ private async Task ReconnectAsync(Exception closeException)
14171419
nextRetryDelay = GetNextRetryDelay(previousReconnectAttempts++, DateTime.UtcNow - reconnectStartTime, retryReason);
14181420
}
14191421

1420-
await _state.WaitConnectionLockAsync();
1422+
await _state.WaitConnectionLockAsync(token: default);
14211423
try
14221424
{
14231425
SafeAssert(ReferenceEquals(_state.CurrentConnectionStateUnsynchronized, null),
@@ -1956,10 +1958,10 @@ public void AssertConnectionValid([CallerMemberName] string memberName = null, [
19561958
SafeAssert(CurrentConnectionStateUnsynchronized != null, "We don't have a connection!", memberName, fileName, lineNumber);
19571959
}
19581960

1959-
public Task WaitConnectionLockAsync([CallerMemberName] string memberName = null, [CallerFilePath] string filePath = null, [CallerLineNumber] int lineNumber = 0)
1961+
public Task WaitConnectionLockAsync(CancellationToken token, [CallerMemberName] string memberName = null, [CallerFilePath] string filePath = null, [CallerLineNumber] int lineNumber = 0)
19601962
{
19611963
Log.WaitingOnConnectionLock(_logger, memberName, filePath, lineNumber);
1962-
return _connectionLock.WaitAsync();
1964+
return _connectionLock.WaitAsync(token);
19631965
}
19641966

19651967
public bool TryAcquireConnectionLock()
@@ -1968,9 +1970,9 @@ public bool TryAcquireConnectionLock()
19681970
}
19691971

19701972
// Don't call this method in a try/finally that releases the lock since we're also potentially releasing the connection lock here.
1971-
public async Task<ConnectionState> WaitForActiveConnectionAsync(string methodName, [CallerMemberName] string memberName = null, [CallerFilePath] string filePath = null, [CallerLineNumber] int lineNumber = 0)
1973+
public async Task<ConnectionState> WaitForActiveConnectionAsync(string methodName, CancellationToken token, [CallerMemberName] string memberName = null, [CallerFilePath] string filePath = null, [CallerLineNumber] int lineNumber = 0)
19721974
{
1973-
await WaitConnectionLockAsync(methodName);
1975+
await WaitConnectionLockAsync(token, methodName);
19741976

19751977
if (CurrentConnectionStateUnsynchronized == null || CurrentConnectionStateUnsynchronized.Stopping)
19761978
{

src/SignalR/clients/csharp/Client/test/UnitTests/HttpConnectionTests.ConnectionLifecycle.cs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ await WithConnectionAsync(
443443
})),
444444
async (connection) =>
445445
{
446-
// We aggregate failures that happen when we start the transport. The operation cancelled exception will
446+
// We aggregate failures that happen when we start the transport. The operation canceled exception will
447447
// be an inner exception.
448448
var ex = await Assert.ThrowsAsync<AggregateException>(async () => await connection.StartAsync(cts.Token)).OrTimeout();
449449
Assert.Equal(3, ex.InnerExceptions.Count);
@@ -454,6 +454,29 @@ await WithConnectionAsync(
454454
}
455455
}
456456

457+
[Fact]
458+
public async Task CanceledCancellationTokenPassedToStartThrows()
459+
{
460+
using (StartVerifiableLog())
461+
{
462+
bool transportStartCalled = false;
463+
var httpHandler = new TestHttpMessageHandler();
464+
465+
await WithConnectionAsync(
466+
CreateConnection(httpHandler,
467+
transport: new TestTransport(onTransportStart: () => {
468+
transportStartCalled = true;
469+
return Task.CompletedTask;
470+
})),
471+
async (connection) =>
472+
{
473+
await Assert.ThrowsAsync<TaskCanceledException>(async () => await connection.StartAsync(new CancellationToken(canceled: true))).OrTimeout();
474+
});
475+
476+
Assert.False(transportStartCalled);
477+
}
478+
}
479+
457480
[Fact]
458481
public async Task SSECanBeCanceled()
459482
{

src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.ConnectionLifecycle.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ public async Task StartAsyncWithTriggeredCancellationTokenIsCanceled()
541541
var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory);
542542
try
543543
{
544-
await Assert.ThrowsAsync<OperationCanceledException>(() => hubConnection.StartAsync(new CancellationToken(canceled: true))).OrTimeout();
544+
await Assert.ThrowsAsync<TaskCanceledException>(() => hubConnection.StartAsync(new CancellationToken(canceled: true))).OrTimeout();
545545
Assert.False(onStartCalled);
546546
}
547547
finally

src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.cs

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,98 @@ bool ExpectedErrors(WriteContext writeContext)
162162
}
163163
}
164164

165+
[Fact]
166+
public async Task PendingInvocationsAreCanceledWhenTokenTriggered()
167+
{
168+
using (StartVerifiableLog())
169+
{
170+
var hubConnection = CreateHubConnection(new TestConnection(), loggerFactory: LoggerFactory);
171+
172+
await hubConnection.StartAsync().OrTimeout();
173+
var cts = new CancellationTokenSource();
174+
var invokeTask = hubConnection.InvokeAsync<int>("testMethod", cancellationToken: cts.Token).OrTimeout();
175+
cts.Cancel();
176+
177+
await Assert.ThrowsAsync<TaskCanceledException>(async () => await invokeTask);
178+
}
179+
}
180+
181+
[Fact]
182+
public async Task InvokeAsyncCanceledWhenPassedCanceledToken()
183+
{
184+
using (StartVerifiableLog())
185+
{
186+
var connection = new TestConnection();
187+
var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory);
188+
189+
await hubConnection.StartAsync().OrTimeout();
190+
await Assert.ThrowsAsync<TaskCanceledException>(() =>
191+
hubConnection.InvokeAsync<int>("testMethod", cancellationToken: new CancellationToken(canceled: true)).OrTimeout());
192+
193+
await hubConnection.StopAsync().OrTimeout();
194+
195+
// Assert that InvokeAsync didn't send a message
196+
Assert.Null(await connection.ReadSentTextMessageAsync().OrTimeout());
197+
}
198+
}
199+
200+
[Fact]
201+
public async Task SendAsyncCanceledWhenPassedCanceledToken()
202+
{
203+
using (StartVerifiableLog())
204+
{
205+
var connection = new TestConnection();
206+
var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory);
207+
208+
await hubConnection.StartAsync().OrTimeout();
209+
await Assert.ThrowsAsync<TaskCanceledException>(() =>
210+
hubConnection.SendAsync("testMethod", cancellationToken: new CancellationToken(canceled: true)).OrTimeout());
211+
212+
await hubConnection.StopAsync().OrTimeout();
213+
214+
// Assert that SendAsync didn't send a message
215+
Assert.Null(await connection.ReadSentTextMessageAsync().OrTimeout());
216+
}
217+
}
218+
219+
[Fact]
220+
public async Task StreamAsChannelAsyncCanceledWhenPassedCanceledToken()
221+
{
222+
using (StartVerifiableLog())
223+
{
224+
var connection = new TestConnection();
225+
var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory);
226+
227+
await hubConnection.StartAsync().OrTimeout();
228+
await Assert.ThrowsAsync<TaskCanceledException>(() =>
229+
hubConnection.StreamAsChannelAsync<int>("testMethod", cancellationToken: new CancellationToken(canceled: true)).OrTimeout());
230+
231+
await hubConnection.StopAsync().OrTimeout();
232+
233+
// Assert that StreamAsChannelAsync didn't send a message
234+
Assert.Null(await connection.ReadSentTextMessageAsync().OrTimeout());
235+
}
236+
}
237+
238+
[Fact]
239+
public async Task StreamAsyncCanceledWhenPassedCanceledToken()
240+
{
241+
using (StartVerifiableLog())
242+
{
243+
var connection = new TestConnection();
244+
var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory);
245+
246+
await hubConnection.StartAsync().OrTimeout();
247+
var result = hubConnection.StreamAsync<int>("testMethod", cancellationToken: new CancellationToken(canceled: true));
248+
await Assert.ThrowsAsync<TaskCanceledException>(() => result.GetAsyncEnumerator().MoveNextAsync().OrTimeout());
249+
250+
await hubConnection.StopAsync().OrTimeout();
251+
252+
// Assert that StreamAsync didn't send a message
253+
Assert.Null(await connection.ReadSentTextMessageAsync().OrTimeout());
254+
}
255+
}
256+
165257
[Fact]
166258
public async Task ConnectionTerminatedIfServerTimeoutIntervalElapsesWithNoMessages()
167259
{
@@ -362,7 +454,7 @@ await connection.ReceiveJsonMessage(
362454

363455
[Fact]
364456
[LogLevel(LogLevel.Trace)]
365-
public async Task UploadStreamCancelationSendsStreamComplete()
457+
public async Task UploadStreamCancellationSendsStreamComplete()
366458
{
367459
using (StartVerifiableLog())
368460
{

src/SignalR/clients/csharp/Http.Connections.Client/src/HttpConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ private async Task StartAsyncCore(TransferFormat transferFormat, CancellationTok
205205
return;
206206
}
207207

208-
await _connectionLock.WaitAsync();
208+
await _connectionLock.WaitAsync(cancellationToken);
209209
try
210210
{
211211
CheckDisposed();

0 commit comments

Comments
 (0)