Skip to content

[SignalR] Pass new App to Transport Pipe to App layer #49650

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
#if NET8_0_OR_GREATER
using System.Runtime.Versioning;
#endif
using System.Text;
using System.Threading.Tasks;

namespace Microsoft.AspNetCore.Connections.Abstractions;

/// <summary>
/// Provides access to connection reconnect operations.
/// </summary>
/// <remarks>This feature is experimental.</remarks>
#if NET8_0_OR_GREATER
[RequiresPreviewFeatures("IStatefulReconnectFeature is a preview interface")]
#endif
public interface IStatefulReconnectFeature
{
/// <summary>
/// Called when a connection reconnects. The new <see cref="PipeWriter"/> that application code should write to is passed in.
/// </summary>
public void OnReconnected(Func<PipeWriter, Task> notifyOnReconnect);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public void OnReconnected(Func<PipeWriter, Task> notifyOnReconnect);
public void OnReconnected(Func<PipeWriter, Task> callback);


/// <summary>
/// Allows disabling the reconnect feature so a reconnecting connection will not be allowed anymore.
/// </summary>
void DisableReconnect();
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#nullable enable
Microsoft.AspNetCore.Connections.Abstractions.IReconnectFeature
Microsoft.AspNetCore.Connections.Abstractions.IReconnectFeature.NotifyOnReconnect.get -> System.Action!
Microsoft.AspNetCore.Connections.Abstractions.IReconnectFeature.NotifyOnReconnect.set -> void
Microsoft.AspNetCore.Connections.Abstractions.IStatefulReconnectFeature
Microsoft.AspNetCore.Connections.Abstractions.IStatefulReconnectFeature.DisableReconnect() -> void
Microsoft.AspNetCore.Connections.Abstractions.IStatefulReconnectFeature.OnReconnected(System.Func<System.IO.Pipelines.PipeWriter!, System.Threading.Tasks.Task!>! notifyOnReconnect) -> void
Microsoft.AspNetCore.Connections.Features.IConnectionMetricsTagsFeature
Microsoft.AspNetCore.Connections.Features.IConnectionMetricsTagsFeature.Tags.get -> System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<string!, object?>>!
Microsoft.AspNetCore.Connections.Features.IConnectionNamedPipeFeature
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#nullable enable
Microsoft.AspNetCore.Connections.Abstractions.IReconnectFeature
Microsoft.AspNetCore.Connections.Abstractions.IReconnectFeature.NotifyOnReconnect.get -> System.Action!
Microsoft.AspNetCore.Connections.Abstractions.IReconnectFeature.NotifyOnReconnect.set -> void
Microsoft.AspNetCore.Connections.Abstractions.IStatefulReconnectFeature
Microsoft.AspNetCore.Connections.Abstractions.IStatefulReconnectFeature.DisableReconnect() -> void
Microsoft.AspNetCore.Connections.Abstractions.IStatefulReconnectFeature.OnReconnected(System.Func<System.IO.Pipelines.PipeWriter!, System.Threading.Tasks.Task!>! notifyOnReconnect) -> void
Microsoft.AspNetCore.Connections.Features.IConnectionMetricsTagsFeature
Microsoft.AspNetCore.Connections.Features.IConnectionMetricsTagsFeature.Tags.get -> System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<string!, object?>>!
Microsoft.AspNetCore.Connections.Features.IConnectionNamedPipeFeature
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#nullable enable
Microsoft.AspNetCore.Connections.Abstractions.IReconnectFeature
Microsoft.AspNetCore.Connections.Abstractions.IReconnectFeature.NotifyOnReconnect.get -> System.Action!
Microsoft.AspNetCore.Connections.Abstractions.IReconnectFeature.NotifyOnReconnect.set -> void
Microsoft.AspNetCore.Connections.Abstractions.IStatefulReconnectFeature
Microsoft.AspNetCore.Connections.Abstractions.IStatefulReconnectFeature.DisableReconnect() -> void
Microsoft.AspNetCore.Connections.Abstractions.IStatefulReconnectFeature.OnReconnected(System.Func<System.IO.Pipelines.PipeWriter!, System.Threading.Tasks.Task!>! notifyOnReconnect) -> void
Microsoft.AspNetCore.Connections.Features.IConnectionMetricsTagsFeature
Microsoft.AspNetCore.Connections.Features.IConnectionMetricsTagsFeature.Tags.get -> System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<string!, object?>>!
Microsoft.AspNetCore.Connections.Features.IConnectionNamedPipeFeature
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#nullable enable
Microsoft.AspNetCore.Connections.Abstractions.IReconnectFeature
Microsoft.AspNetCore.Connections.Abstractions.IReconnectFeature.NotifyOnReconnect.get -> System.Action!
Microsoft.AspNetCore.Connections.Abstractions.IReconnectFeature.NotifyOnReconnect.set -> void
Microsoft.AspNetCore.Connections.Abstractions.IStatefulReconnectFeature
Microsoft.AspNetCore.Connections.Abstractions.IStatefulReconnectFeature.DisableReconnect() -> void
Microsoft.AspNetCore.Connections.Abstractions.IStatefulReconnectFeature.OnReconnected(System.Func<System.IO.Pipelines.PipeWriter!, System.Threading.Tasks.Task!>! notifyOnReconnect) -> void
Microsoft.AspNetCore.Connections.Features.IConnectionMetricsTagsFeature
Microsoft.AspNetCore.Connections.Features.IConnectionMetricsTagsFeature.Tags.get -> System.Collections.Generic.ICollection<System.Collections.Generic.KeyValuePair<string!, object?>>!
Microsoft.AspNetCore.Connections.Features.IConnectionNamedPipeFeature
Expand Down
21 changes: 19 additions & 2 deletions src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,13 @@ private async Task StopAsyncCore(bool disposing)
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnFaulted,
TaskScheduler.Default);
}

#pragma warning disable CA2252 // This API requires opting into preview features
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that if we do make API changes to preview features in 9.0, it could make things break harder when mixing 8.0 and 9.0 packages?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So like the following examples:

  • If you used 8.0 client on the server that targets 9.0
  • If you use Connection.Abstractions 9.0 with 8.0 client

Yeah that does seem bad :/
Generally we say "use the same version packages" since we ship everything at the same time. But it's sort of a cop-out.

if (connectionState.Connection.Features.Get<IStatefulReconnectFeature>() is IStatefulReconnectFeature feature)
{
feature.DisableReconnect();
}
#pragma warning restore CA2252 // This API requires opting into preview features
}
else
{
Expand Down Expand Up @@ -1088,6 +1095,14 @@ private async Task SendWithLock(ConnectionState expectedConnectionState, HubMess
{
Log.ReceivedCloseWithError(_logger, close.Error);
}

#pragma warning disable CA2252 // This API requires opting into preview features
if (connectionState.Connection.Features.Get<IStatefulReconnectFeature>() is IStatefulReconnectFeature feature)
{
feature.DisableReconnect();
}
#pragma warning restore CA2252 // This API requires opting into preview features

return close;
case PingMessage _:
Log.ReceivedPing(_logger);
Expand Down Expand Up @@ -1900,14 +1915,16 @@ public ConnectionState(ConnectionContext connection, HubConnection hubConnection
_logger = _hubConnection._logger;
_hasInherentKeepAlive = connection.Features.Get<IConnectionInherentKeepAliveFeature>()?.HasInherentKeepAlive ?? false;

if (Connection.Features.Get<IReconnectFeature>() is IReconnectFeature feature)
#pragma warning disable CA2252 // This API requires opting into preview features
if (Connection.Features.Get<IStatefulReconnectFeature>() is IStatefulReconnectFeature feature)
{
_messageBuffer = new MessageBuffer(connection, hubConnection._protocol,
_hubConnection._serviceProvider.GetService<IOptions<HubConnectionOptions>>()?.Value.StatefulReconnectBufferSize
?? DefaultStatefulReconnectBufferSize);

feature.NotifyOnReconnect = _messageBuffer.Resend;
feature.OnReconnected(_messageBuffer.ResendAsync);
}
#pragma warning restore CA2252 // This API requires opting into preview features
}

public string GetNextId() => (++_nextInvocationId).ToString(CultureInfo.InvariantCulture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Net.WebSockets;
using System.Threading.Channels;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Abstractions;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Http.Connections.Client;
using Microsoft.AspNetCore.SignalR.Protocol;
Expand Down Expand Up @@ -878,6 +879,70 @@ public async Task HubConnectionIsMockable()
mockConnection.Verify(c => c.StopAsync(It.IsAny<CancellationToken>()), Times.Once);
}

[Fact]
public async Task DisableReconnectCalledWhenCloseMessageReceived()
{
var builder = new HubConnectionBuilder().WithUrl("http://example.com");
var innerConnection = new TestConnection();
var reconnectFeature = new TestReconnectFeature();
#pragma warning disable CA2252 // This API requires opting into preview features
innerConnection.Features.Set<IStatefulReconnectFeature>(reconnectFeature);
#pragma warning restore CA2252 // This API requires opting into preview features

var delegateConnectionFactory = new DelegateConnectionFactory(
endPoint => innerConnection.StartAsync());
builder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);

var hubConnection = builder.Build();
var closedEventTcs = new TaskCompletionSource<Exception>();
hubConnection.Closed += e =>
{
closedEventTcs.SetResult(e);
return Task.CompletedTask;
};

await hubConnection.StartAsync().DefaultTimeout();

await innerConnection.ReceiveJsonMessage(new { type = HubProtocolConstants.CloseMessageType });

var exception = await closedEventTcs.Task.DefaultTimeout();
Assert.Null(exception);

await reconnectFeature.DisableReconnectCalled.DefaultTimeout();
}

[Fact]
public async Task DisableReconnectCalledWhenSendingCloseMessage()
{
var builder = new HubConnectionBuilder().WithUrl("http://example.com");
var innerConnection = new TestConnection();
var reconnectFeature = new TestReconnectFeature();
#pragma warning disable CA2252 // This API requires opting into preview features
innerConnection.Features.Set<IStatefulReconnectFeature>(reconnectFeature);
#pragma warning restore CA2252 // This API requires opting into preview features

var delegateConnectionFactory = new DelegateConnectionFactory(
endPoint => innerConnection.StartAsync());
builder.Services.AddSingleton<IConnectionFactory>(delegateConnectionFactory);

var hubConnection = builder.Build();
var closedEventTcs = new TaskCompletionSource<Exception>();
hubConnection.Closed += e =>
{
closedEventTcs.SetResult(e);
return Task.CompletedTask;
};

await hubConnection.StartAsync().DefaultTimeout();

await hubConnection.StopAsync().DefaultTimeout();

var exception = await closedEventTcs.Task.DefaultTimeout();
Assert.Null(exception);

await reconnectFeature.DisableReconnectCalled.DefaultTimeout();
}

private class SampleObject
{
public SampleObject(string foo, int bar)
Expand Down Expand Up @@ -962,4 +1027,24 @@ public ReadOnlyMemory<byte> GetMessageBytes(HubMessage message)
return HubProtocolExtensions.GetMessageBytes(this, message);
}
}

#pragma warning disable CA2252 // This API requires opting into preview features
private sealed class TestReconnectFeature : IStatefulReconnectFeature
#pragma warning restore CA2252 // This API requires opting into preview features
{
private TaskCompletionSource _disableReconnect = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

public Task DisableReconnectCalled => _disableReconnect.Task;

#pragma warning disable CA2252 // This API requires opting into preview features
public void OnReconnected(Func<PipeWriter, Task> notifyOnReconnected) { }
#pragma warning restore CA2252 // This API requires opting into preview features

#pragma warning disable CA2252 // This API requires opting into preview features
public void DisableReconnect()
#pragma warning restore CA2252 // This API requires opting into preview features
{
_disableReconnect.TrySetResult();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -531,9 +531,11 @@ private async Task StartTransport(Uri connectUrl, HttpTransportType transportTyp
// We successfully started, set the transport properties (we don't want to set these until the transport is definitely running).
_transport = transport;

if (useAck && _transport is IReconnectFeature reconnectFeature)
if (useAck && _transport is IStatefulReconnectFeature reconnectFeature)
{
#pragma warning disable CA2252 // This API requires opting into preview features
Features.Set(reconnectFeature);
#pragma warning restore CA2252 // This API requires opting into preview features
}

Log.TransportStarted(_logger, transportType);
Expand Down
Loading