Skip to content

Commit fff144d

Browse files
committed
Add tests to validate the internal pool lists
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent f7a0573 commit fff144d

File tree

8 files changed

+159
-30
lines changed

8 files changed

+159
-30
lines changed

RabbitMQ.Stream.Client/AbstractEntity.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public bool IsOpen()
3434
return _status == EntityStatus.Open;
3535
}
3636

37-
protected Client _client;
37+
internal Client _client;
3838

3939
}
4040
}

RabbitMQ.Stream.Client/Client.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
300300
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
301301
try
302302
{
303-
var publisherId = ConnectionsPool.FindMissingConsecutive(publishers.Keys.ToList());
303+
var publisherId = ConnectionsPool.FindNextValidId(publishers.Keys.ToList());
304304
publishers.Add(publisherId, (confirmCallback, errorCallback));
305305
return (publisherId, await Request<DeclarePublisherRequest, DeclarePublisherResponse>(corr =>
306306
new DeclarePublisherRequest(corr, publisherId, publisherRef, stream)).ConfigureAwait(false));
@@ -349,7 +349,7 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
349349
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
350350
try
351351
{
352-
var subscriptionId = ConnectionsPool.FindMissingConsecutive(consumers.Keys.ToList());
352+
var subscriptionId = ConnectionsPool.FindNextValidId(consumers.Keys.ToList());
353353
consumers.Add(subscriptionId,
354354
new ConsumerEvents(
355355
deliverHandler,

RabbitMQ.Stream.Client/ConnectionsPool.cs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ namespace RabbitMQ.Stream.Client;
1313

1414
public class ConnectionPoolConfig
1515
{
16-
public int MaxConnections { get; set; } = 0;
16+
public int MaxProducersConnections { get; set; } = 0;
17+
public int MaxConsumersConnections { get; set; } = 0;
1718
public byte ConsumersPerConnection { get; set; } = 1;
1819
public byte ProducersPerConnection { get; set; } = 1;
1920
}
@@ -86,7 +87,7 @@ public class ConnectionsPool
8687
{
8788
private static readonly object s_lock = new();
8889

89-
internal static byte FindMissingConsecutive(List<byte> ids)
90+
internal static byte FindNextValidId(List<byte> ids)
9091
{
9192
lock (s_lock)
9293
{
@@ -95,12 +96,17 @@ internal static byte FindMissingConsecutive(List<byte> ids)
9596
return 0;
9697
}
9798

99+
// we start with the recycle when we reach the max value
100+
// in this way we can avoid to recycle the same ids in a short time
98101
ids.Sort();
99-
for (var i = 0; i < ids.Count - 1; i++)
102+
if (ids[^1] == byte.MaxValue)
100103
{
101-
if (ids[i + 1] - ids[i] > 1)
104+
for (var i = 0; i < ids.Count - 1; i++)
102105
{
103-
return (byte)(ids[i] + 1);
106+
if (ids[i + 1] - ids[i] > 1)
107+
{
108+
return (byte)(ids[i] + 1);
109+
}
104110
}
105111
}
106112

RabbitMQ.Stream.Client/IConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ public interface IConsumer
1818

1919
public record IConsumerConfig : INamedEntity
2020
{
21-
2221
private ushort _initialCredits = Consts.ConsumerInitialCredits;
2322

2423
internal ConnectionsPool Pool { get; set; }
@@ -80,6 +79,7 @@ public ushort InitialCredits
8079
public class ConsumerInfo : Info
8180
{
8281
public string Reference { get; }
82+
8383
public ConsumerInfo(string stream, string reference) : base(stream)
8484
{
8585
Reference = reference;

RabbitMQ.Stream.Client/PublicAPI.Shipped.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ override RabbitMQ.Stream.Client.Reliable.Producer.Close() -> System.Threading.Ta
7979
override RabbitMQ.Stream.Client.Reliable.Producer.ToString() -> string
8080
RabbitMQ.Stream.Client.AbstractEntity
8181
RabbitMQ.Stream.Client.AbstractEntity.AbstractEntity() -> void
82-
RabbitMQ.Stream.Client.AbstractEntity._client -> RabbitMQ.Stream.Client.Client
8382
RabbitMQ.Stream.Client.AddressResolver
8483
RabbitMQ.Stream.Client.AddressResolver.AddressResolver(System.Net.EndPoint endPoint) -> void
8584
RabbitMQ.Stream.Client.AddressResolver.Enabled.get -> bool

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ RabbitMQ.Stream.Client.ConnectionPoolConfig
3232
RabbitMQ.Stream.Client.ConnectionPoolConfig.ConnectionPoolConfig() -> void
3333
RabbitMQ.Stream.Client.ConnectionPoolConfig.ConsumersPerConnection.get -> byte
3434
RabbitMQ.Stream.Client.ConnectionPoolConfig.ConsumersPerConnection.set -> void
35-
RabbitMQ.Stream.Client.ConnectionPoolConfig.MaxConnections.get -> int
36-
RabbitMQ.Stream.Client.ConnectionPoolConfig.MaxConnections.set -> void
35+
RabbitMQ.Stream.Client.ConnectionPoolConfig.MaxConsumersConnections.get -> int
36+
RabbitMQ.Stream.Client.ConnectionPoolConfig.MaxConsumersConnections.set -> void
37+
RabbitMQ.Stream.Client.ConnectionPoolConfig.MaxProducersConnections.get -> int
38+
RabbitMQ.Stream.Client.ConnectionPoolConfig.MaxProducersConnections.set -> void
3739
RabbitMQ.Stream.Client.ConnectionPoolConfig.ProducersPerConnection.get -> byte
3840
RabbitMQ.Stream.Client.ConnectionPoolConfig.ProducersPerConnection.set -> void
3941
RabbitMQ.Stream.Client.ConnectionsPool

RabbitMQ.Stream.Client/StreamSystem.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ private StreamSystem(ClientParameters clientParameters, Client client,
4949
_clientParameters = clientParameters;
5050
_client = client;
5151
_logger = logger ?? NullLogger<StreamSystem>.Instance;
52-
PoolConsumers = new ConnectionsPool(connectionPoolConfig.MaxConnections / 2,
52+
PoolConsumers = new ConnectionsPool(connectionPoolConfig.MaxConsumersConnections,
5353
connectionPoolConfig.ConsumersPerConnection);
54-
PoolProducers = new ConnectionsPool(connectionPoolConfig.MaxConnections / 2,
54+
PoolProducers = new ConnectionsPool(connectionPoolConfig.MaxProducersConnections,
5555
connectionPoolConfig.ProducersPerConnection);
5656
}
5757

Tests/ConnectionsPoolTests.cs

Lines changed: 138 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,18 @@ private static Task<IClient> CreateClient(ClientParameters clientParameters)
2525
return Task.FromResult<IClient>(fake);
2626
}
2727

28+
private static IEnumerable<byte> ConsumersIdsPerConnection(IConsumer consumer)
29+
{
30+
var client1 = ((RawConsumer)consumer)._client;
31+
return client1.consumers.Keys.ToList();
32+
}
33+
34+
private static IEnumerable<byte> ProducersIdsPerConnection(IProducer producer)
35+
{
36+
var client1 = ((RawProducer)producer)._client;
37+
return client1.publishers.Keys.ToList();
38+
}
39+
2840
private readonly ITestOutputHelper _testOutputHelper;
2941

3042
public ConnectionsPoolTests(ITestOutputHelper testOutputHelper)
@@ -322,6 +334,7 @@ public async void TwoConsumersShouldShareTheSameConnectionFromThePool()
322334

323335
await client.DeleteStream(Stream1);
324336
await client.DeleteStream(Stream2);
337+
await client.Close("byte");
325338
}
326339

327340
/// <summary>
@@ -364,6 +377,7 @@ public async void TwoProducersShouldShareTheSameConnectionFromThePool()
364377

365378
await client.DeleteStream(Stream1);
366379
await client.DeleteStream(Stream2);
380+
await client.Close("byte");
367381
}
368382

369383
/// <summary>
@@ -391,19 +405,34 @@ public async void TwoProducerAndConsumerShouldHaveDifferentConnection()
391405
Assert.Equal(1, pool.ActiveIdsCountForStream(Stream1));
392406
Assert.Equal(1, pool.ActiveIdsCountForStream(Stream2));
393407

408+
Assert.NotEmpty(ProducersIdsPerConnection(p2).ToList());
409+
Assert.Equal(0, ProducersIdsPerConnection(p2).ToList()[0]);
410+
411+
Assert.NotEmpty(ConsumersIdsPerConnection(c1).ToList());
412+
Assert.Equal(0, ConsumersIdsPerConnection(c1).ToList()[0]);
413+
394414
Assert.Equal(ResponseCode.Ok, await c1.Close());
395415
// closing should be idempotent and not affect to the pool
396416
Assert.Equal(ResponseCode.Ok, await c1.Close());
397417

398418
Assert.Equal(1, pool.ConnectionsCount);
399419
Assert.Equal(0, pool.ActiveIdsCountForStream(Stream1));
400420

421+
Assert.NotEmpty(ProducersIdsPerConnection(p2).ToList());
422+
Assert.Equal(0, ProducersIdsPerConnection(p2).ToList()[0]);
423+
424+
Assert.Empty(ConsumersIdsPerConnection(c1).ToList());
425+
401426
await p2.Close();
402427
Assert.Equal(0, pool.ConnectionsCount);
403428
Assert.Equal(0, pool.ActiveIdsCountForStream(Stream2));
404429

430+
Assert.Empty(ProducersIdsPerConnection(p2).ToList());
431+
Assert.Empty(ConsumersIdsPerConnection(c1).ToList());
432+
405433
await client.DeleteStream(Stream1);
406434
await client.DeleteStream(Stream2);
435+
await client.Close("byte");
407436
}
408437

409438
/// <summary>
@@ -470,6 +499,7 @@ await RawConsumer.Create(client.Parameters,
470499
Assert.Equal(msgDataStream2.Contents.ToArray(), testPassedC2.Task.Result.Contents.ToArray());
471500
await client.DeleteStream(Stream1);
472501
await client.DeleteStream(Stream2);
502+
await client.Close("byte");
473503
}
474504

475505
/// <summary>
@@ -496,13 +526,17 @@ await Assert.ThrowsAsync<AggregateException>(async () => await RawConsumer.Creat
496526
Assert.Equal(1, pool.ConnectionsCount);
497527
Assert.Equal(1, pool.ActiveIdsCountForStream(Stream2));
498528
Assert.Equal(1, pool.ActiveIdsCount);
529+
Assert.NotEmpty(ProducersIdsPerConnection(p1).ToList());
530+
Assert.Equal(0, ProducersIdsPerConnection(p1).ToList()[0]);
499531

500532
await p1.Close();
501533
Assert.Equal(0, pool.ConnectionsCount);
502534
Assert.Equal(0, pool.ActiveIdsCount);
535+
Assert.Empty(ProducersIdsPerConnection(p1).ToList());
503536

504537
await client.DeleteStream(Stream1);
505538
await client.DeleteStream(Stream2);
539+
await client.Close("byte");
506540
}
507541

508542
/// <summary>
@@ -524,7 +558,7 @@ public async void TheProducerPoolShouldBeConsistentInMultiThread()
524558
var producerList = new ConcurrentDictionary<string, IProducer>();
525559

526560
var tasksP = new List<Task>();
527-
for (var i = 0; i < (IdsPerConnection * 2); i++)
561+
for (var i = 0; i < (IdsPerConnection); i++)
528562
{
529563
tasksP.Add(Task.Run(async () =>
530564
{
@@ -536,6 +570,21 @@ public async void TheProducerPoolShouldBeConsistentInMultiThread()
536570

537571
await Task.WhenAll(tasksP);
538572

573+
producerList.Values.ToList().ForEach(p => Assert.Equal(IdsPerConnection, ProducersIdsPerConnection(p).Count()));
574+
575+
for (var i = 0; i < (IdsPerConnection); i++)
576+
{
577+
tasksP.Add(Task.Run(async () =>
578+
{
579+
var p = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream1) { Pool = pool },
580+
metaDataInfo.StreamInfos[Stream1]);
581+
producerList.TryAdd(Guid.NewGuid().ToString(), p);
582+
}));
583+
}
584+
585+
await Task.WhenAll(tasksP);
586+
producerList.Values.ToList().ForEach(p => Assert.Equal(IdsPerConnection, ProducersIdsPerConnection(p).Count()));
587+
539588
Assert.Equal(2, pool.ConnectionsCount);
540589
Assert.Equal(IdsPerConnection * 2, pool.ActiveIdsCountForStream(Stream1));
541590

@@ -549,6 +598,7 @@ public async void TheProducerPoolShouldBeConsistentInMultiThread()
549598
Assert.Equal(0, pool.ConnectionsCount);
550599
Assert.Equal(0, pool.ActiveIdsCount);
551600
await client.DeleteStream(Stream1);
601+
await client.Close("byte");
552602
}
553603

554604
/// <summary>
@@ -592,6 +642,7 @@ public async void TheProducerConsumerPoolShouldBeConsistentInMultiThreadCreateDe
592642
Assert.Equal(0, pool.ConnectionsCount);
593643
Assert.Equal(0, pool.ActiveIdsCount);
594644
await client.DeleteStream(Stream1);
645+
await client.Close("byte");
595646
}
596647

597648
// // this test doesn't work since the client parameters metadata handler is not an event
@@ -681,33 +732,97 @@ await RawConsumer.Create(client.Parameters,
681732
Assert.Equal(0, pool.ActiveIdsCount);
682733
Assert.Equal(0, pool.ConnectionsCount);
683734
await client.DeleteStream(Stream1);
735+
await client.Close("byte");
736+
}
737+
738+
[Fact]
739+
public async void TheConsumerPoolShouldBeConsistentWhenTheConnectionIsClosed()
740+
{
741+
var clientProvidedName = Guid.NewGuid().ToString();
742+
var client = await Client.Create(new ClientParameters() { ClientProvidedName = clientProvidedName });
743+
const string Stream1 = "pool_test_stream_1_test_connection_closed";
744+
const string Stream2 = "pool_test_stream_2_test_connection_closed";
745+
await client.CreateStream(Stream1, new Dictionary<string, string>());
746+
await client.CreateStream(Stream2, new Dictionary<string, string>());
747+
const int IdsPerConnection = 2;
748+
var pool = new ConnectionsPool(0, IdsPerConnection);
749+
var metaDataInfo = await client.QueryMetadata(new[] { Stream1, Stream2 });
750+
751+
var c1 = await RawConsumer.Create(client.Parameters,
752+
new RawConsumerConfig(Stream1) { Pool = pool },
753+
metaDataInfo.StreamInfos[Stream1]);
754+
755+
var c2 = await RawConsumer.Create(client.Parameters,
756+
new RawConsumerConfig(Stream2) { Pool = pool },
757+
metaDataInfo.StreamInfos[Stream2]);
758+
759+
Assert.Equal(1, pool.ConnectionsCount);
760+
SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections(clientProvidedName).Result == 2);
761+
SystemUtils.WaitUntil(() => pool.ConnectionsCount == 0);
762+
Assert.Equal(0, pool.ConnectionsCount);
763+
Assert.Equal(0, pool.ActiveIdsCount);
764+
Assert.Equal(0, pool.ActiveIdsCountForStream(Stream1));
765+
Assert.Equal(0, pool.ActiveIdsCountForStream(Stream2));
766+
SystemUtils.Wait(); // the event close is raised in another thread so we need to wait a bit to be sure the event is raised
767+
Assert.Empty(ConsumersIdsPerConnection(c1).ToList());
768+
Assert.Empty(ConsumersIdsPerConnection(c2).ToList());
769+
770+
client = await Client.Create(new ClientParameters());
771+
await client.DeleteStream(Stream1);
772+
await client.DeleteStream(Stream2);
773+
await client.Close("bye");
684774
}
685775

686776
/// The following tests are related to the FindMissingConsecutive method
687777
/// We need to find the missing consecutive ids
688778
/// by protocol we can have multi ids per connection so we need to find the missing ids
689779
/// in case one id is released from the pool
690780
/// if we start with 0,1,2,3,4,5,6,7,8,9 at some point we release the id 3
691-
/// the next id should be 3
692-
/// the id is a byte so we can have 0-255
781+
/// the nextid it will be still 10
782+
/// The FindNextValidId function will start to reuse the missing ids when the max is reached
783+
/// In this way we can reduce the time to use the same ids
693784
[Fact]
694-
public void FindMissingConsecutiveShouldReturnZeroGivenEmptyList()
785+
public void FindNextValidIdShouldReturnZeroGivenEmptyList()
695786
{
696787
var ids = new List<byte>();
697-
var missing = ConnectionsPool.FindMissingConsecutive(ids);
788+
var missing = ConnectionsPool.FindNextValidId(ids);
698789
Assert.Equal(0, missing);
699790
}
700791

701792
[Fact]
702-
public void FindMissingConsecutiveShouldReturnOneGivenOneItem()
793+
public void FindNextValidIdShouldReturnOne()
703794
{
704795
var ids = new List<byte>() { 0 };
705-
var missing = ConnectionsPool.FindMissingConsecutive(ids);
796+
var missing = ConnectionsPool.FindNextValidId(ids);
706797
Assert.Equal(1, missing);
707798
}
708799

800+
// even there are missing ids the next valid id is the next one
801+
[Fact]
802+
public void FindNextValidShouldReturnTreeGivenAList()
803+
{
804+
var ids = new List<byte>()
805+
{
806+
0,
807+
1,
808+
2,
809+
4,
810+
6,
811+
8,
812+
9
813+
};
814+
var nextValidId = ConnectionsPool.FindNextValidId(ids);
815+
Assert.Equal(10, nextValidId);
816+
ids.Add(10);
817+
nextValidId = ConnectionsPool.FindNextValidId(ids);
818+
Assert.Equal(11, nextValidId);
819+
ids.Add(11);
820+
}
821+
822+
// in this case we start to recycle the ids
823+
// since the max is reached
709824
[Fact]
710-
public void FindMissingConsecutiveShouldReturnTreeGivenAList()
825+
public void RecycleIdsWhenTheMaxIsReached()
711826
{
712827
var ids = new List<byte>()
713828
{
@@ -722,19 +837,26 @@ public void FindMissingConsecutiveShouldReturnTreeGivenAList()
722837
8,
723838
9
724839
};
725-
var missing = ConnectionsPool.FindMissingConsecutive(ids);
726-
Assert.Equal(3, missing);
840+
for (byte i = 10; i < byte.MaxValue; i++)
841+
{
842+
ids.Add(i);
843+
}
844+
845+
var nextValidId = ConnectionsPool.FindNextValidId(ids);
846+
Assert.Equal(255, nextValidId);
847+
ids.Add(255);
848+
849+
nextValidId = ConnectionsPool.FindNextValidId(ids);
850+
Assert.Equal(3, nextValidId);
727851
ids.Add(3);
728-
missing = ConnectionsPool.FindMissingConsecutive(ids);
729-
Assert.Equal(5, missing);
730852

853+
nextValidId = ConnectionsPool.FindNextValidId(ids);
854+
Assert.Equal(5, nextValidId);
731855
ids.Add(5);
732-
missing = ConnectionsPool.FindMissingConsecutive(ids);
733-
Assert.Equal(7, missing);
734856

857+
nextValidId = ConnectionsPool.FindNextValidId(ids);
858+
Assert.Equal(7, nextValidId);
735859
ids.Add(7);
736-
missing = ConnectionsPool.FindMissingConsecutive(ids);
737-
Assert.Equal(10, missing);
738860
}
739861
}
740862
}

0 commit comments

Comments
 (0)