Skip to content

Commit e37f2a2

Browse files
authored
Add Info class to the Producer/Consumer classes (#322)
The call returns the Reference and Stream. Note that for the producer class the Reference is deprecated and it will be removed at some point. So the info.reference for the producer will be removed as well. We can add other info about the consumer and the consumer in the future. Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 0711e13 commit e37f2a2

14 files changed

+139
-3
lines changed

RabbitMQ.Stream.Client/AbstractEntity.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,7 @@ protected void MaybeCancelToken()
2121
}
2222

2323
protected Client _client;
24+
25+
public Info Info { get; internal set; }
2426
}
2527
}

RabbitMQ.Stream.Client/EntityInfo.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2007-2023 VMware, Inc.
4+
5+
namespace RabbitMQ.Stream.Client;
6+
7+
public class Info
8+
{
9+
public string Reference { get; }
10+
public string Stream { get; }
11+
12+
internal Info(string reference, string stream)
13+
{
14+
Reference = reference;
15+
Stream = stream;
16+
}
17+
}

RabbitMQ.Stream.Client/IConsumer.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ public interface IConsumer
1212
public Task StoreOffset(ulong offset);
1313
public Task<ResponseCode> Close();
1414
public void Dispose();
15+
16+
public Info Info { get; }
1517
}
1618

1719
public record IConsumerConfig : INamedEntity

RabbitMQ.Stream.Client/IProducer.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ public interface IProducer
6767
public int PublishCommandsSent { get; }
6868

6969
public int PendingCount { get; }
70+
71+
/// <summary>
72+
/// Info contains the reference and the stream name.
73+
/// </summary>
74+
public Info Info { get; }
7075
}
7176

7277
public record ProducerFilter

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort
22
const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort
3+
RabbitMQ.Stream.Client.AbstractEntity.Info.get -> RabbitMQ.Stream.Client.Info
34
RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void
45
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken
56
RabbitMQ.Stream.Client.AuthMechanism
@@ -25,6 +26,7 @@ RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
2526
RabbitMQ.Stream.Client.CrcException
2627
RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
2728
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
29+
RabbitMQ.Stream.Client.IConsumer.Info.get -> RabbitMQ.Stream.Client.Info
2830
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
2931
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.set -> void
3032
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.get -> RabbitMQ.Stream.Client.ConsumerFilter
@@ -54,6 +56,10 @@ RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
5456
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
5557
RabbitMQ.Stream.Client.ICrc32
5658
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
59+
RabbitMQ.Stream.Client.Info
60+
RabbitMQ.Stream.Client.Info.Reference.get -> string
61+
RabbitMQ.Stream.Client.Info.Stream.get -> string
62+
RabbitMQ.Stream.Client.IProducer.Info.get -> RabbitMQ.Stream.Client.Info
5763
RabbitMQ.Stream.Client.IProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
5864
RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
5965
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
@@ -73,6 +79,8 @@ RabbitMQ.Stream.Client.PublishFilter.PublishFilter() -> void
7379
RabbitMQ.Stream.Client.PublishFilter.PublishFilter(byte publisherId, System.Collections.Generic.List<(ulong, RabbitMQ.Stream.Client.Message)> messages, System.Func<RabbitMQ.Stream.Client.Message, string> filterValueExtractor, Microsoft.Extensions.Logging.ILogger logger) -> void
7480
RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
7581
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
82+
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Info.get -> RabbitMQ.Stream.Client.Info
83+
RabbitMQ.Stream.Client.RawSuperStreamProducer.Info.get -> RabbitMQ.Stream.Client.Info
7684
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
7785
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
7886
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
@@ -84,13 +92,15 @@ RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.set -> void
8492
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer
8593
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Close() -> System.Threading.Tasks.Task
8694
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.GetLastPublishedId() -> System.Threading.Tasks.Task<ulong>
95+
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Info.get -> RabbitMQ.Stream.Client.Info
8796
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.IsOpen() -> bool
8897
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Send(ulong publishing, RabbitMQ.Stream.Client.Message message) -> System.Threading.Tasks.ValueTask
8998
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig
9099
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig.DeduplicatingProducerConfig(RabbitMQ.Stream.Client.StreamSystem streamSystem, string stream, string reference) -> void
91100
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
92101
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
93102
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
103+
RabbitMQ.Stream.Client.Reliable.ReliableBase.Info.get -> RabbitMQ.Stream.Client.Info
94104
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
95105
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.set -> void
96106
RabbitMQ.Stream.Client.RouteNotFoundException

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
140140
_initialCredits = config.InitialCredits;
141141
_config = config;
142142
_logger.LogDebug("Creating... {ConsumerInfo}", ConsumerInfo());
143-
143+
Info = new Info(_config.Reference, _config.Stream);
144144
// _chunksBuffer is a channel that is used to buffer the chunks
145145
_chunksBuffer = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(_initialCredits)
146146
{

RabbitMQ.Stream.Client/RawProducer.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ private RawProducer(Client client, RawProducerConfig config, ILogger logger = nu
7979
{
8080
_client = client;
8181
_config = config;
82+
Info = new Info(_config.Reference, _config.Stream);
8283
_messageBuffer = Channel.CreateBounded<OutgoingMsg>(new BoundedChannelOptions(10000)
8384
{
8485
AllowSynchronousContinuations = false,

RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ private RawSuperStreamConsumer(
5555
_streamInfos = streamInfos;
5656
_clientParameters = clientParameters;
5757
_logger = logger ?? NullLogger.Instance;
58+
Info = new Info(_config.Reference, _config.SuperStream);
5859

5960
StartConsumers().Wait(CancellationToken.None);
6061
}
@@ -217,6 +218,8 @@ public void Dispose()
217218
_disposed = true;
218219
GC.SuppressFinalize(this);
219220
}
221+
222+
public Info Info { get; internal set; }
220223
}
221224

222225
public record RawSuperStreamConsumerConfig : IConsumerConfig

RabbitMQ.Stream.Client/RawSuperStreamProducer.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,13 @@ public static IProducer Create(
5656
private RawSuperStreamProducer(
5757
RawSuperStreamProducerConfig config,
5858
IDictionary<string, StreamInfo> streamInfos,
59-
ClientParameters clientParameters,
60-
ILogger logger = null
59+
ClientParameters clientParameters, ILogger logger = null
6160
)
6261
{
6362
_config = config;
6463
_streamInfos = streamInfos;
6564
_clientParameters = clientParameters;
65+
Info = new Info(config.Reference, config.SuperStream);
6666
_defaultRoutingConfiguration.RoutingStrategy = _config.RoutingStrategyType switch
6767
{
6868
RoutingStrategyType.Key => new KeyRoutingStrategy(_config.Routing,
@@ -277,6 +277,7 @@ public void Dispose()
277277
public int IncomingFrames => _producers.Sum(x => x.Value.IncomingFrames);
278278
public int PublishCommandsSent => _producers.Sum(x => x.Value.PublishCommandsSent);
279279
public int PendingCount => _producers.Sum(x => x.Value.PendingCount);
280+
public Info Info { get; }
280281
}
281282

282283
public enum RoutingStrategyType

RabbitMQ.Stream.Client/Reliable/Consumer.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ internal Consumer(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null
165165
{
166166
_logger = logger ?? NullLogger<Consumer>.Instance;
167167
_consumerConfig = consumerConfig;
168+
Info = new Info(consumerConfig.Reference, consumerConfig.Stream);
168169
}
169170

170171
public static async Task<Consumer> Create(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null)

RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,6 @@ public async Task<ulong> GetLastPublishedId()
8686
{
8787
return await _producer.GetLastPublishingId().ConfigureAwait(false);
8888
}
89+
90+
public Info Info => _producer.Info;
8991
}

RabbitMQ.Stream.Client/Reliable/Producer.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ private Producer(ProducerConfig producerConfig, ILogger<Producer> logger = null)
138138
producerConfig.TimeoutMessageAfter,
139139
producerConfig.MaxInFlight
140140
);
141+
Info = new Info(producerConfig.Reference, producerConfig.Stream);
141142
_logger = logger ?? NullLogger<Producer>.Instance;
142143
}
143144

RabbitMQ.Stream.Client/Reliable/ReliableBase.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,4 +216,6 @@ public bool IsOpen()
216216
{
217217
return _isOpen;
218218
}
219+
220+
public Info Info { get; internal set; }
219221
}

Tests/RawConsumerSystemTests.cs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,5 +732,94 @@ public async void ShouldConsumeFromDateTimeOffset()
732732
await consumer.Close().ConfigureAwait(false);
733733
await SystemUtils.CleanUpStreamSystem(system, stream);
734734
}
735+
736+
[Fact]
737+
public async void EntityInfoShouldBeCorrect()
738+
{
739+
SystemUtils.ResetSuperStreams();
740+
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
741+
var rawConsumer = await system.CreateRawConsumer(
742+
new RawConsumerConfig(stream) { Reference = "consumer", });
743+
744+
var entityInfo = rawConsumer.Info;
745+
Assert.Equal(stream, entityInfo.Stream);
746+
Assert.Equal("consumer", entityInfo.Reference);
747+
await rawConsumer.Close();
748+
749+
var rawProducer = await system.CreateRawProducer(
750+
new RawProducerConfig(stream) { Reference = "producer", });
751+
752+
entityInfo = rawProducer.Info;
753+
Assert.Equal(stream, entityInfo.Stream);
754+
Assert.Equal("producer", entityInfo.Reference);
755+
await rawProducer.Close();
756+
757+
var rawSuperStreamProducer = await system.CreateRawSuperStreamProducer(
758+
new RawSuperStreamProducerConfig(SystemUtils.InvoicesExchange)
759+
{
760+
Reference = "super_producer",
761+
RoutingStrategyType = RoutingStrategyType.Hash,
762+
Routing = _ => "OK",
763+
});
764+
entityInfo = rawSuperStreamProducer.Info;
765+
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
766+
Assert.Equal("super_producer", entityInfo.Reference);
767+
await rawSuperStreamProducer.Close();
768+
769+
var rawSuperStreamConsumer = await system.CreateSuperStreamConsumer(
770+
new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange) { Reference = "super_consumer", });
771+
772+
entityInfo = rawSuperStreamConsumer.Info;
773+
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
774+
Assert.Equal("super_consumer", entityInfo.Reference);
775+
await rawSuperStreamConsumer.Close();
776+
777+
var producer = await Producer.Create(new ProducerConfig(system, stream));
778+
779+
entityInfo = producer.Info;
780+
Assert.Equal(stream, entityInfo.Stream);
781+
Assert.True(string.IsNullOrWhiteSpace(entityInfo.Reference));
782+
await producer.Close();
783+
784+
var consumer = await Consumer.Create(new ConsumerConfig(system, stream) { Reference = "consumer", });
785+
786+
entityInfo = consumer.Info;
787+
Assert.Equal(stream, entityInfo.Stream);
788+
Assert.Equal("consumer", entityInfo.Reference);
789+
await consumer.Close();
790+
791+
var producerSuperStream =
792+
await Producer.Create(new ProducerConfig(system, SystemUtils.InvoicesExchange)
793+
{
794+
SuperStreamConfig = new SuperStreamConfig() { Routing = _ => "OK" }
795+
});
796+
797+
entityInfo = producerSuperStream.Info;
798+
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
799+
Assert.True(string.IsNullOrWhiteSpace(entityInfo.Reference));
800+
801+
await producerSuperStream.Close();
802+
803+
var consumerSuperStream = await Consumer.Create(new ConsumerConfig(system, SystemUtils.InvoicesExchange)
804+
{
805+
Reference = "consumer",
806+
IsSuperStream = true,
807+
});
808+
809+
entityInfo = consumerSuperStream.Info;
810+
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
811+
Assert.Equal("consumer", entityInfo.Reference);
812+
await consumerSuperStream.Close();
813+
814+
var dedProducer =
815+
await DeduplicatingProducer.Create(new DeduplicatingProducerConfig(system, stream, "dedProducer"));
816+
817+
entityInfo = dedProducer.Info;
818+
Assert.Equal(stream, entityInfo.Stream);
819+
Assert.Equal("dedProducer", entityInfo.Reference);
820+
await dedProducer.Close();
821+
822+
await SystemUtils.CleanUpStreamSystem(system, stream);
823+
}
735824
}
736825
}

0 commit comments

Comments
 (0)