Skip to content

Multiple Producers and Consumers per connection #328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
2d77218
wip
Gsantomaggio Nov 16, 2023
1dd916b
wip
Gsantomaggio Nov 16, 2023
7f438b7
Implement connection pooling to handle
Gsantomaggio Nov 20, 2023
72d22b2
Implement connection pooling to handle
Gsantomaggio Nov 20, 2023
a6079a1
close the connection in the right way
Gsantomaggio Nov 20, 2023
101d374
Pass the pool instead of use singleton
Gsantomaggio Nov 20, 2023
74f66db
Pass the pool to the super stream
Gsantomaggio Nov 21, 2023
6ac4433
Add log during the tests
Gsantomaggio Nov 21, 2023
6228c54
Implement list for the producers
Gsantomaggio Nov 21, 2023
adf9300
change list to a dictionary to the
Gsantomaggio Nov 22, 2023
95b4b68
Add tests
Gsantomaggio Nov 22, 2023
fcd5a54
formatting
Gsantomaggio Nov 22, 2023
09bdf3f
formatting
Gsantomaggio Nov 22, 2023
71ac9ca
Add more tests
Gsantomaggio Nov 23, 2023
e5f5ef4
Add more tests
Gsantomaggio Nov 23, 2023
900646d
Map connections with streams
Gsantomaggio Nov 24, 2023
c33706b
formatting
Gsantomaggio Nov 24, 2023
3d46579
Add the semaphore to sync the
Gsantomaggio Nov 27, 2023
8eceb91
Update docker image to rabbitmq:3.13-rc
Gsantomaggio Nov 27, 2023
f7a0573
Update docker image to rabbitmq:3.13-rc for windows
Gsantomaggio Nov 27, 2023
fff144d
Add tests to validate the internal pool lists
Gsantomaggio Nov 28, 2023
5c7c00d
Update RabbitMQ version for windows
Gsantomaggio Nov 28, 2023
7fbf1d7
Validate the pool is not null
Gsantomaggio Nov 29, 2023
0833ec2
Validate the pool is not null
Gsantomaggio Nov 29, 2023
15fe9dc
Fix race condition during rawconsumer creation
Gsantomaggio Dec 1, 2023
a85a59d
formatting
Gsantomaggio Dec 1, 2023
1b05c0b
Fix the pool consistency when there are errors
Gsantomaggio Dec 4, 2023
d8d206b
formatting
Gsantomaggio Dec 4, 2023
c7c66a5
remove the max connections from
Gsantomaggio Dec 11, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/versions.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"erlang": "26.1.2",
"rabbitmq": "3.13.0-beta.6"
"rabbitmq": "3.13.0-rc.2"
}
2 changes: 1 addition & 1 deletion .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
runs-on: ubuntu-latest
services:
rabbitmq:
image: rabbitmq:3.13.0-beta.6-management
image: rabbitmq:3.13-rc-management
env:
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: -rabbitmq_stream advertised_host localhost -rabbit collect_statistics_interval 4
ports:
Expand Down
51 changes: 50 additions & 1 deletion RabbitMQ.Stream.Client/AbstractEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,74 @@
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace RabbitMQ.Stream.Client
{

internal enum EntityStatus
{
Open,
Closed,
Disposed
}
public abstract class AbstractEntity
{
private readonly CancellationTokenSource _cancelTokenSource = new();
protected CancellationToken Token => _cancelTokenSource.Token;

internal EntityStatus _status = EntityStatus.Closed;
// here the _cancelTokenSource is disposed and the token is cancelled
// in producer is used to cancel the send task
// in consumer is used to cancel the receive task
protected void MaybeCancelToken()
{

if (!_cancelTokenSource.IsCancellationRequested)
_cancelTokenSource.Cancel();
}

protected Client _client;
public abstract Task<ResponseCode> Close();

protected void Dispose(bool disposing, string entityInfo, ILogger logger)
{
if (!disposing)
{
return;
}

if (_status == EntityStatus.Disposed)
{
return;
}

try
{
var closeTask = Close();
if (!closeTask.Wait(Consts.MidWait))
{
logger.LogWarning("Failed to close {EntityInfo} in time", entityInfo);
}
}
catch (Exception e)
{
logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", entityInfo, e.Message);
}
finally
{
_status = EntityStatus.Disposed;
}
}

public bool IsOpen()
{
return _status == EntityStatus.Open;
}

internal Client _client;

}
}
155 changes: 114 additions & 41 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,16 @@ public class Client : IClient

private uint correlationId = 0; // allow for some pre-amble

private byte nextPublisherId = 0;

private Connection connection;

private readonly IDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>
publishers =
new ConcurrentDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>();

private readonly ConcurrentDictionary<uint, IValueTaskSource> requests = new();

private readonly TaskCompletionSource<TuneResponse> tuneReceived =
new TaskCompletionSource<TuneResponse>(TaskCreationOptions.RunContinuationsAsynchronously);

private byte nextSubscriptionId;
internal readonly IDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>
publishers =
new ConcurrentDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>();

internal readonly IDictionary<byte, ConsumerEvents> consumers =
new ConcurrentDictionary<byte, ConsumerEvents>();
Expand Down Expand Up @@ -157,17 +153,12 @@ private Client(ClientParameters parameters, ILogger logger = null)
(int)parameters.Heartbeat.TotalSeconds);
IsClosed = false;
_logger = logger ?? NullLogger.Instance;
}

private byte GetNextSubscriptionId()
{
byte result;
lock (Obj)
ClientId = Guid.NewGuid().ToString();
AppDomain.CurrentDomain.UnhandledException += (sender, args) =>
{
result = nextSubscriptionId++;
}

return result;
_logger.LogError(args.ExceptionObject as Exception, "Unhandled exception");
Parameters.UnhandledExceptionHandler(args.ExceptionObject as Exception);
};
}

public bool IsClosed
Expand Down Expand Up @@ -304,16 +295,36 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
public async Task<(byte, DeclarePublisherResponse)> DeclarePublisher(string publisherRef,
string stream,
Action<ReadOnlyMemory<ulong>> confirmCallback,
Action<(ulong, ResponseCode)[]> errorCallback)
Action<(ulong, ResponseCode)[]> errorCallback, ConnectionsPool pool = null)
{
var publisherId = nextPublisherId++;
publishers.Add(publisherId, (confirmCallback, errorCallback));
return (publisherId, await Request<DeclarePublisherRequest, DeclarePublisherResponse>(corr =>
new DeclarePublisherRequest(corr, publisherId, publisherRef, stream)).ConfigureAwait(false));
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
var publisherId = ConnectionsPool.FindNextValidId(publishers.Keys.ToList());
DeclarePublisherResponse response;

try
{
publishers.Add(publisherId, (confirmCallback, errorCallback));
response = await Request<DeclarePublisherRequest, DeclarePublisherResponse>(corr =>
new DeclarePublisherRequest(corr, publisherId, publisherRef, stream)).ConfigureAwait(false);
}
finally
{
_poolSemaphore.Release();
}

if (response.ResponseCode == ResponseCode.Ok || pool == null)
return (publisherId, response);

// if the response code is not ok we need to remove the subscription
// and close the connection if necessary.
publishers.Remove(publisherId);
await MaybeClose("Create Publisher Exception", stream, pool).ConfigureAwait(false);
return (publisherId, response);
}

public async Task<DeletePublisherResponse> DeletePublisher(byte publisherId)
{
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
try
{
var result =
Expand All @@ -325,6 +336,7 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
finally
{
publishers.Remove(publisherId);
_poolSemaphore.Release();
}
}

Expand All @@ -345,21 +357,38 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
Dictionary<string, string> properties, Func<Deliver, Task> deliverHandler,
Func<bool, Task<IOffsetType>> consumerUpdateHandler)
{
var subscriptionId = GetNextSubscriptionId();

consumers.Add(subscriptionId,
new ConsumerEvents(
deliverHandler,
consumerUpdateHandler));
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
var subscriptionId = ConnectionsPool.FindNextValidId(consumers.Keys.ToList());
SubscribeResponse response;
try
{
consumers.Add(subscriptionId,
new ConsumerEvents(
deliverHandler,
consumerUpdateHandler));

return (subscriptionId,
await Request<SubscribeRequest, SubscribeResponse>(corr =>
response = await Request<SubscribeRequest, SubscribeResponse>(corr =>
new SubscribeRequest(corr, subscriptionId, config.Stream, config.OffsetSpec, initialCredit,
properties)).ConfigureAwait(false));
properties)).ConfigureAwait(false);
}
finally
{
_poolSemaphore.Release();
}

if (response.ResponseCode == ResponseCode.Ok)
return (subscriptionId, response);

// if the response code is not ok we need to remove the subscription
// and close the connection if necessary.
consumers.Remove(subscriptionId);
await MaybeClose("Create Consumer Exception", config.Stream, config.Pool).ConfigureAwait(false);
return (subscriptionId, response);
}

public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId)
{
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
try
{
// here we reduce a bit the timeout to avoid waiting too much
Expand All @@ -377,6 +406,7 @@ await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
_logger.LogDebug("Unsubscribe: {SubscriptionId}", subscriptionId);
// remove consumer after RPC returns, this should avoid uncorrelated data being sent
consumers.Remove(subscriptionId);
_poolSemaphore.Release();
}
}

Expand Down Expand Up @@ -640,6 +670,14 @@ private void InternalClose()
IsClosed = true;
}

private bool HasEntities()
{
lock (Obj)
{
return publishers.Count > 0 || consumers.Count > 0;
}
}

private async ValueTask<bool> ConsumerUpdateResponse(uint rCorrelationId, IOffsetType offsetSpecification)
{
return await Publish(new ConsumerUpdateRequest(rCorrelationId, offsetSpecification)).ConfigureAwait(false);
Expand All @@ -652,6 +690,7 @@ public async Task<CloseResponse> Close(string reason)
return new CloseResponse(0, ResponseCode.Ok);
}

InternalClose();
try
{
var result =
Expand All @@ -671,28 +710,62 @@ public async Task<CloseResponse> Close(string reason)
}
finally
{
// even if the close fails we need to close the connection
InternalClose();
connection.Dispose();
}

return new CloseResponse(0, ResponseCode.Ok);
}

// _poolSemaphore is introduced here: https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/328
// the MaybeClose can be called in different threads so we need to protect the pool
// the pool itself is thread safe but we need to protect the flow to be sure that the
// connection is released only once
private readonly SemaphoreSlim _poolSemaphore = new(1, 1);

// Safe close
// the client can be closed only if the publishers are == 0
// not a public method used internally by producers and consumers
internal async Task<CloseResponse> MaybeClose(string reason)
// the client can be closed only if HasEntities is false
// if the client has entities (publishers or consumers) it will be released from the pool
// Release will decrement the active ids for the connection
// if the active ids are 0 the connection will be closed

internal async Task<CloseResponse> MaybeClose(string reason, string stream, ConnectionsPool pool)
{
if (publishers.Count == 0 && consumers.Count == 0)
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
try
{
await Close(reason).ConfigureAwait(false);
}
if (!HasEntities())
{
if (!string.IsNullOrEmpty(ClientId))
{
_logger.LogInformation("Close connection for the {ClientId}", ClientId);
// the client can be closed in an unexpected way so we need to remove it from the pool
// so you will find pool.remove(ClientId) also to the disconnect event
// pool.remove(ClientId) is a duplicate call here but it is ok. The pool is idempotent
pool.Remove(ClientId);
await Close(reason).ConfigureAwait(false);
}
}
else
{
// we remove an id reference from the client
// in case there are still active ids from the client and the stream
if (!string.IsNullOrEmpty(ClientId))
{
pool.Release(ClientId, stream);
}
}

var result = new CloseResponse(0, ResponseCode.Ok);
return result;
var result = new CloseResponse(0, ResponseCode.Ok);
return result;
}
finally
{
_poolSemaphore.Release();
}
}

public string ClientId { get; init; }

public async ValueTask<QueryPublisherResponse> QueryPublisherSequence(string publisherRef, string stream)
{
return await Request<QueryPublisherRequest, QueryPublisherResponse>(corr =>
Expand Down
8 changes: 8 additions & 0 deletions RabbitMQ.Stream.Client/ClientExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,12 @@ public CrcException(string s)
{
}
}

public class TooManyConnectionsException : Exception
{
public TooManyConnectionsException(string s)
: base(s)
{
}
}
}
4 changes: 2 additions & 2 deletions RabbitMQ.Stream.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,14 @@ private async Task ProcessIncomingFrames()
var buffer = result.Buffer;
if (buffer.Length == 0)
{
Debug.WriteLine("TCP Connection Closed");
Debug.WriteLine("TCP Connection Closed!");
// We're not going to receive any more bytes from the connection.
break;
}

// Let's try to read some frames!

while (TryReadFrame(ref buffer, out var frame))
while (TryReadFrame(ref buffer, out var frame) && !isClosed)
{
// Let's rent some memory to copy the frame from the network stream. This memory will be reclaimed once the frame has been handled.

Expand Down
Loading