Skip to content

Add Info class to the Producer/Consumer classes #322

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions RabbitMQ.Stream.Client/AbstractEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@ protected void MaybeCancelToken()
}

protected Client _client;

public Info Info { get; internal set; }
}
}
17 changes: 17 additions & 0 deletions RabbitMQ.Stream.Client/EntityInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

namespace RabbitMQ.Stream.Client;

public class Info
{
public string Reference { get; }
public string Stream { get; }

internal Info(string reference, string stream)
{
Reference = reference;
Stream = stream;
}
}
2 changes: 2 additions & 0 deletions RabbitMQ.Stream.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public interface IConsumer
public Task StoreOffset(ulong offset);
public Task<ResponseCode> Close();
public void Dispose();

public Info Info { get; }
}

public record IConsumerConfig : INamedEntity
Expand Down
5 changes: 5 additions & 0 deletions RabbitMQ.Stream.Client/IProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public interface IProducer
public int PublishCommandsSent { get; }

public int PendingCount { get; }

/// <summary>
/// Info contains the reference and the stream name.
/// </summary>
public Info Info { get; }
}

public record ProducerFilter
Expand Down
10 changes: 10 additions & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort
const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort
RabbitMQ.Stream.Client.AbstractEntity.Info.get -> RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken
RabbitMQ.Stream.Client.AuthMechanism
Expand All @@ -25,6 +26,7 @@ RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
RabbitMQ.Stream.Client.CrcException
RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.IConsumer.Info.get -> RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.get -> RabbitMQ.Stream.Client.ConsumerFilter
Expand Down Expand Up @@ -54,6 +56,10 @@ RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.Info.Reference.get -> string
RabbitMQ.Stream.Client.Info.Stream.get -> string
RabbitMQ.Stream.Client.IProducer.Info.get -> RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.IProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
Expand All @@ -73,6 +79,8 @@ RabbitMQ.Stream.Client.PublishFilter.PublishFilter() -> void
RabbitMQ.Stream.Client.PublishFilter.PublishFilter(byte publisherId, System.Collections.Generic.List<(ulong, RabbitMQ.Stream.Client.Message)> messages, System.Func<RabbitMQ.Stream.Client.Message, string> filterValueExtractor, Microsoft.Extensions.Logging.ILogger logger) -> void
RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Info.get -> RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.RawSuperStreamProducer.Info.get -> RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
Expand All @@ -84,13 +92,15 @@ RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Close() -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.GetLastPublishedId() -> System.Threading.Tasks.Task<ulong>
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Info.get -> RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.IsOpen() -> bool
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Send(ulong publishing, RabbitMQ.Stream.Client.Message message) -> System.Threading.Tasks.ValueTask
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig.DeduplicatingProducerConfig(RabbitMQ.Stream.Client.StreamSystem streamSystem, string stream, string reference) -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
RabbitMQ.Stream.Client.Reliable.ReliableBase.Info.get -> RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.set -> void
RabbitMQ.Stream.Client.RouteNotFoundException
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
_initialCredits = config.InitialCredits;
_config = config;
_logger.LogDebug("Creating... {ConsumerInfo}", ConsumerInfo());

Info = new Info(_config.Reference, _config.Stream);
// _chunksBuffer is a channel that is used to buffer the chunks
_chunksBuffer = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(_initialCredits)
{
Expand Down
1 change: 1 addition & 0 deletions RabbitMQ.Stream.Client/RawProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ private RawProducer(Client client, RawProducerConfig config, ILogger logger = nu
{
_client = client;
_config = config;
Info = new Info(_config.Reference, _config.Stream);
_messageBuffer = Channel.CreateBounded<OutgoingMsg>(new BoundedChannelOptions(10000)
{
AllowSynchronousContinuations = false,
Expand Down
3 changes: 3 additions & 0 deletions RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private RawSuperStreamConsumer(
_streamInfos = streamInfos;
_clientParameters = clientParameters;
_logger = logger ?? NullLogger.Instance;
Info = new Info(_config.Reference, _config.SuperStream);

StartConsumers().Wait(CancellationToken.None);
}
Expand Down Expand Up @@ -217,6 +218,8 @@ public void Dispose()
_disposed = true;
GC.SuppressFinalize(this);
}

public Info Info { get; internal set; }
}

public record RawSuperStreamConsumerConfig : IConsumerConfig
Expand Down
5 changes: 3 additions & 2 deletions RabbitMQ.Stream.Client/RawSuperStreamProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ public static IProducer Create(
private RawSuperStreamProducer(
RawSuperStreamProducerConfig config,
IDictionary<string, StreamInfo> streamInfos,
ClientParameters clientParameters,
ILogger logger = null
ClientParameters clientParameters, ILogger logger = null
)
{
_config = config;
_streamInfos = streamInfos;
_clientParameters = clientParameters;
Info = new Info(config.Reference, config.SuperStream);
_defaultRoutingConfiguration.RoutingStrategy = _config.RoutingStrategyType switch
{
RoutingStrategyType.Key => new KeyRoutingStrategy(_config.Routing,
Expand Down Expand Up @@ -277,6 +277,7 @@ public void Dispose()
public int IncomingFrames => _producers.Sum(x => x.Value.IncomingFrames);
public int PublishCommandsSent => _producers.Sum(x => x.Value.PublishCommandsSent);
public int PendingCount => _producers.Sum(x => x.Value.PendingCount);
public Info Info { get; }
}

public enum RoutingStrategyType
Expand Down
1 change: 1 addition & 0 deletions RabbitMQ.Stream.Client/Reliable/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ internal Consumer(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null
{
_logger = logger ?? NullLogger<Consumer>.Instance;
_consumerConfig = consumerConfig;
Info = new Info(consumerConfig.Reference, consumerConfig.Stream);
}

public static async Task<Consumer> Create(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null)
Expand Down
2 changes: 2 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,6 @@ public async Task<ulong> GetLastPublishedId()
{
return await _producer.GetLastPublishingId().ConfigureAwait(false);
}

public Info Info => _producer.Info;
}
1 change: 1 addition & 0 deletions RabbitMQ.Stream.Client/Reliable/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ private Producer(ProducerConfig producerConfig, ILogger<Producer> logger = null)
producerConfig.TimeoutMessageAfter,
producerConfig.MaxInFlight
);
Info = new Info(producerConfig.Reference, producerConfig.Stream);
_logger = logger ?? NullLogger<Producer>.Instance;
}

Expand Down
2 changes: 2 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/ReliableBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,6 @@ public bool IsOpen()
{
return _isOpen;
}

public Info Info { get; internal set; }
}
89 changes: 89 additions & 0 deletions Tests/RawConsumerSystemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -732,5 +732,94 @@ public async void ShouldConsumeFromDateTimeOffset()
await consumer.Close().ConfigureAwait(false);
await SystemUtils.CleanUpStreamSystem(system, stream);
}

[Fact]
public async void EntityInfoShouldBeCorrect()
{
SystemUtils.ResetSuperStreams();
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
var rawConsumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream) { Reference = "consumer", });

var entityInfo = rawConsumer.Info;
Assert.Equal(stream, entityInfo.Stream);
Assert.Equal("consumer", entityInfo.Reference);
await rawConsumer.Close();

var rawProducer = await system.CreateRawProducer(
new RawProducerConfig(stream) { Reference = "producer", });

entityInfo = rawProducer.Info;
Assert.Equal(stream, entityInfo.Stream);
Assert.Equal("producer", entityInfo.Reference);
await rawProducer.Close();

var rawSuperStreamProducer = await system.CreateRawSuperStreamProducer(
new RawSuperStreamProducerConfig(SystemUtils.InvoicesExchange)
{
Reference = "super_producer",
RoutingStrategyType = RoutingStrategyType.Hash,
Routing = _ => "OK",
});
entityInfo = rawSuperStreamProducer.Info;
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
Assert.Equal("super_producer", entityInfo.Reference);
await rawSuperStreamProducer.Close();

var rawSuperStreamConsumer = await system.CreateSuperStreamConsumer(
new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange) { Reference = "super_consumer", });

entityInfo = rawSuperStreamConsumer.Info;
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
Assert.Equal("super_consumer", entityInfo.Reference);
await rawSuperStreamConsumer.Close();

var producer = await Producer.Create(new ProducerConfig(system, stream));

entityInfo = producer.Info;
Assert.Equal(stream, entityInfo.Stream);
Assert.True(string.IsNullOrWhiteSpace(entityInfo.Reference));
await producer.Close();

var consumer = await Consumer.Create(new ConsumerConfig(system, stream) { Reference = "consumer", });

entityInfo = consumer.Info;
Assert.Equal(stream, entityInfo.Stream);
Assert.Equal("consumer", entityInfo.Reference);
await consumer.Close();

var producerSuperStream =
await Producer.Create(new ProducerConfig(system, SystemUtils.InvoicesExchange)
{
SuperStreamConfig = new SuperStreamConfig() { Routing = _ => "OK" }
});

entityInfo = producerSuperStream.Info;
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
Assert.True(string.IsNullOrWhiteSpace(entityInfo.Reference));

await producerSuperStream.Close();

var consumerSuperStream = await Consumer.Create(new ConsumerConfig(system, SystemUtils.InvoicesExchange)
{
Reference = "consumer",
IsSuperStream = true,
});

entityInfo = consumerSuperStream.Info;
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
Assert.Equal("consumer", entityInfo.Reference);
await consumerSuperStream.Close();

var dedProducer =
await DeduplicatingProducer.Create(new DeduplicatingProducerConfig(system, stream, "dedProducer"));

entityInfo = dedProducer.Info;
Assert.Equal(stream, entityInfo.Stream);
Assert.Equal("dedProducer", entityInfo.Reference);
await dedProducer.Close();

await SystemUtils.CleanUpStreamSystem(system, stream);
}
}
}