Skip to content

Add Producer/Consumer Identifier #348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions RabbitMQ.Stream.Client/AbstractEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ public abstract record EntityCommonConfig
{
internal ConnectionsPool Pool { get; set; }
public Func<MetaDataUpdate, Task> MetadataHandler { get; set; }

/// <summary>
/// The Identifier does not have any effect on the server.
/// It is used to identify the entity in the logs and on the UI (only for the consumer)
/// It is possible to retrieve the entity info using the Info.Identifier method form the
/// raw* instances.
/// </summary>
public string Identifier { get; set; }
}

internal enum EntityStatus
Expand Down
4 changes: 3 additions & 1 deletion RabbitMQ.Stream.Client/EntityInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ namespace RabbitMQ.Stream.Client;

public abstract class Info
{
public string Identifier { get; }
public string Stream { get; }

protected Info(string stream)
protected Info(string stream, string identifier)
{
Stream = stream;
Identifier = identifier;
}
}
4 changes: 2 additions & 2 deletions RabbitMQ.Stream.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ public class ConsumerInfo : Info
{
public string Reference { get; }

public ConsumerInfo(string stream, string reference) : base(stream)
public ConsumerInfo(string stream, string reference, string identifier) : base(stream, identifier)
{
Reference = reference;
}

public override string ToString()
{
return $"{base.ToString()}, Reference: {Reference}";
return $"ConsumerInfo(Stream={Stream}, Reference={Reference}, Identifier={Identifier})";
}
}
7 changes: 6 additions & 1 deletion RabbitMQ.Stream.Client/IProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,13 @@ public class ProducerInfo : Info
{
public string Reference { get; }

public ProducerInfo(string stream, string reference) : base(stream)
public ProducerInfo(string stream, string reference, string identifier) : base(stream, identifier)
{
Reference = reference;
}

public override string ToString()
{
return $"ProducerInfo(Stream={Stream}, Reference={Reference}, Identifier={Identifier})";
}
}
13 changes: 9 additions & 4 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort
const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort
override RabbitMQ.Stream.Client.Broker.ToString() -> string
override RabbitMQ.Stream.Client.ConsumerInfo.ToString() -> string
override RabbitMQ.Stream.Client.ProducerInfo.ToString() -> string
override RabbitMQ.Stream.Client.RawConsumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
override RabbitMQ.Stream.Client.RawProducer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
RabbitMQ.Stream.Client.AbstractEntity.Dispose(bool disposing) -> void
Expand Down Expand Up @@ -80,7 +81,7 @@ RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.set -> void
RabbitMQ.Stream.Client.ConsumerFilter.Values.get -> System.Collections.Generic.List<string>
RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference) -> void
RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference, string identifier) -> void
RabbitMQ.Stream.Client.ConsumerInfo.Reference.get -> string
RabbitMQ.Stream.Client.CrcException
RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
Expand All @@ -92,6 +93,8 @@ RabbitMQ.Stream.Client.CreateException.ResponseCode.get -> RabbitMQ.Stream.Clien
RabbitMQ.Stream.Client.CreateException.ResponseCode.init -> void
RabbitMQ.Stream.Client.CreateProducerException.CreateProducerException(string s, RabbitMQ.Stream.Client.ResponseCode responseCode) -> void
RabbitMQ.Stream.Client.EntityCommonConfig
RabbitMQ.Stream.Client.EntityCommonConfig.Identifier.get -> string
RabbitMQ.Stream.Client.EntityCommonConfig.Identifier.set -> void
RabbitMQ.Stream.Client.EntityCommonConfig.MetadataHandler.get -> System.Func<RabbitMQ.Stream.Client.MetaDataUpdate, System.Threading.Tasks.Task>
RabbitMQ.Stream.Client.EntityCommonConfig.MetadataHandler.set -> void
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
Expand Down Expand Up @@ -133,7 +136,8 @@ RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.Info.Info(string stream) -> void
RabbitMQ.Stream.Client.Info.Identifier.get -> string
RabbitMQ.Stream.Client.Info.Info(string stream, string identifier) -> void
RabbitMQ.Stream.Client.Info.Stream.get -> string
RabbitMQ.Stream.Client.IProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
RabbitMQ.Stream.Client.IProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
Expand All @@ -154,7 +158,7 @@ RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.get -> System.Func<RabbitMQ.Stream.Client.Message, string>
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.set -> void
RabbitMQ.Stream.Client.ProducerInfo
RabbitMQ.Stream.Client.ProducerInfo.ProducerInfo(string stream, string reference) -> void
RabbitMQ.Stream.Client.ProducerInfo.ProducerInfo(string stream, string reference, string identifier) -> void
RabbitMQ.Stream.Client.ProducerInfo.Reference.get -> string
RabbitMQ.Stream.Client.PublishFilter
RabbitMQ.Stream.Client.PublishFilter.Command.get -> ushort
Expand Down Expand Up @@ -205,9 +209,10 @@ RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
RabbitMQ.Stream.Client.Reliable.ProducerFactory.CreateProducer(bool boot) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
RabbitMQ.Stream.Client.Reliable.ProducerFactory._producer -> RabbitMQ.Stream.Client.IProducer
RabbitMQ.Stream.Client.Reliable.ReliableBase.CompareStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus toTest) -> bool
RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus status) -> void
RabbitMQ.Stream.Client.Reliable.ReliableBase._status -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Identifier.get -> string
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Identifier.set -> void
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ReconnectStrategy.get -> RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ResourceAvailableReconnectStrategy.get -> RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ResourceAvailableReconnectStrategy.set -> void
Expand Down
11 changes: 9 additions & 2 deletions RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ protected sealed override string DumpEntityConfiguration()
? "No SuperStream"
: $"SuperStream {_config.SuperStream}";
return
$"Consumer id {EntityId} for stream: {_config.Stream}, reference: {_config.Reference}, OffsetSpec {_config.OffsetSpec} " +
$"Consumer id {EntityId} for stream: {_config.Stream}, " +
$"identifier: {_config.Identifier}, " +
$"reference: {_config.Reference}, OffsetSpec {_config.OffsetSpec} " +
$"Client ProvidedName {_config.ClientProvidedName}, " +
$"{superStream}, IsSingleActiveConsumer: {_config.IsSingleActiveConsumer}, " +
$"Token IsCancellationRequested: {Token.IsCancellationRequested} ";
Expand All @@ -144,7 +146,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
_initialCredits = config.InitialCredits;
_config = config;
Logger.LogDebug("Creating... {DumpEntityConfiguration}", DumpEntityConfiguration());
Info = new ConsumerInfo(_config.Stream, _config.Reference);
Info = new ConsumerInfo(_config.Stream, _config.Reference, _config.Identifier);
// _chunksBuffer is a channel that is used to buffer the chunks
_chunksBuffer = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(_initialCredits)
{
Expand Down Expand Up @@ -483,6 +485,11 @@ private async Task Init()
consumerProperties["name"] = _config.Reference;
}

if (!string.IsNullOrEmpty(_config.Identifier))
{
consumerProperties["identifier"] = _config.Identifier;
}

if (_config.IsFiltering)
{
var i = 0;
Expand Down
6 changes: 4 additions & 2 deletions RabbitMQ.Stream.Client/RawProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public class RawProducer : AbstractEntity, IProducer, IDisposable
protected sealed override string DumpEntityConfiguration()
{
return
$"Producer id {EntityId} for stream: {_config.Stream}, reference: {_config.Reference}," +
$"Producer id {EntityId} for stream: {_config.Stream}, " +
$"identifier: {_config.Identifier}" +
$"reference: {_config.Reference}," +
$"Client ProvidedName {_config.ClientProvidedName}, " +
$"Token IsCancellationRequested: {Token.IsCancellationRequested} ";
}
Expand All @@ -87,7 +89,7 @@ private RawProducer(Client client, RawProducerConfig config, ILogger logger = nu
{
_client = client;
_config = config;
Info = new ProducerInfo(_config.Stream, _config.Reference);
Info = new ProducerInfo(_config.Stream, _config.Reference, config.Identifier);
Logger = logger ?? NullLogger.Instance;
Logger.LogDebug("Creating... {DumpEntityConfiguration}", DumpEntityConfiguration());
_messageBuffer = Channel.CreateBounded<OutgoingMsg>(new BoundedChannelOptions(10000)
Expand Down
7 changes: 4 additions & 3 deletions RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private RawSuperStreamConsumer(
_streamInfos = streamInfos;
_clientParameters = clientParameters;
_logger = logger ?? NullLogger.Instance;
Info = new ConsumerInfo(_config.SuperStream, _config.Reference);
Info = new ConsumerInfo(_config.SuperStream, _config.Reference, config.Identifier);

StartConsumers().Wait(CancellationToken.None);
}
Expand All @@ -74,6 +74,7 @@ private RawConsumerConfig FromStreamConfig(string stream)
ConsumerFilter = _config.ConsumerFilter,
Pool = _config.Pool,
Crc32 = _config.Crc32,
Identifier = _config.Identifier,
ConnectionClosedHandler = async (reason) =>
{
_consumers.TryRemove(stream, out var consumer);
Expand Down Expand Up @@ -199,8 +200,8 @@ public void Dispose()
{
foreach (var stream in _consumers.Keys)
{
_consumers.TryRemove(stream, out var consumer);
consumer?.Close();
_consumers.TryGetValue(stream, out var consumer);
consumer?.Dispose();
}

_disposed = true;
Expand Down
3 changes: 2 additions & 1 deletion RabbitMQ.Stream.Client/RawSuperStreamProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private RawSuperStreamProducer(
_config = config;
_streamInfos = streamInfos;
_clientParameters = clientParameters;
Info = new ProducerInfo(config.SuperStream, config.Reference);
Info = new ProducerInfo(config.SuperStream, config.Reference, config.Identifier);
_defaultRoutingConfiguration.RoutingStrategy = _config.RoutingStrategyType switch
{
RoutingStrategyType.Key => new KeyRoutingStrategy(_config.Routing,
Expand All @@ -92,6 +92,7 @@ private RawProducerConfig FromStreamConfig(string stream)
MaxInFlight = _config.MaxInFlight,
Filter = _config.Filter,
Pool = _config.Pool,
Identifier = _config.Identifier,
ConnectionClosedHandler = async (reason) =>
{
_producers.TryGetValue(stream, out var producer);
Expand Down
6 changes: 4 additions & 2 deletions RabbitMQ.Stream.Client/Reliable/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ internal Consumer(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null
{
_logger = logger ?? NullLogger<Consumer>.Instance;
_consumerConfig = consumerConfig;
Info = new ConsumerInfo(consumerConfig.Stream, consumerConfig.Reference);
Info = new ConsumerInfo(consumerConfig.Stream, consumerConfig.Reference, consumerConfig.Identifier);
}

public static async Task<Consumer> Create(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null)
Expand Down Expand Up @@ -215,7 +215,9 @@ public override async Task Close()

public override string ToString()
{
return $"Consumer reference: {_consumerConfig.Reference}, stream: {_consumerConfig.Stream}, " +
return $"Consumer reference: {_consumerConfig.Reference}, " +
$"stream: {_consumerConfig.Stream}, " +
$"identifier: {_consumerConfig.Identifier}, " +
$"client name: {_consumerConfig.ClientProvidedName} ";
}

Expand Down
2 changes: 2 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ private async Task<IConsumer> StandardConsumer(bool boot)
OffsetSpec = offsetSpec,
ConsumerFilter = _consumerConfig.Filter,
Crc32 = _consumerConfig.Crc32,
Identifier = _consumerConfig.Identifier,
ConnectionClosedHandler = async (closeReason) =>
{
if (closeReason == ConnectionClosedReason.Normal)
Expand Down Expand Up @@ -120,6 +121,7 @@ private async Task<IConsumer> SuperConsumer(bool boot)
ConsumerFilter = _consumerConfig.Filter,
Crc32 = _consumerConfig.Crc32,
OffsetSpec = offsetSpecs,
Identifier = _consumerConfig.Identifier,
ConnectionClosedHandler = async (closeReason, partitionStream) =>
{
await RandomWait().ConfigureAwait(false);
Expand Down
7 changes: 4 additions & 3 deletions RabbitMQ.Stream.Client/Reliable/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ public ProducerConfig(StreamSystem streamSystem, string stream) : base(streamSys
/// </summary>
public class Producer : ProducerFactory
{

private ulong _publishingId;
private readonly ILogger<Producer> _logger;

Expand All @@ -138,7 +137,7 @@ private Producer(ProducerConfig producerConfig, ILogger<Producer> logger = null)
producerConfig.TimeoutMessageAfter,
producerConfig.MaxInFlight
);
Info = new ProducerInfo(producerConfig.Stream, producerConfig.Reference);
Info = new ProducerInfo(producerConfig.Stream, producerConfig.Reference, producerConfig.Identifier);
_logger = logger ?? NullLogger<Producer>.Instance;
}

Expand Down Expand Up @@ -336,7 +335,9 @@ public async ValueTask Send(List<Message> messages, CompressionType compressionT

public override string ToString()
{
return $"Producer stream: {_producerConfig.Stream}, client name: {_producerConfig.ClientProvidedName}";
return $"Producer stream: {_producerConfig.Stream}, " +
$"identifier: {_producerConfig.Identifier}, " +
$"client name: {_producerConfig.ClientProvidedName}";
}

/// <summary>
Expand Down
2 changes: 2 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ private async Task<IProducer> SuperStreamProducer(bool boot)
Routing = _producerConfig.SuperStreamConfig.Routing,
RoutingStrategyType = _producerConfig.SuperStreamConfig.RoutingStrategyType,
Filter = _producerConfig.Filter,
Identifier = _producerConfig.Identifier,
ConnectionClosedHandler = async (closeReason, partitionStream) =>
{
await RandomWait().ConfigureAwait(false);
Expand Down Expand Up @@ -99,6 +100,7 @@ private async Task<IProducer> StandardProducer()
Reference = _producerConfig.Reference,
MaxInFlight = _producerConfig.MaxInFlight,
Filter = _producerConfig.Filter,
Identifier = _producerConfig.Identifier,
MetadataHandler = async _ =>
{
await RandomWait().ConfigureAwait(false);
Expand Down
31 changes: 26 additions & 5 deletions RabbitMQ.Stream.Client/Reliable/ReliableBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,27 @@ namespace RabbitMQ.Stream.Client.Reliable;

public record ReliableConfig
{
/// <summary>
/// The interface to reconnect the entity to the server.
/// By default it uses a BackOff pattern. See <see cref="BackOffReconnectStrategy"/>
/// </summary>
public IReconnectStrategy ReconnectStrategy { get; set; }

/// <summary>
/// The interface to check if the resource is available.
/// A stream could be not fully ready during the restarting.
/// By default it uses a BackOff pattern. See <see cref="ResourceAvailableBackOffReconnectStrategy"/>
/// </summary>
public IReconnectStrategy ResourceAvailableReconnectStrategy { get; set; }

/// <summary>
/// The Identifier does not have any effect on the server.
/// It is used to identify the entity in the logs and on the UI (only for the consumer)
/// It is possible to retrieve the entity info using the Info.Identifier method form the
/// Producer/Consumer instances.
/// </summary>
public string Identifier { get; set; }

public StreamSystem StreamSystem { get; }
public string Stream { get; }

Expand All @@ -31,12 +49,15 @@ protected ReliableConfig(StreamSystem streamSystem, string stream)
}
}

/// <summary>
/// The ReliableEntityStatus is used to check the status of the ReliableEntity.
/// </summary>
public enum ReliableEntityStatus
{
Initialization,
Open,
Reconnecting,
Closed,
Initialization,// the entity is initializing
Open, // the entity is open and ready to use
Reconnecting,// the entity is reconnecting but still can be used
Closed,// the entity is closed and cannot be used anymore
}

/// <summary>
Expand All @@ -62,7 +83,7 @@ protected void UpdateStatus(ReliableEntityStatus status)
}
}

protected bool CompareStatus(ReliableEntityStatus toTest)
private bool CompareStatus(ReliableEntityStatus toTest)
{
lock (_lock)
{
Expand Down
Loading