Skip to content

Commit 64817bc

Browse files
authored
Merge 18132c0 into 0711e13
2 parents 0711e13 + 18132c0 commit 64817bc

14 files changed

+139
-3
lines changed

RabbitMQ.Stream.Client/AbstractEntity.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,7 @@ protected void MaybeCancelToken()
2121
}
2222

2323
protected Client _client;
24+
25+
public Info Info { get; internal set; }
2426
}
2527
}

RabbitMQ.Stream.Client/EntityInfo.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
// Copyright (c) 2007-2023 VMware, Inc.
4+
5+
namespace RabbitMQ.Stream.Client;
6+
7+
public class Info
8+
{
9+
public string Reference { get; }
10+
public string Stream { get; }
11+
12+
internal Info(string reference, string stream)
13+
{
14+
Reference = reference;
15+
Stream = stream;
16+
}
17+
}

RabbitMQ.Stream.Client/IConsumer.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ public interface IConsumer
1212
public Task StoreOffset(ulong offset);
1313
public Task<ResponseCode> Close();
1414
public void Dispose();
15+
16+
public Info Info { get; }
1517
}
1618

1719
public record IConsumerConfig : INamedEntity

RabbitMQ.Stream.Client/IProducer.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ public interface IProducer
6767
public int PublishCommandsSent { get; }
6868

6969
public int PendingCount { get; }
70+
71+
/// <summary>
72+
/// Info contains the reference and the stream name.
73+
/// </summary>
74+
public Info Info { get; }
7075
}
7176

7277
public record ProducerFilter

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort
22
const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort
3+
RabbitMQ.Stream.Client.AbstractEntity.Info.get -> RabbitMQ.Stream.Client.Info
34
RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void
45
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken
56
RabbitMQ.Stream.Client.AuthMechanism
@@ -25,6 +26,7 @@ RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
2526
RabbitMQ.Stream.Client.CrcException
2627
RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
2728
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
29+
RabbitMQ.Stream.Client.IConsumer.Info.get -> RabbitMQ.Stream.Client.Info
2830
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
2931
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.set -> void
3032
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.get -> RabbitMQ.Stream.Client.ConsumerFilter
@@ -54,6 +56,10 @@ RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
5456
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
5557
RabbitMQ.Stream.Client.ICrc32
5658
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
59+
RabbitMQ.Stream.Client.Info
60+
RabbitMQ.Stream.Client.Info.Reference.get -> string
61+
RabbitMQ.Stream.Client.Info.Stream.get -> string
62+
RabbitMQ.Stream.Client.IProducer.Info.get -> RabbitMQ.Stream.Client.Info
5763
RabbitMQ.Stream.Client.IProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
5864
RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
5965
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
@@ -73,6 +79,8 @@ RabbitMQ.Stream.Client.PublishFilter.PublishFilter() -> void
7379
RabbitMQ.Stream.Client.PublishFilter.PublishFilter(byte publisherId, System.Collections.Generic.List<(ulong, RabbitMQ.Stream.Client.Message)> messages, System.Func<RabbitMQ.Stream.Client.Message, string> filterValueExtractor, Microsoft.Extensions.Logging.ILogger logger) -> void
7480
RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
7581
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
82+
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Info.get -> RabbitMQ.Stream.Client.Info
83+
RabbitMQ.Stream.Client.RawSuperStreamProducer.Info.get -> RabbitMQ.Stream.Client.Info
7684
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
7785
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
7886
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
@@ -84,13 +92,15 @@ RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.set -> void
8492
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer
8593
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Close() -> System.Threading.Tasks.Task
8694
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.GetLastPublishedId() -> System.Threading.Tasks.Task<ulong>
95+
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Info.get -> RabbitMQ.Stream.Client.Info
8796
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.IsOpen() -> bool
8897
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Send(ulong publishing, RabbitMQ.Stream.Client.Message message) -> System.Threading.Tasks.ValueTask
8998
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig
9099
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig.DeduplicatingProducerConfig(RabbitMQ.Stream.Client.StreamSystem streamSystem, string stream, string reference) -> void
91100
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
92101
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
93102
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
103+
RabbitMQ.Stream.Client.Reliable.ReliableBase.Info.get -> RabbitMQ.Stream.Client.Info
94104
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
95105
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.set -> void
96106
RabbitMQ.Stream.Client.RouteNotFoundException

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
140140
_initialCredits = config.InitialCredits;
141141
_config = config;
142142
_logger.LogDebug("Creating... {ConsumerInfo}", ConsumerInfo());
143-
143+
Info = new Info(_config.Reference, _config.Stream);
144144
// _chunksBuffer is a channel that is used to buffer the chunks
145145
_chunksBuffer = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(_initialCredits)
146146
{

RabbitMQ.Stream.Client/RawProducer.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ private RawProducer(Client client, RawProducerConfig config, ILogger logger = nu
7979
{
8080
_client = client;
8181
_config = config;
82+
Info = new Info(_config.Reference, _config.Stream);
8283
_messageBuffer = Channel.CreateBounded<OutgoingMsg>(new BoundedChannelOptions(10000)
8384
{
8485
AllowSynchronousContinuations = false,

RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ private RawSuperStreamConsumer(
5555
_streamInfos = streamInfos;
5656
_clientParameters = clientParameters;
5757
_logger = logger ?? NullLogger.Instance;
58+
Info = new Info(_config.Reference, _config.SuperStream);
5859

5960
StartConsumers().Wait(CancellationToken.None);
6061
}
@@ -217,6 +218,8 @@ public void Dispose()
217218
_disposed = true;
218219
GC.SuppressFinalize(this);
219220
}
221+
222+
public Info Info { get; internal set; }
220223
}
221224

222225
public record RawSuperStreamConsumerConfig : IConsumerConfig

RabbitMQ.Stream.Client/RawSuperStreamProducer.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,13 @@ public static IProducer Create(
5656
private RawSuperStreamProducer(
5757
RawSuperStreamProducerConfig config,
5858
IDictionary<string, StreamInfo> streamInfos,
59-
ClientParameters clientParameters,
60-
ILogger logger = null
59+
ClientParameters clientParameters, ILogger logger = null
6160
)
6261
{
6362
_config = config;
6463
_streamInfos = streamInfos;
6564
_clientParameters = clientParameters;
65+
Info = new Info(config.Reference, config.SuperStream);
6666
_defaultRoutingConfiguration.RoutingStrategy = _config.RoutingStrategyType switch
6767
{
6868
RoutingStrategyType.Key => new KeyRoutingStrategy(_config.Routing,
@@ -277,6 +277,7 @@ public void Dispose()
277277
public int IncomingFrames => _producers.Sum(x => x.Value.IncomingFrames);
278278
public int PublishCommandsSent => _producers.Sum(x => x.Value.PublishCommandsSent);
279279
public int PendingCount => _producers.Sum(x => x.Value.PendingCount);
280+
public Info Info { get; }
280281
}
281282

282283
public enum RoutingStrategyType

RabbitMQ.Stream.Client/Reliable/Consumer.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ internal Consumer(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null
165165
{
166166
_logger = logger ?? NullLogger<Consumer>.Instance;
167167
_consumerConfig = consumerConfig;
168+
Info = new Info(consumerConfig.Reference, consumerConfig.Stream);
168169
}
169170

170171
public static async Task<Consumer> Create(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null)

RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,6 @@ public async Task<ulong> GetLastPublishedId()
8686
{
8787
return await _producer.GetLastPublishingId().ConfigureAwait(false);
8888
}
89+
90+
public Info Info => _producer.Info;
8991
}

RabbitMQ.Stream.Client/Reliable/Producer.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ private Producer(ProducerConfig producerConfig, ILogger<Producer> logger = null)
138138
producerConfig.TimeoutMessageAfter,
139139
producerConfig.MaxInFlight
140140
);
141+
Info = new Info(producerConfig.Reference, producerConfig.Stream);
141142
_logger = logger ?? NullLogger<Producer>.Instance;
142143
}
143144

RabbitMQ.Stream.Client/Reliable/ReliableBase.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,4 +216,6 @@ public bool IsOpen()
216216
{
217217
return _isOpen;
218218
}
219+
220+
public Info Info { get; internal set; }
219221
}

Tests/RawConsumerSystemTests.cs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,5 +732,94 @@ public async void ShouldConsumeFromDateTimeOffset()
732732
await consumer.Close().ConfigureAwait(false);
733733
await SystemUtils.CleanUpStreamSystem(system, stream);
734734
}
735+
736+
[Fact]
737+
public async void EntityInfoShouldBeCorrect()
738+
{
739+
SystemUtils.ResetSuperStreams();
740+
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
741+
var rawConsumer = await system.CreateRawConsumer(
742+
new RawConsumerConfig(stream) { Reference = "consumer", });
743+
744+
var entityInfo = rawConsumer.Info;
745+
Assert.Equal(stream, entityInfo.Stream);
746+
Assert.Equal("consumer", entityInfo.Reference);
747+
await rawConsumer.Close();
748+
749+
var rawProducer = await system.CreateRawProducer(
750+
new RawProducerConfig(stream) { Reference = "producer", });
751+
752+
entityInfo = rawProducer.Info;
753+
Assert.Equal(stream, entityInfo.Stream);
754+
Assert.Equal("producer", entityInfo.Reference);
755+
await rawProducer.Close();
756+
757+
var rawSuperStreamProducer = await system.CreateRawSuperStreamProducer(
758+
new RawSuperStreamProducerConfig(SystemUtils.InvoicesExchange)
759+
{
760+
Reference = "super_producer",
761+
RoutingStrategyType = RoutingStrategyType.Hash,
762+
Routing = _ => "OK",
763+
});
764+
entityInfo = rawSuperStreamProducer.Info;
765+
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
766+
Assert.Equal("super_producer", entityInfo.Reference);
767+
await rawSuperStreamProducer.Close();
768+
769+
var rawSuperStreamConsumer = await system.CreateSuperStreamConsumer(
770+
new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange) { Reference = "super_consumer", });
771+
772+
entityInfo = rawSuperStreamConsumer.Info;
773+
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
774+
Assert.Equal("super_consumer", entityInfo.Reference);
775+
await rawSuperStreamConsumer.Close();
776+
777+
var producer = await Producer.Create(new ProducerConfig(system, stream));
778+
779+
entityInfo = producer.Info;
780+
Assert.Equal(stream, entityInfo.Stream);
781+
Assert.True(string.IsNullOrWhiteSpace(entityInfo.Reference));
782+
await producer.Close();
783+
784+
var consumer = await Consumer.Create(new ConsumerConfig(system, stream) { Reference = "consumer", });
785+
786+
entityInfo = consumer.Info;
787+
Assert.Equal(stream, entityInfo.Stream);
788+
Assert.Equal("consumer", entityInfo.Reference);
789+
await consumer.Close();
790+
791+
var producerSuperStream =
792+
await Producer.Create(new ProducerConfig(system, SystemUtils.InvoicesExchange)
793+
{
794+
SuperStreamConfig = new SuperStreamConfig() { Routing = _ => "OK" }
795+
});
796+
797+
entityInfo = producerSuperStream.Info;
798+
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
799+
Assert.True(string.IsNullOrWhiteSpace(entityInfo.Reference));
800+
801+
await producerSuperStream.Close();
802+
803+
var consumerSuperStream = await Consumer.Create(new ConsumerConfig(system, SystemUtils.InvoicesExchange)
804+
{
805+
Reference = "consumer",
806+
IsSuperStream = true,
807+
});
808+
809+
entityInfo = consumerSuperStream.Info;
810+
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
811+
Assert.Equal("consumer", entityInfo.Reference);
812+
await consumerSuperStream.Close();
813+
814+
var dedProducer =
815+
await DeduplicatingProducer.Create(new DeduplicatingProducerConfig(system, stream, "dedProducer"));
816+
817+
entityInfo = dedProducer.Info;
818+
Assert.Equal(stream, entityInfo.Stream);
819+
Assert.Equal("dedProducer", entityInfo.Reference);
820+
await dedProducer.Close();
821+
822+
await SystemUtils.CleanUpStreamSystem(system, stream);
823+
}
735824
}
736825
}

0 commit comments

Comments
 (0)