Skip to content

Commit 4cefb84

Browse files
authored
Multiple Producers and Consumers per connection (#328)
* Implement multiple producers and consumers per connection. Introduce a new set of classes to handle the connection pool. The pool is a dictionary with client_id and the connectioninfo. * The client_id is a field to identify the connection uniquely. * the pool increases the reference count for each request, and for each Close(), it decreases the reference count. * client_poolSemaphore is needed to coordinate the Producer and Consumer creation and destruction since the pool is thread-safe. * To configure the pool, there is ConnectionPoolConfig, where the user can define Producers and/or Consumers per connection. * The client still contains consumers' and publishers' lists to avoid changing the code too much. These two lists are the source of the truth. * This PR intercepts the entities' Creation and Destruction to make the change as safe as possible --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent b237f59 commit 4cefb84

25 files changed

+1618
-198
lines changed

.ci/versions.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"erlang": "26.1.2",
3-
"rabbitmq": "3.13.0-beta.6"
3+
"rabbitmq": "3.13.0-rc.2"
44
}

.github/workflows/build-test.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ jobs:
4343
runs-on: ubuntu-latest
4444
services:
4545
rabbitmq:
46-
image: rabbitmq:3.13.0-beta.6-management
46+
image: rabbitmq:3.13-rc-management
4747
env:
4848
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: -rabbitmq_stream advertised_host localhost -rabbit collect_statistics_interval 4
4949
ports:

RabbitMQ.Stream.Client/AbstractEntity.cs

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,74 @@
22
// 2.0, and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2007-2023 VMware, Inc.
44

5+
using System;
56
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Microsoft.Extensions.Logging;
69

710
namespace RabbitMQ.Stream.Client
811
{
12+
13+
internal enum EntityStatus
14+
{
15+
Open,
16+
Closed,
17+
Disposed
18+
}
919
public abstract class AbstractEntity
1020
{
1121
private readonly CancellationTokenSource _cancelTokenSource = new();
1222
protected CancellationToken Token => _cancelTokenSource.Token;
1323

24+
internal EntityStatus _status = EntityStatus.Closed;
1425
// here the _cancelTokenSource is disposed and the token is cancelled
1526
// in producer is used to cancel the send task
1627
// in consumer is used to cancel the receive task
1728
protected void MaybeCancelToken()
1829
{
30+
1931
if (!_cancelTokenSource.IsCancellationRequested)
2032
_cancelTokenSource.Cancel();
2133
}
2234

23-
protected Client _client;
35+
public abstract Task<ResponseCode> Close();
36+
37+
protected void Dispose(bool disposing, string entityInfo, ILogger logger)
38+
{
39+
if (!disposing)
40+
{
41+
return;
42+
}
43+
44+
if (_status == EntityStatus.Disposed)
45+
{
46+
return;
47+
}
48+
49+
try
50+
{
51+
var closeTask = Close();
52+
if (!closeTask.Wait(Consts.MidWait))
53+
{
54+
logger.LogWarning("Failed to close {EntityInfo} in time", entityInfo);
55+
}
56+
}
57+
catch (Exception e)
58+
{
59+
logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", entityInfo, e.Message);
60+
}
61+
finally
62+
{
63+
_status = EntityStatus.Disposed;
64+
}
65+
}
66+
67+
public bool IsOpen()
68+
{
69+
return _status == EntityStatus.Open;
70+
}
71+
72+
internal Client _client;
2473

2574
}
2675
}

RabbitMQ.Stream.Client/Client.cs

Lines changed: 114 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -107,20 +107,16 @@ public class Client : IClient
107107

108108
private uint correlationId = 0; // allow for some pre-amble
109109

110-
private byte nextPublisherId = 0;
111-
112110
private Connection connection;
113111

114-
private readonly IDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>
115-
publishers =
116-
new ConcurrentDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>();
117-
118112
private readonly ConcurrentDictionary<uint, IValueTaskSource> requests = new();
119113

120114
private readonly TaskCompletionSource<TuneResponse> tuneReceived =
121115
new TaskCompletionSource<TuneResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
122116

123-
private byte nextSubscriptionId;
117+
internal readonly IDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>
118+
publishers =
119+
new ConcurrentDictionary<byte, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>)>();
124120

125121
internal readonly IDictionary<byte, ConsumerEvents> consumers =
126122
new ConcurrentDictionary<byte, ConsumerEvents>();
@@ -157,17 +153,12 @@ private Client(ClientParameters parameters, ILogger logger = null)
157153
(int)parameters.Heartbeat.TotalSeconds);
158154
IsClosed = false;
159155
_logger = logger ?? NullLogger.Instance;
160-
}
161-
162-
private byte GetNextSubscriptionId()
163-
{
164-
byte result;
165-
lock (Obj)
156+
ClientId = Guid.NewGuid().ToString();
157+
AppDomain.CurrentDomain.UnhandledException += (sender, args) =>
166158
{
167-
result = nextSubscriptionId++;
168-
}
169-
170-
return result;
159+
_logger.LogError(args.ExceptionObject as Exception, "Unhandled exception");
160+
Parameters.UnhandledExceptionHandler(args.ExceptionObject as Exception);
161+
};
171162
}
172163

173164
public bool IsClosed
@@ -304,16 +295,36 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
304295
public async Task<(byte, DeclarePublisherResponse)> DeclarePublisher(string publisherRef,
305296
string stream,
306297
Action<ReadOnlyMemory<ulong>> confirmCallback,
307-
Action<(ulong, ResponseCode)[]> errorCallback)
298+
Action<(ulong, ResponseCode)[]> errorCallback, ConnectionsPool pool = null)
308299
{
309-
var publisherId = nextPublisherId++;
310-
publishers.Add(publisherId, (confirmCallback, errorCallback));
311-
return (publisherId, await Request<DeclarePublisherRequest, DeclarePublisherResponse>(corr =>
312-
new DeclarePublisherRequest(corr, publisherId, publisherRef, stream)).ConfigureAwait(false));
300+
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
301+
var publisherId = ConnectionsPool.FindNextValidId(publishers.Keys.ToList());
302+
DeclarePublisherResponse response;
303+
304+
try
305+
{
306+
publishers.Add(publisherId, (confirmCallback, errorCallback));
307+
response = await Request<DeclarePublisherRequest, DeclarePublisherResponse>(corr =>
308+
new DeclarePublisherRequest(corr, publisherId, publisherRef, stream)).ConfigureAwait(false);
309+
}
310+
finally
311+
{
312+
_poolSemaphore.Release();
313+
}
314+
315+
if (response.ResponseCode == ResponseCode.Ok || pool == null)
316+
return (publisherId, response);
317+
318+
// if the response code is not ok we need to remove the subscription
319+
// and close the connection if necessary.
320+
publishers.Remove(publisherId);
321+
await MaybeClose("Create Publisher Exception", stream, pool).ConfigureAwait(false);
322+
return (publisherId, response);
313323
}
314324

315325
public async Task<DeletePublisherResponse> DeletePublisher(byte publisherId)
316326
{
327+
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
317328
try
318329
{
319330
var result =
@@ -325,6 +336,7 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
325336
finally
326337
{
327338
publishers.Remove(publisherId);
339+
_poolSemaphore.Release();
328340
}
329341
}
330342

@@ -345,21 +357,38 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
345357
Dictionary<string, string> properties, Func<Deliver, Task> deliverHandler,
346358
Func<bool, Task<IOffsetType>> consumerUpdateHandler)
347359
{
348-
var subscriptionId = GetNextSubscriptionId();
349-
350-
consumers.Add(subscriptionId,
351-
new ConsumerEvents(
352-
deliverHandler,
353-
consumerUpdateHandler));
360+
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
361+
var subscriptionId = ConnectionsPool.FindNextValidId(consumers.Keys.ToList());
362+
SubscribeResponse response;
363+
try
364+
{
365+
consumers.Add(subscriptionId,
366+
new ConsumerEvents(
367+
deliverHandler,
368+
consumerUpdateHandler));
354369

355-
return (subscriptionId,
356-
await Request<SubscribeRequest, SubscribeResponse>(corr =>
370+
response = await Request<SubscribeRequest, SubscribeResponse>(corr =>
357371
new SubscribeRequest(corr, subscriptionId, config.Stream, config.OffsetSpec, initialCredit,
358-
properties)).ConfigureAwait(false));
372+
properties)).ConfigureAwait(false);
373+
}
374+
finally
375+
{
376+
_poolSemaphore.Release();
377+
}
378+
379+
if (response.ResponseCode == ResponseCode.Ok)
380+
return (subscriptionId, response);
381+
382+
// if the response code is not ok we need to remove the subscription
383+
// and close the connection if necessary.
384+
consumers.Remove(subscriptionId);
385+
await MaybeClose("Create Consumer Exception", config.Stream, config.Pool).ConfigureAwait(false);
386+
return (subscriptionId, response);
359387
}
360388

361389
public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId)
362390
{
391+
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
363392
try
364393
{
365394
// here we reduce a bit the timeout to avoid waiting too much
@@ -377,6 +406,7 @@ await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
377406
_logger.LogDebug("Unsubscribe: {SubscriptionId}", subscriptionId);
378407
// remove consumer after RPC returns, this should avoid uncorrelated data being sent
379408
consumers.Remove(subscriptionId);
409+
_poolSemaphore.Release();
380410
}
381411
}
382412

@@ -640,6 +670,14 @@ private void InternalClose()
640670
IsClosed = true;
641671
}
642672

673+
private bool HasEntities()
674+
{
675+
lock (Obj)
676+
{
677+
return publishers.Count > 0 || consumers.Count > 0;
678+
}
679+
}
680+
643681
private async ValueTask<bool> ConsumerUpdateResponse(uint rCorrelationId, IOffsetType offsetSpecification)
644682
{
645683
return await Publish(new ConsumerUpdateRequest(rCorrelationId, offsetSpecification)).ConfigureAwait(false);
@@ -652,6 +690,7 @@ public async Task<CloseResponse> Close(string reason)
652690
return new CloseResponse(0, ResponseCode.Ok);
653691
}
654692

693+
InternalClose();
655694
try
656695
{
657696
var result =
@@ -671,28 +710,62 @@ public async Task<CloseResponse> Close(string reason)
671710
}
672711
finally
673712
{
674-
// even if the close fails we need to close the connection
675-
InternalClose();
676713
connection.Dispose();
677714
}
678715

679716
return new CloseResponse(0, ResponseCode.Ok);
680717
}
681718

719+
// _poolSemaphore is introduced here: https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/328
720+
// the MaybeClose can be called in different threads so we need to protect the pool
721+
// the pool itself is thread safe but we need to protect the flow to be sure that the
722+
// connection is released only once
723+
private readonly SemaphoreSlim _poolSemaphore = new(1, 1);
724+
682725
// Safe close
683-
// the client can be closed only if the publishers are == 0
684-
// not a public method used internally by producers and consumers
685-
internal async Task<CloseResponse> MaybeClose(string reason)
726+
// the client can be closed only if HasEntities is false
727+
// if the client has entities (publishers or consumers) it will be released from the pool
728+
// Release will decrement the active ids for the connection
729+
// if the active ids are 0 the connection will be closed
730+
731+
internal async Task<CloseResponse> MaybeClose(string reason, string stream, ConnectionsPool pool)
686732
{
687-
if (publishers.Count == 0 && consumers.Count == 0)
733+
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
734+
try
688735
{
689-
await Close(reason).ConfigureAwait(false);
690-
}
736+
if (!HasEntities())
737+
{
738+
if (!string.IsNullOrEmpty(ClientId))
739+
{
740+
_logger.LogInformation("Close connection for the {ClientId}", ClientId);
741+
// the client can be closed in an unexpected way so we need to remove it from the pool
742+
// so you will find pool.remove(ClientId) also to the disconnect event
743+
// pool.remove(ClientId) is a duplicate call here but it is ok. The pool is idempotent
744+
pool.Remove(ClientId);
745+
await Close(reason).ConfigureAwait(false);
746+
}
747+
}
748+
else
749+
{
750+
// we remove an id reference from the client
751+
// in case there are still active ids from the client and the stream
752+
if (!string.IsNullOrEmpty(ClientId))
753+
{
754+
pool.Release(ClientId, stream);
755+
}
756+
}
691757

692-
var result = new CloseResponse(0, ResponseCode.Ok);
693-
return result;
758+
var result = new CloseResponse(0, ResponseCode.Ok);
759+
return result;
760+
}
761+
finally
762+
{
763+
_poolSemaphore.Release();
764+
}
694765
}
695766

767+
public string ClientId { get; init; }
768+
696769
public async ValueTask<QueryPublisherResponse> QueryPublisherSequence(string publisherRef, string stream)
697770
{
698771
return await Request<QueryPublisherRequest, QueryPublisherResponse>(corr =>

RabbitMQ.Stream.Client/ClientExceptions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,4 +125,12 @@ public CrcException(string s)
125125
{
126126
}
127127
}
128+
129+
public class TooManyConnectionsException : Exception
130+
{
131+
public TooManyConnectionsException(string s)
132+
: base(s)
133+
{
134+
}
135+
}
128136
}

RabbitMQ.Stream.Client/Connection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,14 @@ private async Task ProcessIncomingFrames()
145145
var buffer = result.Buffer;
146146
if (buffer.Length == 0)
147147
{
148-
Debug.WriteLine("TCP Connection Closed");
148+
Debug.WriteLine("TCP Connection Closed!");
149149
// We're not going to receive any more bytes from the connection.
150150
break;
151151
}
152152

153153
// Let's try to read some frames!
154154

155-
while (TryReadFrame(ref buffer, out var frame))
155+
while (TryReadFrame(ref buffer, out var frame) && !isClosed)
156156
{
157157
// Let's rent some memory to copy the frame from the network stream. This memory will be reclaimed once the frame has been handled.
158158

0 commit comments

Comments
 (0)