Skip to content

Commit 0833ec2

Browse files
committed
Validate the pool is not null
and the values are valid. Add ProducerInfo to better understand the logs. Move dispose function to the abstract entity to remove code duplication. Better logs in case of errors. Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 7fbf1d7 commit 0833ec2

File tree

8 files changed

+117
-125
lines changed

8 files changed

+117
-125
lines changed

RabbitMQ.Stream.Client/AbstractEntity.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
// 2.0, and the Mozilla Public License, version 2.0.
33
// Copyright (c) 2007-2023 VMware, Inc.
44

5+
using System;
56
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Microsoft.Extensions.Logging;
69

710
namespace RabbitMQ.Stream.Client
811
{
@@ -29,6 +32,38 @@ protected void MaybeCancelToken()
2932
_cancelTokenSource.Cancel();
3033
}
3134

35+
public abstract Task<ResponseCode> Close();
36+
37+
protected void Dispose(bool disposing, string entityInfo, ILogger logger)
38+
{
39+
if (!disposing)
40+
{
41+
return;
42+
}
43+
44+
if (_status == EntityStatus.Disposed)
45+
{
46+
return;
47+
}
48+
49+
try
50+
{
51+
var closeTask = Close();
52+
if (!closeTask.Wait(Consts.MidWait))
53+
{
54+
logger.LogWarning("Failed to close {EntityInfo} in time", entityInfo);
55+
}
56+
}
57+
catch (Exception e)
58+
{
59+
logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", entityInfo, e.Message);
60+
}
61+
finally
62+
{
63+
_status = EntityStatus.Disposed;
64+
}
65+
}
66+
3267
public bool IsOpen()
3368
{
3469
return _status == EntityStatus.Open;

RabbitMQ.Stream.Client/Client.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -715,15 +715,15 @@ internal async Task<CloseResponse> MaybeClose(string reason, string stream, Conn
715715
{
716716
if (!string.IsNullOrEmpty(ClientId))
717717
{
718-
_logger.LogInformation("Releasing connection {Connection}", ClientId);
718+
_logger.LogInformation("Releasing ids for the client id {ClientId}", ClientId);
719719
pool.Release(ClientId, stream);
720720
}
721721

722722
if (!HasEntities())
723723
{
724724
if (!string.IsNullOrEmpty(ClientId))
725725
{
726-
_logger.LogInformation("Close connection {Connection}", ClientId);
726+
_logger.LogInformation("Close connection for the {ClientId}", ClientId);
727727
// pool.remove(ClientId) is a duplicate call here but it is ok
728728
// the client can be closed in an unexpected way so we need to remove it from the pool
729729
// so you will find pool.remove(ClientId) also to the disconnect event

RabbitMQ.Stream.Client/PublicAPI.Shipped.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,6 @@ RabbitMQ.Stream.Client.QueryPublisherResponse.Sequence.get -> ulong
528528
RabbitMQ.Stream.Client.QueryPublisherResponse.SizeNeeded.get -> int
529529
RabbitMQ.Stream.Client.QueryPublisherResponse.Write(System.Span<byte> span) -> int
530530
RabbitMQ.Stream.Client.RawConsumer
531-
RabbitMQ.Stream.Client.RawConsumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
532531
RabbitMQ.Stream.Client.RawConsumer.Dispose() -> void
533532
RabbitMQ.Stream.Client.RawConsumer.StoreOffset(ulong offset) -> System.Threading.Tasks.Task
534533
RabbitMQ.Stream.Client.RawConsumerConfig
@@ -541,7 +540,6 @@ RabbitMQ.Stream.Client.RawConsumerConfig.OffsetSpec.set -> void
541540
RabbitMQ.Stream.Client.RawConsumerConfig.RawConsumerConfig(string stream) -> void
542541
RabbitMQ.Stream.Client.RawConsumerConfig.Stream.get -> string
543542
RabbitMQ.Stream.Client.RawProducer
544-
RabbitMQ.Stream.Client.RawProducer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
545543
RabbitMQ.Stream.Client.RawProducer.ConfirmFrames.get -> int
546544
RabbitMQ.Stream.Client.RawProducer.Dispose() -> void
547545
RabbitMQ.Stream.Client.RawProducer.GetLastPublishingId() -> System.Threading.Tasks.Task<ulong>

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1+
abstract RabbitMQ.Stream.Client.AbstractEntity.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
12
const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort
23
const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort
34
override RabbitMQ.Stream.Client.Broker.ToString() -> string
5+
override RabbitMQ.Stream.Client.RawConsumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
6+
override RabbitMQ.Stream.Client.RawProducer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
7+
RabbitMQ.Stream.Client.AbstractEntity.Dispose(bool disposing, string entityInfo, Microsoft.Extensions.Logging.ILogger logger) -> void
48
RabbitMQ.Stream.Client.AbstractEntity.IsOpen() -> bool
59
RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void
610
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 5 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ private string ConsumerInfo()
127127
? "No SuperStream"
128128
: $"SuperStream {_config.SuperStream}";
129129
return
130-
$"Consumer for stream: {_config.Stream}, reference: {_config.Reference}, OffsetSpec {_config.OffsetSpec} " +
130+
$"Consumer id {_subscriberId} for stream: {_config.Stream}, reference: {_config.Reference}, OffsetSpec {_config.OffsetSpec} " +
131131
$"Client ProvidedName {_config.ClientProvidedName}, " +
132132
$"{superStream}, IsSingleActiveConsumer: {_config.IsSingleActiveConsumer}, " +
133133
$"Token IsCancellationRequested: {Token.IsCancellationRequested} ";
@@ -562,7 +562,7 @@ private async Task Init()
562562
throw new CreateConsumerException($"consumer could not be created code: {response.ResponseCode}");
563563
}
564564

565-
public async Task<ResponseCode> Close()
565+
public override async Task<ResponseCode> Close()
566566
{
567567
// this unlock the consumer if it is waiting for a message
568568
// see DispatchMessage method where the token is used
@@ -592,57 +592,23 @@ public async Task<ResponseCode> Close()
592592
catch (Exception e)
593593
{
594594
_logger.LogError(e,
595-
"Error removing the consumer id: {SubscriberId}, {ConsumerInfo} from the server",
596-
_subscriberId, ConsumerInfo());
595+
"Error removing {ConsumerInfo} from the server",
596+
ConsumerInfo());
597597
}
598598

599599
var closed = await _client.MaybeClose($"_client-close-subscriber: {_subscriberId}",
600600
_config.Stream, _config.Pool)
601601
.ConfigureAwait(false);
602602
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-subscriber: {_subscriberId}");
603603
_logger.LogDebug("{ConsumerInfo} is closed", ConsumerInfo());
604-
605604
return result;
606605
}
607606

608-
private void Dispose(bool disposing)
609-
{
610-
if (!disposing)
611-
{
612-
return;
613-
}
614-
615-
if (_status == EntityStatus.Disposed)
616-
{
617-
return;
618-
}
619-
620-
try
621-
{
622-
var closeConsumer = Close();
623-
if (!closeConsumer.Wait(Consts.ShortWait))
624-
{
625-
Debug.WriteLine($"consumer did not close within {Consts.ShortWait}");
626-
}
627-
628-
ClientExceptions.MaybeThrowException(closeConsumer.Result,
629-
$"Error during remove producer. {ConsumerInfo()}");
630-
}
631-
finally
632-
{
633-
_status = EntityStatus.Disposed;
634-
}
635-
}
636-
637607
public void Dispose()
638608
{
639609
try
640610
{
641-
Dispose(true);
642-
}
643-
catch (Exception e)
644-
{
645-
_logger.LogError(e, "Error during disposing of {ConsumerInfo}", ConsumerInfo());
611+
Dispose(true, ConsumerInfo(), _logger);
646612
}
647613
finally
648614
{

RabbitMQ.Stream.Client/RawProducer.cs

Lines changed: 23 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,15 @@
44

55
using System;
66
using System.Collections.Generic;
7+
8+
/* Unmerged change from project 'RabbitMQ.Stream.Client(net7.0)'
9+
Before:
710
using System.Diagnostics;
811
using System.Linq;
12+
After:
13+
using System.Linq;
14+
*/
15+
using System.Linq;
916
using System.Runtime.CompilerServices;
1017
using System.Threading;
1118
using System.Threading.Channels;
@@ -59,6 +66,14 @@ public class RawProducer : AbstractEntity, IProducer, IDisposable
5966

6067
public int PendingCount => _config.MaxInFlight - _semaphore.CurrentCount;
6168

69+
private string ProducerInfo()
70+
{
71+
return
72+
$"Producer id {_publisherId} for stream: {_config.Stream}, reference: {_config.Reference}," +
73+
$"Client ProvidedName {_config.ClientProvidedName}, " +
74+
$"Token IsCancellationRequested: {Token.IsCancellationRequested} ";
75+
}
76+
6277
public static async Task<IProducer> Create(
6378
ClientParameters clientParameters,
6479
RawProducerConfig config,
@@ -138,8 +153,8 @@ private async Task Init()
138153
// there could be an exception in the user code.
139154
// So here we log the exception and we continue.
140155

141-
_logger.LogError(e, "Error during confirm handler, publishing id: {Id}. " +
142-
"Hint: Check the user ConfirmHandler callback", id);
156+
_logger.LogError(e, "Error during confirm handler, publishing id: {Id}. {ProducerInfo} " +
157+
"Hint: Check the user ConfirmHandler callback", id, ProducerInfo());
143158
}
144159
}
145160

@@ -333,7 +348,7 @@ private async Task ProcessBuffer()
333348
}
334349
}
335350

336-
public async Task<ResponseCode> Close()
351+
public override async Task<ResponseCode> Close()
337352
{
338353
// MaybeCancelToken This unlocks the semaphore so that the background task can exit
339354
// see SemaphoreAwaitAsync method and processBuffer method
@@ -358,52 +373,27 @@ public async Task<ResponseCode> Close()
358373
}
359374
catch (Exception e)
360375
{
361-
_logger.LogError(e, "Error removing the producer id: {PublisherId} from the server", _publisherId);
376+
_logger.LogError(e, "Error removing {ProducerInfo} from the server", ProducerInfo());
362377
}
363378

364379
var closed = await _client.MaybeClose($"client-close-publisher: {_publisherId}",
365380
_config.Stream, _config.Pool)
366381
.ConfigureAwait(false);
367382
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"client-close-publisher: {_publisherId}");
368-
_logger?.LogDebug("Publisher {PublisherId} closed", _publisherId);
383+
_logger?.LogDebug("{ProducerInfo} closed", ProducerInfo());
369384
return result;
370385
}
371386

372-
private void Dispose(bool disposing)
373-
{
374-
if (!disposing)
375-
{
376-
return;
377-
}
378-
379-
if (_status == EntityStatus.Disposed)
380-
{
381-
return;
382-
}
383-
384-
var closeProducer = Close();
385-
if (!closeProducer.Wait(Consts.ShortWait))
386-
{
387-
Debug.WriteLine($"producer did not close within {Consts.ShortWait}");
388-
}
389-
390-
_status = EntityStatus.Disposed;
391-
ClientExceptions.MaybeThrowException(closeProducer.Result,
392-
$"Error during remove producer. Producer: {_publisherId}");
393-
}
394-
395387
public void Dispose()
396388
{
397389
try
398390
{
399-
Dispose(true);
391+
Dispose(true, ProducerInfo(), _logger);
400392
}
401-
catch (Exception e)
393+
finally
402394
{
403-
_logger.LogError(e, "Error during disposing Consumer: {PublisherId}", _publisherId);
395+
GC.SuppressFinalize(this);
404396
}
405-
406-
GC.SuppressFinalize(this);
407397
}
408398

409399
public ProducerInfo Info { get; }

RabbitMQ.Stream.Client/StreamSystem.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ internal void Validate()
4242
/// </summary>
4343
public SslOption Ssl { get; set; } = new();
4444

45-
public IList<EndPoint> Endpoints { get; set; } = new List<EndPoint> {new IPEndPoint(IPAddress.Loopback, 5552)};
45+
public IList<EndPoint> Endpoints { get; set; } = new List<EndPoint> { new IPEndPoint(IPAddress.Loopback, 5552) };
4646

4747
public AddressResolver AddressResolver { get; set; }
4848
public string ClientProvidedName { get; set; } = "dotnet-stream-locator";
@@ -95,7 +95,7 @@ public static async Task<StreamSystem> Create(StreamSystemConfig config, ILogger
9595
{
9696
try
9797
{
98-
var client = await Client.Create(clientParams with {Endpoint = endPoint}, logger)
98+
var client = await Client.Create(clientParams with { Endpoint = endPoint }, logger)
9999
.ConfigureAwait(false);
100100
if (!client.IsClosed)
101101
{
@@ -202,7 +202,7 @@ public async Task<IProducer> CreateRawSuperStreamProducer(
202202

203203
var r = RawSuperStreamProducer.Create(rawSuperStreamProducerConfig,
204204
streamInfos,
205-
_clientParameters with {ClientProvidedName = rawSuperStreamProducerConfig.ClientProvidedName},
205+
_clientParameters with { ClientProvidedName = rawSuperStreamProducerConfig.ClientProvidedName },
206206
logger);
207207
_logger?.LogDebug("Raw Producer: {ProducerReference} created for SuperStream: {SuperStream}",
208208
rawSuperStreamProducerConfig.Reference,
@@ -253,7 +253,7 @@ public async Task<IConsumer> CreateSuperStreamConsumer(
253253

254254
var s = RawSuperStreamConsumer.Create(rawSuperStreamConsumerConfig,
255255
streamInfos,
256-
_clientParameters with {ClientProvidedName = rawSuperStreamConsumerConfig.ClientProvidedName},
256+
_clientParameters with { ClientProvidedName = rawSuperStreamConsumerConfig.ClientProvidedName },
257257
logger);
258258
_logger?.LogDebug("Consumer: {Reference} created for SuperStream: {SuperStream}",
259259
rawSuperStreamConsumerConfig.Reference, rawSuperStreamConsumerConfig.SuperStream);
@@ -282,7 +282,7 @@ public async Task<IProducer> CreateRawProducer(RawProducerConfig rawProducerConf
282282
await _semClientProvidedName.WaitAsync().ConfigureAwait(false);
283283
rawProducerConfig.Pool = PoolProducers;
284284
var p = await RawProducer.Create(
285-
_clientParameters with {ClientProvidedName = rawProducerConfig.ClientProvidedName},
285+
_clientParameters with { ClientProvidedName = rawProducerConfig.ClientProvidedName },
286286
rawProducerConfig, metaStreamInfo, logger).ConfigureAwait(false);
287287
_logger?.LogDebug("Raw Producer: {Reference} created for Stream: {Stream}",
288288
rawProducerConfig.Reference, rawProducerConfig.Stream);
@@ -307,7 +307,7 @@ private async Task<StreamInfo> StreamInfo(string streamName)
307307
var clientParametersEndpoint = _clientParameters.Endpoints[0];
308308
switch (clientParametersEndpoint)
309309
{
310-
case DnsEndPoint {Host: "localhost"} dnsEndPoint:
310+
case DnsEndPoint { Host: "localhost" } dnsEndPoint:
311311
forceLocalHost = true;
312312
localPort = dnsEndPoint.Port;
313313
break;
@@ -324,12 +324,12 @@ private async Task<StreamInfo> StreamInfo(string streamName)
324324
// craft the metadata response to force using localhost
325325
var leader = new Broker("localhost", (uint)localPort);
326326
metaStreamInfo = new StreamInfo(streamName, ResponseCode.Ok, leader,
327-
new List<Broker>(1) {leader});
327+
new List<Broker>(1) { leader });
328328
}
329329
else
330330
{
331331
await MayBeReconnectLocator().ConfigureAwait(false);
332-
var meta = await _client.QueryMetadata(new[] {streamName}).ConfigureAwait(false);
332+
var meta = await _client.QueryMetadata(new[] { streamName }).ConfigureAwait(false);
333333
metaStreamInfo = meta.StreamInfos[streamName];
334334
}
335335

@@ -430,7 +430,7 @@ public async Task<IConsumer> CreateRawConsumer(RawConsumerConfig rawConsumerConf
430430
{
431431
await _semClientProvidedName.WaitAsync().ConfigureAwait(false);
432432
rawConsumerConfig.Pool = PoolConsumers;
433-
var s = _clientParameters with {ClientProvidedName = rawConsumerConfig.ClientProvidedName};
433+
var s = _clientParameters with { ClientProvidedName = rawConsumerConfig.ClientProvidedName };
434434
var c = await RawConsumer.Create(s,
435435
rawConsumerConfig, metaStreamInfo, logger).ConfigureAwait(false);
436436
_logger?.LogDebug("Raw Consumer: {Reference} created for Stream: {Stream}",

0 commit comments

Comments
 (0)