Skip to content

Handle cancellation token during the consumer close #339

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions RabbitMQ.Stream.Client/AbstractEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

namespace RabbitMQ.Stream.Client
{

public abstract record EntityCommonConfig
{
internal ConnectionsPool Pool { get; set; }
Expand All @@ -19,7 +18,8 @@ internal enum EntityStatus
{
Open,
Closed,
Disposed
Disposed,
Initializing
}

public interface IClosable
Expand Down Expand Up @@ -65,6 +65,8 @@ private void MaybeCancelToken()
/// <returns></returns>
protected abstract Task<ResponseCode> DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false);

// private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);

/// <summary>
/// Internal close method. It is called by the public Close method.
/// Set the status to closed and remove the producer or consumer from the server ( if it is not already removed )
Expand All @@ -75,13 +77,13 @@ private void MaybeCancelToken()
/// <returns></returns>
protected async Task<ResponseCode> Shutdown(EntityCommonConfig config, bool ignoreIfAlreadyDeleted = false)
{
MaybeCancelToken();

if (!IsOpen()) // the client is already closed
{
return ResponseCode.Ok;
}

MaybeCancelToken();

_status = EntityStatus.Closed;
var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false);

Expand Down Expand Up @@ -120,7 +122,8 @@ protected void Dispose(bool disposing)
}
catch (Exception e)
{
Logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", DumpEntityConfiguration(), e.Message);
Logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", DumpEntityConfiguration(),
e.Message);
}
finally
{
Expand Down
31 changes: 24 additions & 7 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public static async Task<Client> Create(ClientParameters parameters, ILogger log
client.connection = await Connection
.Create(parameters.Endpoint, client.HandleIncoming, client.HandleClosed, parameters.Ssl, logger)
.ConfigureAwait(false);

client.connection.ClientId = client.ClientId;
// exchange properties
var peerPropertiesResponse = await client.Request<PeerPropertiesRequest, PeerPropertiesResponse>(corr =>
new PeerPropertiesRequest(corr, parameters.Properties)).ConfigureAwait(false);
Expand Down Expand Up @@ -307,7 +307,7 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
Action<(ulong, ResponseCode)[]> errorCallback, ConnectionsPool pool = null)
{
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
var publisherId = ConnectionsPool.FindNextValidId(publishers.Keys.ToList());
var publisherId = ConnectionsPool.FindNextValidId(publishers.Keys.ToList(), IncrementEntityId());
DeclarePublisherResponse response;

try
Expand All @@ -321,7 +321,7 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
_poolSemaphore.Release();
}

if (response.ResponseCode == ResponseCode.Ok || pool == null)
if (response.ResponseCode == ResponseCode.Ok)
return (publisherId, response);

// if the response code is not ok we need to remove the subscription
Expand Down Expand Up @@ -358,22 +358,40 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
public async Task<(byte, SubscribeResponse)> Subscribe(string stream, IOffsetType offsetType,
ushort initialCredit,
Dictionary<string, string> properties, Func<Deliver, Task> deliverHandler,
Func<bool, Task<IOffsetType>> consumerUpdateHandler = null)
Func<bool, Task<IOffsetType>> consumerUpdateHandler = null, ConnectionsPool pool = null)
{
return await Subscribe(new RawConsumerConfig(stream) { OffsetSpec = offsetType },
return await Subscribe(new RawConsumerConfig(stream) { OffsetSpec = offsetType, Pool = pool },
initialCredit,
properties,
deliverHandler,
consumerUpdateHandler).ConfigureAwait(false);
}

private byte _nextEntityId = 0;

// the entity id is a byte so we need to increment it and reset it when it reaches the max value
// to avoid to use always the same ids when producers and consumers are created
// so even there is a connection with one producer or consumer we need to increment the id
private byte IncrementEntityId()
{
lock (Obj)
{
var current = _nextEntityId;
_nextEntityId++;
if (_nextEntityId != byte.MaxValue)
return current;
_nextEntityId = 0;
return _nextEntityId;
}
}

public async Task<(byte, SubscribeResponse)> Subscribe(RawConsumerConfig config,
ushort initialCredit,
Dictionary<string, string> properties, Func<Deliver, Task> deliverHandler,
Func<bool, Task<IOffsetType>> consumerUpdateHandler)
{
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
var subscriptionId = ConnectionsPool.FindNextValidId(consumers.Keys.ToList());
var subscriptionId = ConnectionsPool.FindNextValidId(consumers.Keys.ToList(), IncrementEntityId());
SubscribeResponse response;
try
{
Expand Down Expand Up @@ -788,7 +806,6 @@ internal async Task<CloseResponse> MaybeClose(string reason, string stream, Conn
_logger.LogInformation("Close connection for the {ClientId}", ClientId);
// the client can be closed in an unexpected way so we need to remove it from the pool
// so you will find pool.remove(ClientId) also to the disconnect event
// pool.remove(ClientId) is a duplicate call here but it is ok. The pool is idempotent
pool.Remove(ClientId);
await Close(reason).ConfigureAwait(false);
}
Expand Down
4 changes: 3 additions & 1 deletion RabbitMQ.Stream.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class Connection : IDisposable
private CancellationToken Token => _cancelTokenSource.Token;

internal int NumFrames => numFrames;

internal string ClientId { get; set; }
public bool IsClosed => isClosed;

private static System.IO.Stream MaybeTcpUpgrade(NetworkStream networkStream, SslOption sslOption)
Expand Down Expand Up @@ -191,6 +191,8 @@ private async Task ProcessIncomingFrames()
finally
{
isClosed = true;
_logger?.LogDebug("TCP Connection Closed ClientId: {ClientId} is IsCancellationRequested {Token} ",
ClientId, Token.IsCancellationRequested);
// Mark the PipeReader as complete
await reader.CompleteAsync(caught).ConfigureAwait(false);
var t = closedCallback?.Invoke("TCP Connection Closed")!;
Expand Down
27 changes: 16 additions & 11 deletions RabbitMQ.Stream.Client/ConnectionsPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,30 +100,35 @@ public class ConnectionsPool
{
private static readonly object s_lock = new();

internal static byte FindNextValidId(List<byte> ids)
internal static byte FindNextValidId(List<byte> ids, byte nextId = 0)
{
lock (s_lock)
{
if (ids.Count == 0)
{
return 0;
}

// // we start with the recycle when we reach the max value
// // in this way we can avoid to recycle the same ids in a short time
ids.Sort();
if (ids[^1] != byte.MaxValue)
return (byte)(ids[^1] + 1);
var l = ids.Where(b => b >= nextId).ToList();
l.Sort();
if (l.Count == 0)
{
// not necessary to start from 0 because the ids are recycled
// nextid is passed as parameter to avoid to start from 0
// see client:IncrementEntityId/0
return nextId;
}

if (l[^1] != byte.MaxValue)
return (byte)(l[^1] + 1);

for (var i = 0; i < ids.Count - 1; i++)
{
if (ids[i + 1] - ids[i] > 1)
if (l[i + 1] - l[i] > 1)
{
return (byte)(ids[i] + 1);
return (byte)(l[i] + 1);
}
}

return (byte)(ids[^1] + 1);
return (byte)(l[^1] + 1);
}
}

Expand Down
1 change: 0 additions & 1 deletion RabbitMQ.Stream.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ RabbitMQ.Stream.Client.Client.QueryPublisherSequence(string publisherRef, string
RabbitMQ.Stream.Client.Client.StoreOffset(string reference, string stream, ulong offsetValue) -> System.Threading.Tasks.ValueTask<bool>
RabbitMQ.Stream.Client.Client.StreamExists(string stream) -> System.Threading.Tasks.Task<bool>
RabbitMQ.Stream.Client.Client.Subscribe(RabbitMQ.Stream.Client.RawConsumerConfig config, ushort initialCredit, System.Collections.Generic.Dictionary<string, string> properties, System.Func<RabbitMQ.Stream.Client.Deliver, System.Threading.Tasks.Task> deliverHandler, System.Func<bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>> consumerUpdateHandler) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IOffsetType offsetType, ushort initialCredit, System.Collections.Generic.Dictionary<string, string> properties, System.Func<RabbitMQ.Stream.Client.Deliver, System.Threading.Tasks.Task> deliverHandler, System.Func<bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>> consumerUpdateHandler = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
RabbitMQ.Stream.Client.ClientParameters
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.get -> RabbitMQ.Stream.Client.AddressResolver
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.set -> void
Expand Down
1 change: 1 addition & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ RabbitMQ.Stream.Client.Client.DeletePublisher(byte publisherId, bool ignoreIfAlr
RabbitMQ.Stream.Client.Client.ExchangeVersions() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CommandVersionsResponse>
RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>
RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.StreamStatsResponse>
RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IOffsetType offsetType, ushort initialCredit, System.Collections.Generic.Dictionary<string, string> properties, System.Func<RabbitMQ.Stream.Client.Deliver, System.Threading.Tasks.Task> deliverHandler, System.Func<bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>> consumerUpdateHandler = null, RabbitMQ.Stream.Client.ConnectionsPool pool = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
RabbitMQ.Stream.Client.Client.Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.UnsubscribeResponse>
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void
Expand Down
Loading