Skip to content

Commit e26dbae

Browse files
committed
Add Producer/Consumer Identifier
The Identifier helps to correlate the entities with the logs and the UI. Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent fa8f66b commit e26dbae

19 files changed

+116
-51
lines changed

RabbitMQ.Stream.Client/AbstractEntity.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ public abstract record EntityCommonConfig
1313
{
1414
internal ConnectionsPool Pool { get; set; }
1515
public Func<MetaDataUpdate, Task> MetadataHandler { get; set; }
16+
17+
public string Identifier { get; set; }
1618
}
1719

1820
internal enum EntityStatus

RabbitMQ.Stream.Client/EntityInfo.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ namespace RabbitMQ.Stream.Client;
66

77
public abstract class Info
88
{
9+
public string Identifier { get; }
910
public string Stream { get; }
1011

11-
protected Info(string stream)
12+
protected Info(string stream, string identifier)
1213
{
1314
Stream = stream;
15+
Identifier = identifier;
1416
}
1517
}

RabbitMQ.Stream.Client/IConsumer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,13 @@ public class ConsumerInfo : Info
8080
{
8181
public string Reference { get; }
8282

83-
public ConsumerInfo(string stream, string reference) : base(stream)
83+
public ConsumerInfo(string stream, string reference, string identifier) : base(stream, identifier)
8484
{
8585
Reference = reference;
8686
}
8787

8888
public override string ToString()
8989
{
90-
return $"{base.ToString()}, Reference: {Reference}";
90+
return $"ConsumerInfo(Stream={Stream}, Reference={Reference}, Identifier={Identifier})";
9191
}
9292
}

RabbitMQ.Stream.Client/IProducer.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,13 @@ public class ProducerInfo : Info
114114
{
115115
public string Reference { get; }
116116

117-
public ProducerInfo(string stream, string reference) : base(stream)
117+
public ProducerInfo(string stream, string reference, string identifier) : base(stream, identifier)
118118
{
119119
Reference = reference;
120120
}
121+
122+
public override string ToString()
123+
{
124+
return $"ProducerInfo(Stream={Stream}, Reference={Reference}, Identifier={Identifier})";
125+
}
121126
}

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort
77
const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort
88
override RabbitMQ.Stream.Client.Broker.ToString() -> string
99
override RabbitMQ.Stream.Client.ConsumerInfo.ToString() -> string
10+
override RabbitMQ.Stream.Client.ProducerInfo.ToString() -> string
1011
override RabbitMQ.Stream.Client.RawConsumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
1112
override RabbitMQ.Stream.Client.RawProducer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
1213
RabbitMQ.Stream.Client.AbstractEntity.Dispose(bool disposing) -> void
@@ -80,7 +81,7 @@ RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.set -> void
8081
RabbitMQ.Stream.Client.ConsumerFilter.Values.get -> System.Collections.Generic.List<string>
8182
RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
8283
RabbitMQ.Stream.Client.ConsumerInfo
83-
RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference) -> void
84+
RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference, string identifier) -> void
8485
RabbitMQ.Stream.Client.ConsumerInfo.Reference.get -> string
8586
RabbitMQ.Stream.Client.CrcException
8687
RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
@@ -92,6 +93,8 @@ RabbitMQ.Stream.Client.CreateException.ResponseCode.get -> RabbitMQ.Stream.Clien
9293
RabbitMQ.Stream.Client.CreateException.ResponseCode.init -> void
9394
RabbitMQ.Stream.Client.CreateProducerException.CreateProducerException(string s, RabbitMQ.Stream.Client.ResponseCode responseCode) -> void
9495
RabbitMQ.Stream.Client.EntityCommonConfig
96+
RabbitMQ.Stream.Client.EntityCommonConfig.Identifier.get -> string
97+
RabbitMQ.Stream.Client.EntityCommonConfig.Identifier.set -> void
9598
RabbitMQ.Stream.Client.EntityCommonConfig.MetadataHandler.get -> System.Func<RabbitMQ.Stream.Client.MetaDataUpdate, System.Threading.Tasks.Task>
9699
RabbitMQ.Stream.Client.EntityCommonConfig.MetadataHandler.set -> void
97100
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
@@ -133,7 +136,8 @@ RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
133136
RabbitMQ.Stream.Client.ICrc32
134137
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
135138
RabbitMQ.Stream.Client.Info
136-
RabbitMQ.Stream.Client.Info.Info(string stream) -> void
139+
RabbitMQ.Stream.Client.Info.Identifier.get -> string
140+
RabbitMQ.Stream.Client.Info.Info(string stream, string identifier) -> void
137141
RabbitMQ.Stream.Client.Info.Stream.get -> string
138142
RabbitMQ.Stream.Client.IProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
139143
RabbitMQ.Stream.Client.IProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
@@ -154,7 +158,7 @@ RabbitMQ.Stream.Client.ProducerFilter
154158
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.get -> System.Func<RabbitMQ.Stream.Client.Message, string>
155159
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.set -> void
156160
RabbitMQ.Stream.Client.ProducerInfo
157-
RabbitMQ.Stream.Client.ProducerInfo.ProducerInfo(string stream, string reference) -> void
161+
RabbitMQ.Stream.Client.ProducerInfo.ProducerInfo(string stream, string reference, string identifier) -> void
158162
RabbitMQ.Stream.Client.ProducerInfo.Reference.get -> string
159163
RabbitMQ.Stream.Client.PublishFilter
160164
RabbitMQ.Stream.Client.PublishFilter.Command.get -> ushort
@@ -208,6 +212,8 @@ RabbitMQ.Stream.Client.Reliable.ProducerFactory._producer -> RabbitMQ.Stream.Cli
208212
RabbitMQ.Stream.Client.Reliable.ReliableBase.CompareStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus toTest) -> bool
209213
RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus status) -> void
210214
RabbitMQ.Stream.Client.Reliable.ReliableBase._status -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
215+
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Identifier.get -> string
216+
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Identifier.set -> void
211217
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ReconnectStrategy.get -> RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
212218
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ResourceAvailableReconnectStrategy.get -> RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
213219
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ResourceAvailableReconnectStrategy.set -> void

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,9 @@ protected sealed override string DumpEntityConfiguration()
132132
? "No SuperStream"
133133
: $"SuperStream {_config.SuperStream}";
134134
return
135-
$"Consumer id {EntityId} for stream: {_config.Stream}, reference: {_config.Reference}, OffsetSpec {_config.OffsetSpec} " +
135+
$"Consumer id {EntityId} for stream: {_config.Stream}, " +
136+
$"identifier: {_config.Identifier}, " +
137+
$"reference: {_config.Reference}, OffsetSpec {_config.OffsetSpec} " +
136138
$"Client ProvidedName {_config.ClientProvidedName}, " +
137139
$"{superStream}, IsSingleActiveConsumer: {_config.IsSingleActiveConsumer}, " +
138140
$"Token IsCancellationRequested: {Token.IsCancellationRequested} ";
@@ -144,7 +146,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
144146
_initialCredits = config.InitialCredits;
145147
_config = config;
146148
Logger.LogDebug("Creating... {DumpEntityConfiguration}", DumpEntityConfiguration());
147-
Info = new ConsumerInfo(_config.Stream, _config.Reference);
149+
Info = new ConsumerInfo(_config.Stream, _config.Reference, _config.Identifier);
148150
// _chunksBuffer is a channel that is used to buffer the chunks
149151
_chunksBuffer = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(_initialCredits)
150152
{
@@ -483,6 +485,11 @@ private async Task Init()
483485
consumerProperties["name"] = _config.Reference;
484486
}
485487

488+
if (!string.IsNullOrEmpty(_config.Identifier))
489+
{
490+
consumerProperties["identifier"] = _config.Identifier;
491+
}
492+
486493
if (_config.IsFiltering)
487494
{
488495
var i = 0;

RabbitMQ.Stream.Client/RawProducer.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ public class RawProducer : AbstractEntity, IProducer, IDisposable
6060
protected sealed override string DumpEntityConfiguration()
6161
{
6262
return
63-
$"Producer id {EntityId} for stream: {_config.Stream}, reference: {_config.Reference}," +
63+
$"Producer id {EntityId} for stream: {_config.Stream}, " +
64+
$"identifier: {_config.Identifier}" +
65+
$"reference: {_config.Reference}," +
6466
$"Client ProvidedName {_config.ClientProvidedName}, " +
6567
$"Token IsCancellationRequested: {Token.IsCancellationRequested} ";
6668
}
@@ -87,7 +89,7 @@ private RawProducer(Client client, RawProducerConfig config, ILogger logger = nu
8789
{
8890
_client = client;
8991
_config = config;
90-
Info = new ProducerInfo(_config.Stream, _config.Reference);
92+
Info = new ProducerInfo(_config.Stream, _config.Reference, config.Identifier);
9193
Logger = logger ?? NullLogger.Instance;
9294
Logger.LogDebug("Creating... {DumpEntityConfiguration}", DumpEntityConfiguration());
9395
_messageBuffer = Channel.CreateBounded<OutgoingMsg>(new BoundedChannelOptions(10000)

RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ private RawSuperStreamConsumer(
5757
_streamInfos = streamInfos;
5858
_clientParameters = clientParameters;
5959
_logger = logger ?? NullLogger.Instance;
60-
Info = new ConsumerInfo(_config.SuperStream, _config.Reference);
60+
Info = new ConsumerInfo(_config.SuperStream, _config.Reference, config.Identifier);
6161

6262
StartConsumers().Wait(CancellationToken.None);
6363
}
@@ -74,6 +74,7 @@ private RawConsumerConfig FromStreamConfig(string stream)
7474
ConsumerFilter = _config.ConsumerFilter,
7575
Pool = _config.Pool,
7676
Crc32 = _config.Crc32,
77+
Identifier = _config.Identifier,
7778
ConnectionClosedHandler = async (reason) =>
7879
{
7980
_consumers.TryRemove(stream, out var consumer);
@@ -199,8 +200,8 @@ public void Dispose()
199200
{
200201
foreach (var stream in _consumers.Keys)
201202
{
202-
_consumers.TryRemove(stream, out var consumer);
203-
consumer?.Close();
203+
_consumers.TryGetValue(stream, out var consumer);
204+
consumer?.Dispose();
204205
}
205206

206207
_disposed = true;

RabbitMQ.Stream.Client/RawSuperStreamProducer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ private RawSuperStreamProducer(
6565
_config = config;
6666
_streamInfos = streamInfos;
6767
_clientParameters = clientParameters;
68-
Info = new ProducerInfo(config.SuperStream, config.Reference);
68+
Info = new ProducerInfo(config.SuperStream, config.Reference, config.Identifier);
6969
_defaultRoutingConfiguration.RoutingStrategy = _config.RoutingStrategyType switch
7070
{
7171
RoutingStrategyType.Key => new KeyRoutingStrategy(_config.Routing,
@@ -92,6 +92,7 @@ private RawProducerConfig FromStreamConfig(string stream)
9292
MaxInFlight = _config.MaxInFlight,
9393
Filter = _config.Filter,
9494
Pool = _config.Pool,
95+
Identifier = _config.Identifier,
9596
ConnectionClosedHandler = async (reason) =>
9697
{
9798
_producers.TryGetValue(stream, out var producer);

RabbitMQ.Stream.Client/Reliable/Consumer.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ internal Consumer(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null
164164
{
165165
_logger = logger ?? NullLogger<Consumer>.Instance;
166166
_consumerConfig = consumerConfig;
167-
Info = new ConsumerInfo(consumerConfig.Stream, consumerConfig.Reference);
167+
Info = new ConsumerInfo(consumerConfig.Stream, consumerConfig.Reference, consumerConfig.Identifier);
168168
}
169169

170170
public static async Task<Consumer> Create(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null)
@@ -215,7 +215,9 @@ public override async Task Close()
215215

216216
public override string ToString()
217217
{
218-
return $"Consumer reference: {_consumerConfig.Reference}, stream: {_consumerConfig.Stream}, " +
218+
return $"Consumer reference: {_consumerConfig.Reference}, " +
219+
$"stream: {_consumerConfig.Stream}, " +
220+
$"identifier: {_consumerConfig.Identifier}, " +
219221
$"client name: {_consumerConfig.ClientProvidedName} ";
220222
}
221223

RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ private async Task<IConsumer> StandardConsumer(bool boot)
5757
OffsetSpec = offsetSpec,
5858
ConsumerFilter = _consumerConfig.Filter,
5959
Crc32 = _consumerConfig.Crc32,
60+
Identifier = _consumerConfig.Identifier,
6061
ConnectionClosedHandler = async (closeReason) =>
6162
{
6263
if (closeReason == ConnectionClosedReason.Normal)
@@ -120,6 +121,7 @@ private async Task<IConsumer> SuperConsumer(bool boot)
120121
ConsumerFilter = _consumerConfig.Filter,
121122
Crc32 = _consumerConfig.Crc32,
122123
OffsetSpec = offsetSpecs,
124+
Identifier = _consumerConfig.Identifier,
123125
ConnectionClosedHandler = async (closeReason, partitionStream) =>
124126
{
125127
await RandomWait().ConfigureAwait(false);

RabbitMQ.Stream.Client/Reliable/Producer.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ public ProducerConfig(StreamSystem streamSystem, string stream) : base(streamSys
124124
/// </summary>
125125
public class Producer : ProducerFactory
126126
{
127-
128127
private ulong _publishingId;
129128
private readonly ILogger<Producer> _logger;
130129

@@ -138,7 +137,7 @@ private Producer(ProducerConfig producerConfig, ILogger<Producer> logger = null)
138137
producerConfig.TimeoutMessageAfter,
139138
producerConfig.MaxInFlight
140139
);
141-
Info = new ProducerInfo(producerConfig.Stream, producerConfig.Reference);
140+
Info = new ProducerInfo(producerConfig.Stream, producerConfig.Reference, producerConfig.Identifier);
142141
_logger = logger ?? NullLogger<Producer>.Instance;
143142
}
144143

@@ -336,7 +335,9 @@ public async ValueTask Send(List<Message> messages, CompressionType compressionT
336335

337336
public override string ToString()
338337
{
339-
return $"Producer stream: {_producerConfig.Stream}, client name: {_producerConfig.ClientProvidedName}";
338+
return $"Producer stream: {_producerConfig.Stream}, " +
339+
$"identifier: {_producerConfig.Identifier}, " +
340+
$"client name: {_producerConfig.ClientProvidedName}";
340341
}
341342

342343
/// <summary>

RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ private async Task<IProducer> SuperStreamProducer(bool boot)
4545
Routing = _producerConfig.SuperStreamConfig.Routing,
4646
RoutingStrategyType = _producerConfig.SuperStreamConfig.RoutingStrategyType,
4747
Filter = _producerConfig.Filter,
48+
Identifier = _producerConfig.Identifier,
4849
ConnectionClosedHandler = async (closeReason, partitionStream) =>
4950
{
5051
await RandomWait().ConfigureAwait(false);
@@ -99,6 +100,7 @@ private async Task<IProducer> StandardProducer()
99100
Reference = _producerConfig.Reference,
100101
MaxInFlight = _producerConfig.MaxInFlight,
101102
Filter = _producerConfig.Filter,
103+
Identifier = _producerConfig.Identifier,
102104
MetadataHandler = async _ =>
103105
{
104106
await RandomWait().ConfigureAwait(false);

RabbitMQ.Stream.Client/Reliable/ReliableBase.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ public record ReliableConfig
1414
public IReconnectStrategy ReconnectStrategy { get; set; }
1515
public IReconnectStrategy ResourceAvailableReconnectStrategy { get; set; }
1616

17+
public string Identifier { get; set; }
18+
1719
public StreamSystem StreamSystem { get; }
1820
public string Stream { get; }
1921

RabbitMQ.Stream.Client/StreamSystem.cs

Lines changed: 42 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -182,25 +182,33 @@ public async Task<ISuperStreamProducer> CreateRawSuperStreamProducer(
182182
throw new CreateProducerException($"producer could not be created code: {partitions.ResponseCode}");
183183
}
184184

185-
IDictionary<string, StreamInfo> streamInfos = new Dictionary<string, StreamInfo>();
186-
foreach (var partitionsStream in partitions.Streams)
185+
await _semClientProvidedName.WaitAsync().ConfigureAwait(false);
186+
try
187187
{
188-
streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false);
189-
}
188+
IDictionary<string, StreamInfo> streamInfos = new Dictionary<string, StreamInfo>();
189+
foreach (var partitionsStream in partitions.Streams)
190+
{
191+
streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false);
192+
}
190193

191-
foreach (var (_, value) in streamInfos)
194+
foreach (var (_, value) in streamInfos)
195+
{
196+
ClientExceptions.CheckLeader(value);
197+
}
198+
199+
var r = RawSuperStreamProducer.Create(rawSuperStreamProducerConfig,
200+
streamInfos,
201+
_clientParameters with { ClientProvidedName = rawSuperStreamProducerConfig.ClientProvidedName },
202+
logger);
203+
_logger?.LogDebug("Raw Producer: {ProducerReference} created for SuperStream: {SuperStream}",
204+
rawSuperStreamProducerConfig.Reference,
205+
rawSuperStreamProducerConfig.SuperStream);
206+
return r;
207+
}
208+
finally
192209
{
193-
ClientExceptions.CheckLeader(value);
210+
_semClientProvidedName.Release();
194211
}
195-
196-
var r = RawSuperStreamProducer.Create(rawSuperStreamProducerConfig,
197-
streamInfos,
198-
_clientParameters with { ClientProvidedName = rawSuperStreamProducerConfig.ClientProvidedName },
199-
logger);
200-
_logger?.LogDebug("Raw Producer: {ProducerReference} created for SuperStream: {SuperStream}",
201-
rawSuperStreamProducerConfig.Reference,
202-
rawSuperStreamProducerConfig.SuperStream);
203-
return r;
204212
}
205213

206214
/// <summary>
@@ -239,20 +247,28 @@ public async Task<ISuperStreamConsumer> CreateSuperStreamConsumer(
239247
partitions.ResponseCode);
240248
}
241249

242-
IDictionary<string, StreamInfo> streamInfos = new Dictionary<string, StreamInfo>();
243-
foreach (var partitionsStream in partitions.Streams)
250+
await _semClientProvidedName.WaitAsync().ConfigureAwait(false);
251+
try
244252
{
245-
streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false);
246-
}
253+
IDictionary<string, StreamInfo> streamInfos = new Dictionary<string, StreamInfo>();
254+
foreach (var partitionsStream in partitions.Streams)
255+
{
256+
streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false);
257+
}
247258

248-
var s = RawSuperStreamConsumer.Create(rawSuperStreamConsumerConfig,
249-
streamInfos,
250-
_clientParameters with { ClientProvidedName = rawSuperStreamConsumerConfig.ClientProvidedName },
251-
logger);
252-
_logger?.LogDebug("Consumer: {Reference} created for SuperStream: {SuperStream}",
253-
rawSuperStreamConsumerConfig.Reference, rawSuperStreamConsumerConfig.SuperStream);
259+
var s = RawSuperStreamConsumer.Create(rawSuperStreamConsumerConfig,
260+
streamInfos,
261+
_clientParameters with { ClientProvidedName = rawSuperStreamConsumerConfig.ClientProvidedName },
262+
logger);
263+
_logger?.LogDebug("Consumer: {Reference} created for SuperStream: {SuperStream}",
264+
rawSuperStreamConsumerConfig.Reference, rawSuperStreamConsumerConfig.SuperStream);
254265

255-
return s;
266+
return s;
267+
}
268+
finally
269+
{
270+
_semClientProvidedName.Release();
271+
}
256272
}
257273

258274
public async Task<IProducer> CreateRawProducer(RawProducerConfig rawProducerConfig,

0 commit comments

Comments
 (0)