Skip to content

Commit 7fbf1d7

Browse files
committed
Validate the pool is not null
and the values are valid. Add ProducerInfo to better understand the logs. Move dispose function to the abstract entity to remove code duplication. Better logs in case of errors. Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 5c7c00d commit 7fbf1d7

File tree

2 files changed

+92
-47
lines changed

2 files changed

+92
-47
lines changed

RabbitMQ.Stream.Client/StreamSystem.cs

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,24 @@ namespace RabbitMQ.Stream.Client
1414
{
1515
public record StreamSystemConfig : INamedEntity
1616
{
17+
internal void Validate()
18+
{
19+
if (ConnectionPoolConfig is null)
20+
{
21+
throw new ArgumentException("ConnectionPoolConfig can't be null");
22+
}
23+
24+
if (ConnectionPoolConfig.MaxConsumersConnections < 0)
25+
{
26+
throw new ArgumentException("MaxConsumersConnections can't be negative");
27+
}
28+
29+
if (ConnectionPoolConfig.MaxProducersConnections < 0)
30+
{
31+
throw new ArgumentException("MaxProducersConnections can't be negative");
32+
}
33+
}
34+
1735
public string UserName { get; set; } = "guest";
1836
public string Password { get; set; } = "guest";
1937
public string VirtualHost { get; set; } = "/";
@@ -24,7 +42,7 @@ public record StreamSystemConfig : INamedEntity
2442
/// </summary>
2543
public SslOption Ssl { get; set; } = new();
2644

27-
public IList<EndPoint> Endpoints { get; set; } = new List<EndPoint> { new IPEndPoint(IPAddress.Loopback, 5552) };
45+
public IList<EndPoint> Endpoints { get; set; } = new List<EndPoint> {new IPEndPoint(IPAddress.Loopback, 5552)};
2846

2947
public AddressResolver AddressResolver { get; set; }
3048
public string ClientProvidedName { get; set; } = "dotnet-stream-locator";
@@ -59,6 +77,7 @@ private StreamSystem(ClientParameters clientParameters, Client client,
5977

6078
public static async Task<StreamSystem> Create(StreamSystemConfig config, ILogger<StreamSystem> logger = null)
6179
{
80+
config.Validate();
6281
var clientParams = new ClientParameters
6382
{
6483
UserName = config.UserName,
@@ -76,7 +95,7 @@ public static async Task<StreamSystem> Create(StreamSystemConfig config, ILogger
7695
{
7796
try
7897
{
79-
var client = await Client.Create(clientParams with { Endpoint = endPoint }, logger)
98+
var client = await Client.Create(clientParams with {Endpoint = endPoint}, logger)
8099
.ConfigureAwait(false);
81100
if (!client.IsClosed)
82101
{
@@ -183,7 +202,7 @@ public async Task<IProducer> CreateRawSuperStreamProducer(
183202

184203
var r = RawSuperStreamProducer.Create(rawSuperStreamProducerConfig,
185204
streamInfos,
186-
_clientParameters with { ClientProvidedName = rawSuperStreamProducerConfig.ClientProvidedName },
205+
_clientParameters with {ClientProvidedName = rawSuperStreamProducerConfig.ClientProvidedName},
187206
logger);
188207
_logger?.LogDebug("Raw Producer: {ProducerReference} created for SuperStream: {SuperStream}",
189208
rawSuperStreamProducerConfig.Reference,
@@ -234,7 +253,7 @@ public async Task<IConsumer> CreateSuperStreamConsumer(
234253

235254
var s = RawSuperStreamConsumer.Create(rawSuperStreamConsumerConfig,
236255
streamInfos,
237-
_clientParameters with { ClientProvidedName = rawSuperStreamConsumerConfig.ClientProvidedName },
256+
_clientParameters with {ClientProvidedName = rawSuperStreamConsumerConfig.ClientProvidedName},
238257
logger);
239258
_logger?.LogDebug("Consumer: {Reference} created for SuperStream: {SuperStream}",
240259
rawSuperStreamConsumerConfig.Reference, rawSuperStreamConsumerConfig.SuperStream);
@@ -263,7 +282,7 @@ public async Task<IProducer> CreateRawProducer(RawProducerConfig rawProducerConf
263282
await _semClientProvidedName.WaitAsync().ConfigureAwait(false);
264283
rawProducerConfig.Pool = PoolProducers;
265284
var p = await RawProducer.Create(
266-
_clientParameters with { ClientProvidedName = rawProducerConfig.ClientProvidedName },
285+
_clientParameters with {ClientProvidedName = rawProducerConfig.ClientProvidedName},
267286
rawProducerConfig, metaStreamInfo, logger).ConfigureAwait(false);
268287
_logger?.LogDebug("Raw Producer: {Reference} created for Stream: {Stream}",
269288
rawProducerConfig.Reference, rawProducerConfig.Stream);
@@ -288,7 +307,7 @@ private async Task<StreamInfo> StreamInfo(string streamName)
288307
var clientParametersEndpoint = _clientParameters.Endpoints[0];
289308
switch (clientParametersEndpoint)
290309
{
291-
case DnsEndPoint { Host: "localhost" } dnsEndPoint:
310+
case DnsEndPoint {Host: "localhost"} dnsEndPoint:
292311
forceLocalHost = true;
293312
localPort = dnsEndPoint.Port;
294313
break;
@@ -305,12 +324,12 @@ private async Task<StreamInfo> StreamInfo(string streamName)
305324
// craft the metadata response to force using localhost
306325
var leader = new Broker("localhost", (uint)localPort);
307326
metaStreamInfo = new StreamInfo(streamName, ResponseCode.Ok, leader,
308-
new List<Broker>(1) { leader });
327+
new List<Broker>(1) {leader});
309328
}
310329
else
311330
{
312331
await MayBeReconnectLocator().ConfigureAwait(false);
313-
var meta = await _client.QueryMetadata(new[] { streamName }).ConfigureAwait(false);
332+
var meta = await _client.QueryMetadata(new[] {streamName}).ConfigureAwait(false);
314333
metaStreamInfo = meta.StreamInfos[streamName];
315334
}
316335

@@ -411,7 +430,7 @@ public async Task<IConsumer> CreateRawConsumer(RawConsumerConfig rawConsumerConf
411430
{
412431
await _semClientProvidedName.WaitAsync().ConfigureAwait(false);
413432
rawConsumerConfig.Pool = PoolConsumers;
414-
var s = _clientParameters with { ClientProvidedName = rawConsumerConfig.ClientProvidedName };
433+
var s = _clientParameters with {ClientProvidedName = rawConsumerConfig.ClientProvidedName};
415434
var c = await RawConsumer.Create(s,
416435
rawConsumerConfig, metaStreamInfo, logger).ConfigureAwait(false);
417436
_logger?.LogDebug("Raw Consumer: {Reference} created for Stream: {Stream}",

0 commit comments

Comments
 (0)