Skip to content

Commit a515ea9

Browse files
authored
Make the info class abstract (#323)
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent e37f2a2 commit a515ea9

14 files changed

+77
-53
lines changed

RabbitMQ.Stream.Client/AbstractEntity.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,5 @@ protected void MaybeCancelToken()
2222

2323
protected Client _client;
2424

25-
public Info Info { get; internal set; }
2625
}
2726
}

RabbitMQ.Stream.Client/EntityInfo.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,12 @@
44

55
namespace RabbitMQ.Stream.Client;
66

7-
public class Info
7+
public abstract class Info
88
{
9-
public string Reference { get; }
109
public string Stream { get; }
1110

12-
internal Info(string reference, string stream)
11+
protected Info(string stream)
1312
{
14-
Reference = reference;
1513
Stream = stream;
1614
}
1715
}

RabbitMQ.Stream.Client/IConsumer.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public interface IConsumer
1313
public Task<ResponseCode> Close();
1414
public void Dispose();
1515

16-
public Info Info { get; }
16+
public ConsumerInfo Info { get; }
1717
}
1818

1919
public record IConsumerConfig : INamedEntity
@@ -73,3 +73,12 @@ public ushort InitialCredits
7373
// It is not enabled by default because it is could reduce the performance.
7474
public ICrc32 Crc32 { get; set; } = null;
7575
}
76+
77+
public class ConsumerInfo : Info
78+
{
79+
public string Reference { get; }
80+
public ConsumerInfo(string stream, string reference) : base(stream)
81+
{
82+
Reference = reference;
83+
}
84+
}

RabbitMQ.Stream.Client/IProducer.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public interface IProducer
7171
/// <summary>
7272
/// Info contains the reference and the stream name.
7373
/// </summary>
74-
public Info Info { get; }
74+
public ProducerInfo Info { get; }
7575
}
7676

7777
public record ProducerFilter
@@ -104,3 +104,16 @@ public record IProducerConfig : INamedEntity
104104
/// </summary>
105105
public ProducerFilter Filter { get; set; } = null;
106106
}
107+
108+
/// <summary>
109+
/// ProducerInfo contains the reference and the stream name.
110+
/// </summary>
111+
public class ProducerInfo : Info
112+
{
113+
public string Reference { get; }
114+
115+
public ProducerInfo(string stream, string reference) : base(stream)
116+
{
117+
Reference = reference;
118+
}
119+
}

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort
22
const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort
3-
RabbitMQ.Stream.Client.AbstractEntity.Info.get -> RabbitMQ.Stream.Client.Info
43
RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void
54
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken
65
RabbitMQ.Stream.Client.AuthMechanism
@@ -23,10 +22,13 @@ RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.get -> System.Func<RabbitMQ.Str
2322
RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.set -> void
2423
RabbitMQ.Stream.Client.ConsumerFilter.Values.get -> System.Collections.Generic.List<string>
2524
RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
25+
RabbitMQ.Stream.Client.ConsumerInfo
26+
RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference) -> void
27+
RabbitMQ.Stream.Client.ConsumerInfo.Reference.get -> string
2628
RabbitMQ.Stream.Client.CrcException
2729
RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
2830
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
29-
RabbitMQ.Stream.Client.IConsumer.Info.get -> RabbitMQ.Stream.Client.Info
31+
RabbitMQ.Stream.Client.IConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
3032
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
3133
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.set -> void
3234
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.get -> RabbitMQ.Stream.Client.ConsumerFilter
@@ -57,9 +59,9 @@ RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
5759
RabbitMQ.Stream.Client.ICrc32
5860
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
5961
RabbitMQ.Stream.Client.Info
60-
RabbitMQ.Stream.Client.Info.Reference.get -> string
62+
RabbitMQ.Stream.Client.Info.Info(string stream) -> void
6163
RabbitMQ.Stream.Client.Info.Stream.get -> string
62-
RabbitMQ.Stream.Client.IProducer.Info.get -> RabbitMQ.Stream.Client.Info
64+
RabbitMQ.Stream.Client.IProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
6365
RabbitMQ.Stream.Client.IProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
6466
RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
6567
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
@@ -71,6 +73,9 @@ RabbitMQ.Stream.Client.OffsetTypeTimestamp.OffsetTypeTimestamp(System.DateTimeOf
7173
RabbitMQ.Stream.Client.ProducerFilter
7274
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.get -> System.Func<RabbitMQ.Stream.Client.Message, string>
7375
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.set -> void
76+
RabbitMQ.Stream.Client.ProducerInfo
77+
RabbitMQ.Stream.Client.ProducerInfo.ProducerInfo(string stream, string reference) -> void
78+
RabbitMQ.Stream.Client.ProducerInfo.Reference.get -> string
7479
RabbitMQ.Stream.Client.PublishFilter
7580
RabbitMQ.Stream.Client.PublishFilter.Command.get -> ushort
7681
RabbitMQ.Stream.Client.PublishFilter.MaxVersion.get -> ushort
@@ -79,10 +84,13 @@ RabbitMQ.Stream.Client.PublishFilter.PublishFilter() -> void
7984
RabbitMQ.Stream.Client.PublishFilter.PublishFilter(byte publisherId, System.Collections.Generic.List<(ulong, RabbitMQ.Stream.Client.Message)> messages, System.Func<RabbitMQ.Stream.Client.Message, string> filterValueExtractor, Microsoft.Extensions.Logging.ILogger logger) -> void
8085
RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
8186
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
82-
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Info.get -> RabbitMQ.Stream.Client.Info
83-
RabbitMQ.Stream.Client.RawSuperStreamProducer.Info.get -> RabbitMQ.Stream.Client.Info
87+
RabbitMQ.Stream.Client.RawConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
88+
RabbitMQ.Stream.Client.RawProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
89+
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
90+
RabbitMQ.Stream.Client.RawSuperStreamProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
8491
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
8592
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
93+
RabbitMQ.Stream.Client.Reliable.Consumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
8694
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
8795
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.set -> void
8896
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.get -> RabbitMQ.Stream.Client.ConsumerFilter
@@ -92,15 +100,15 @@ RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.set -> void
92100
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer
93101
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Close() -> System.Threading.Tasks.Task
94102
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.GetLastPublishedId() -> System.Threading.Tasks.Task<ulong>
95-
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Info.get -> RabbitMQ.Stream.Client.Info
103+
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
96104
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.IsOpen() -> bool
97105
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Send(ulong publishing, RabbitMQ.Stream.Client.Message message) -> System.Threading.Tasks.ValueTask
98106
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig
99107
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig.DeduplicatingProducerConfig(RabbitMQ.Stream.Client.StreamSystem streamSystem, string stream, string reference) -> void
108+
RabbitMQ.Stream.Client.Reliable.Producer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
100109
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
101110
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
102111
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
103-
RabbitMQ.Stream.Client.Reliable.ReliableBase.Info.get -> RabbitMQ.Stream.Client.Info
104112
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
105113
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.set -> void
106114
RabbitMQ.Stream.Client.RouteNotFoundException

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
140140
_initialCredits = config.InitialCredits;
141141
_config = config;
142142
_logger.LogDebug("Creating... {ConsumerInfo}", ConsumerInfo());
143-
Info = new Info(_config.Reference, _config.Stream);
143+
Info = new ConsumerInfo(_config.Stream, _config.Reference);
144144
// _chunksBuffer is a channel that is used to buffer the chunks
145145
_chunksBuffer = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(_initialCredits)
146146
{
@@ -643,5 +643,7 @@ public void Dispose()
643643
GC.SuppressFinalize(this);
644644
}
645645
}
646+
647+
public ConsumerInfo Info { get; }
646648
}
647649
}

RabbitMQ.Stream.Client/RawProducer.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ private RawProducer(Client client, RawProducerConfig config, ILogger logger = nu
7979
{
8080
_client = client;
8181
_config = config;
82-
Info = new Info(_config.Reference, _config.Stream);
82+
Info = new ProducerInfo(_config.Stream, _config.Reference);
8383
_messageBuffer = Channel.CreateBounded<OutgoingMsg>(new BoundedChannelOptions(10000)
8484
{
8585
AllowSynchronousContinuations = false,
@@ -403,5 +403,7 @@ public void Dispose()
403403

404404
GC.SuppressFinalize(this);
405405
}
406+
407+
public ProducerInfo Info { get; }
406408
}
407409
}

RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ private RawSuperStreamConsumer(
5555
_streamInfos = streamInfos;
5656
_clientParameters = clientParameters;
5757
_logger = logger ?? NullLogger.Instance;
58-
Info = new Info(_config.Reference, _config.SuperStream);
58+
Info = new ConsumerInfo(_config.SuperStream, _config.Reference);
5959

6060
StartConsumers().Wait(CancellationToken.None);
6161
}
@@ -219,7 +219,7 @@ public void Dispose()
219219
GC.SuppressFinalize(this);
220220
}
221221

222-
public Info Info { get; internal set; }
222+
public ConsumerInfo Info { get; }
223223
}
224224

225225
public record RawSuperStreamConsumerConfig : IConsumerConfig

RabbitMQ.Stream.Client/RawSuperStreamProducer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private RawSuperStreamProducer(
6262
_config = config;
6363
_streamInfos = streamInfos;
6464
_clientParameters = clientParameters;
65-
Info = new Info(config.Reference, config.SuperStream);
65+
Info = new ProducerInfo(config.SuperStream, config.Reference);
6666
_defaultRoutingConfiguration.RoutingStrategy = _config.RoutingStrategyType switch
6767
{
6868
RoutingStrategyType.Key => new KeyRoutingStrategy(_config.Routing,
@@ -277,7 +277,7 @@ public void Dispose()
277277
public int IncomingFrames => _producers.Sum(x => x.Value.IncomingFrames);
278278
public int PublishCommandsSent => _producers.Sum(x => x.Value.PublishCommandsSent);
279279
public int PendingCount => _producers.Sum(x => x.Value.PendingCount);
280-
public Info Info { get; }
280+
public ProducerInfo Info { get; }
281281
}
282282

283283
public enum RoutingStrategyType

RabbitMQ.Stream.Client/Reliable/Consumer.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ internal Consumer(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null
165165
{
166166
_logger = logger ?? NullLogger<Consumer>.Instance;
167167
_consumerConfig = consumerConfig;
168-
Info = new Info(consumerConfig.Reference, consumerConfig.Stream);
168+
Info = new ConsumerInfo(consumerConfig.Stream, consumerConfig.Reference);
169169
}
170170

171171
public static async Task<Consumer> Create(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null)
@@ -213,4 +213,6 @@ public override string ToString()
213213
{
214214
return $"Consumer reference: {_consumerConfig.Reference}, stream: {_consumerConfig.Stream} ";
215215
}
216+
217+
public ConsumerInfo Info { get; }
216218
}

RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,5 +87,5 @@ public async Task<ulong> GetLastPublishedId()
8787
return await _producer.GetLastPublishingId().ConfigureAwait(false);
8888
}
8989

90-
public Info Info => _producer.Info;
90+
public ProducerInfo Info => _producer.Info;
9191
}

RabbitMQ.Stream.Client/Reliable/Producer.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ private Producer(ProducerConfig producerConfig, ILogger<Producer> logger = null)
138138
producerConfig.TimeoutMessageAfter,
139139
producerConfig.MaxInFlight
140140
);
141-
Info = new Info(producerConfig.Reference, producerConfig.Stream);
141+
Info = new ProducerInfo(producerConfig.Stream, producerConfig.Reference);
142142
_logger = logger ?? NullLogger<Producer>.Instance;
143143
}
144144

@@ -374,4 +374,6 @@ public async ValueTask Send(List<Message> messages)
374374
SemaphoreSlim.Release();
375375
}
376376
}
377+
378+
public ProducerInfo Info { get; }
377379
}

RabbitMQ.Stream.Client/Reliable/ReliableBase.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,4 @@ public bool IsOpen()
216216
{
217217
return _isOpen;
218218
}
219-
220-
public Info Info { get; internal set; }
221219
}

Tests/RawConsumerSystemTests.cs

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -741,17 +741,15 @@ public async void EntityInfoShouldBeCorrect()
741741
var rawConsumer = await system.CreateRawConsumer(
742742
new RawConsumerConfig(stream) { Reference = "consumer", });
743743

744-
var entityInfo = rawConsumer.Info;
745-
Assert.Equal(stream, entityInfo.Stream);
746-
Assert.Equal("consumer", entityInfo.Reference);
744+
Assert.Equal(stream, rawConsumer.Info.Stream);
745+
Assert.Equal("consumer", rawConsumer.Info.Reference);
747746
await rawConsumer.Close();
748747

749748
var rawProducer = await system.CreateRawProducer(
750749
new RawProducerConfig(stream) { Reference = "producer", });
751750

752-
entityInfo = rawProducer.Info;
753-
Assert.Equal(stream, entityInfo.Stream);
754-
Assert.Equal("producer", entityInfo.Reference);
751+
Assert.Equal(stream, rawProducer.Info.Stream);
752+
Assert.Equal("producer", rawProducer.Info.Reference);
755753
await rawProducer.Close();
756754

757755
var rawSuperStreamProducer = await system.CreateRawSuperStreamProducer(
@@ -761,31 +759,27 @@ public async void EntityInfoShouldBeCorrect()
761759
RoutingStrategyType = RoutingStrategyType.Hash,
762760
Routing = _ => "OK",
763761
});
764-
entityInfo = rawSuperStreamProducer.Info;
765-
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
766-
Assert.Equal("super_producer", entityInfo.Reference);
762+
Assert.Equal(SystemUtils.InvoicesExchange, rawSuperStreamProducer.Info.Stream);
763+
Assert.Equal("super_producer", rawSuperStreamProducer.Info.Reference);
767764
await rawSuperStreamProducer.Close();
768765

769766
var rawSuperStreamConsumer = await system.CreateSuperStreamConsumer(
770767
new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange) { Reference = "super_consumer", });
771768

772-
entityInfo = rawSuperStreamConsumer.Info;
773-
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
774-
Assert.Equal("super_consumer", entityInfo.Reference);
769+
Assert.Equal(SystemUtils.InvoicesExchange, rawSuperStreamConsumer.Info.Stream);
770+
Assert.Equal("super_consumer", rawSuperStreamConsumer.Info.Reference);
775771
await rawSuperStreamConsumer.Close();
776772

777773
var producer = await Producer.Create(new ProducerConfig(system, stream));
778774

779-
entityInfo = producer.Info;
780-
Assert.Equal(stream, entityInfo.Stream);
781-
Assert.True(string.IsNullOrWhiteSpace(entityInfo.Reference));
775+
Assert.Equal(stream, producer.Info.Stream);
776+
Assert.True(string.IsNullOrWhiteSpace(producer.Info.Reference));
782777
await producer.Close();
783778

784779
var consumer = await Consumer.Create(new ConsumerConfig(system, stream) { Reference = "consumer", });
785780

786-
entityInfo = consumer.Info;
787-
Assert.Equal(stream, entityInfo.Stream);
788-
Assert.Equal("consumer", entityInfo.Reference);
781+
Assert.Equal(stream, consumer.Info.Stream);
782+
Assert.Equal("consumer", consumer.Info.Reference);
789783
await consumer.Close();
790784

791785
var producerSuperStream =
@@ -794,9 +788,8 @@ await Producer.Create(new ProducerConfig(system, SystemUtils.InvoicesExchange)
794788
SuperStreamConfig = new SuperStreamConfig() { Routing = _ => "OK" }
795789
});
796790

797-
entityInfo = producerSuperStream.Info;
798-
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
799-
Assert.True(string.IsNullOrWhiteSpace(entityInfo.Reference));
791+
Assert.Equal(SystemUtils.InvoicesExchange, producerSuperStream.Info.Stream);
792+
Assert.True(string.IsNullOrWhiteSpace(producerSuperStream.Info.Reference));
800793

801794
await producerSuperStream.Close();
802795

@@ -806,17 +799,15 @@ await Producer.Create(new ProducerConfig(system, SystemUtils.InvoicesExchange)
806799
IsSuperStream = true,
807800
});
808801

809-
entityInfo = consumerSuperStream.Info;
810-
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
811-
Assert.Equal("consumer", entityInfo.Reference);
802+
Assert.Equal(SystemUtils.InvoicesExchange, consumerSuperStream.Info.Stream);
803+
Assert.Equal("consumer", consumerSuperStream.Info.Reference);
812804
await consumerSuperStream.Close();
813805

814806
var dedProducer =
815807
await DeduplicatingProducer.Create(new DeduplicatingProducerConfig(system, stream, "dedProducer"));
816808

817-
entityInfo = dedProducer.Info;
818-
Assert.Equal(stream, entityInfo.Stream);
819-
Assert.Equal("dedProducer", entityInfo.Reference);
809+
Assert.Equal(stream, dedProducer.Info.Stream);
810+
Assert.Equal("dedProducer", dedProducer.Info.Reference);
820811
await dedProducer.Close();
821812

822813
await SystemUtils.CleanUpStreamSystem(system, stream);

0 commit comments

Comments
 (0)