Skip to content

Improve the super stream reconnection #344

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,13 @@ public async Task<bool> StreamExists(string stream)
{
var streams = new[] { stream };
var response = await QueryMetadata(streams).ConfigureAwait(false);
if (response.StreamInfos is { Count: >= 1 } &&
response.StreamInfos[stream].ResponseCode == ResponseCode.StreamNotAvailable)
{

ClientExceptions.MaybeThrowException(ResponseCode.StreamNotAvailable, stream);
}

return response.StreamInfos is { Count: >= 1 } &&
response.StreamInfos[stream].ResponseCode == ResponseCode.Ok;
}
Expand Down
9 changes: 6 additions & 3 deletions RabbitMQ.Stream.Client/ClientExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ internal static bool IsAKnownException(Exception exception)
if (exception is AggregateException aggregateException)
{
var x = aggregateException.InnerExceptions.Select(x =>
x.GetType() == typeof(SocketException) || x.GetType() == typeof(TimeoutException) ||
x.GetType() == typeof(LeaderNotFoundException) || x.GetType() == typeof(InvalidOperationException));
x.GetType() == typeof(SocketException) ||
x.GetType() == typeof(TimeoutException) ||
x.GetType() == typeof(LeaderNotFoundException) ||
x.GetType() == typeof(OperationCanceledException) ||
x.GetType() == typeof(InvalidOperationException));
return x.Any();
}

return exception is (SocketException or TimeoutException or LeaderNotFoundException or InvalidOperationException) ||
return exception is (SocketException or TimeoutException or LeaderNotFoundException or InvalidOperationException or OperationCanceledException) ||
IsStreamNotAvailable(exception);
}

Expand Down
12 changes: 11 additions & 1 deletion RabbitMQ.Stream.Client/ConnectionsPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,17 @@ internal async Task<IClient> GetOrCreateClient(string brokerInfo, Func<Task<ICli
// TODO: we can improve this by getting the connection with the less active items
var connectionItem = Connections.Values.First(x => x.BrokerInfo == brokerInfo && x.Available);
connectionItem.LastUsed = DateTime.UtcNow;

if (connectionItem.Client is not { IsClosed: true })
return connectionItem.Client;

// the connection is closed
// let's remove it from the pool
Connections.TryRemove(connectionItem.Client.ClientId, out _);
// let's create a new one
connectionItem = new ConnectionItem(brokerInfo, _idsPerConnection, await createClient().ConfigureAwait(false));
Connections.TryAdd(connectionItem.Client.ClientId, connectionItem);

return connectionItem.Client;
}

Expand All @@ -174,7 +185,6 @@ internal async Task<IClient> GetOrCreateClient(string brokerInfo, Func<Task<ICli
_semaphoreSlim.Release();
}
}

public void Remove(string clientId)
{
_semaphoreSlim.Wait();
Expand Down
2 changes: 2 additions & 0 deletions RabbitMQ.Stream.Client/IClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,7 @@ public interface IClient

IDictionary<byte, (string, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>))> Publishers { get; }
IDictionary<byte, (string, ConsumerEvents)> Consumers { get; }

public bool IsClosed { get; }
}
}
7 changes: 5 additions & 2 deletions RabbitMQ.Stream.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

namespace RabbitMQ.Stream.Client;

public interface ISuperStreamConsumer : IConsumer
{
public Task ReconnectPartition(StreamInfo streamInfo);
}

public interface IConsumer : IClosable
{
public Task StoreOffset(ulong offset);
Expand Down Expand Up @@ -42,8 +47,6 @@ public record IConsumerConfig : EntityCommonConfig, INamedEntity

public string Reference { get; set; }

public Func<string, Task> ConnectionClosedHandler { get; set; }

public ConsumerFilter ConsumerFilter { get; set; } = null;

// InitialCredits is the initial credits to be used for the consumer.
Expand Down
5 changes: 4 additions & 1 deletion RabbitMQ.Stream.Client/IProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

namespace RabbitMQ.Stream.Client;

public interface ISuperStreamProducer : IProducer
{
public Task ReconnectPartition(StreamInfo streamInfo);
}
// <summary>
// Producer interface for sending messages to a stream.
// There are different types of producers:
Expand Down Expand Up @@ -83,7 +87,6 @@ public record ProducerFilter

public record IProducerConfig : EntityCommonConfig, INamedEntity
{

public string Reference { get; set; }
public int MaxInFlight { get; set; } = 1_000;
public string ClientProvidedName { get; set; } = "dotnet-stream-raw-producer";
Expand Down
7 changes: 0 additions & 7 deletions RabbitMQ.Stream.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,6 @@ RabbitMQ.Stream.Client.IConsumer.StoreOffset(ulong offset) -> System.Threading.T
RabbitMQ.Stream.Client.IConsumerConfig
RabbitMQ.Stream.Client.IConsumerConfig.ClientProvidedName.get -> string
RabbitMQ.Stream.Client.IConsumerConfig.ClientProvidedName.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.ConnectionClosedHandler.get -> System.Func<string, System.Threading.Tasks.Task>
RabbitMQ.Stream.Client.IConsumerConfig.ConnectionClosedHandler.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerUpdateListener.get -> System.Func<string, string, bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>>
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerUpdateListener.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.IsSingleActiveConsumer.get -> bool
Expand Down Expand Up @@ -636,7 +634,6 @@ RabbitMQ.Stream.Client.Reliable.ProducerConfig.SuperStreamConfig.set -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.TimeoutMessageAfter.get -> System.TimeSpan
RabbitMQ.Stream.Client.Reliable.ProducerConfig.TimeoutMessageAfter.init -> void
RabbitMQ.Stream.Client.Reliable.ProducerFactory
RabbitMQ.Stream.Client.Reliable.ProducerFactory.CreateProducer() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
RabbitMQ.Stream.Client.Reliable.ProducerFactory.ProducerFactory() -> void
RabbitMQ.Stream.Client.Reliable.ProducerFactory._confirmationPipe -> RabbitMQ.Stream.Client.Reliable.ConfirmationPipe
RabbitMQ.Stream.Client.Reliable.ProducerFactory._producerConfig -> RabbitMQ.Stream.Client.Reliable.ProducerConfig
Expand Down Expand Up @@ -723,9 +720,7 @@ RabbitMQ.Stream.Client.StreamSystem
RabbitMQ.Stream.Client.StreamSystem.Close() -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.StreamSystem.CreateRawConsumer(RabbitMQ.Stream.Client.RawConsumerConfig rawConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IConsumer>
RabbitMQ.Stream.Client.StreamSystem.CreateRawProducer(RabbitMQ.Stream.Client.RawProducerConfig rawProducerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
RabbitMQ.Stream.Client.StreamSystem.CreateRawSuperStreamProducer(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
RabbitMQ.Stream.Client.StreamSystem.CreateStream(RabbitMQ.Stream.Client.StreamSpec spec) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamConsumer(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IConsumer>
RabbitMQ.Stream.Client.StreamSystem.DeleteStream(string stream) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.StreamSystem.IsClosed.get -> bool
RabbitMQ.Stream.Client.StreamSystem.QueryOffset(string reference, string stream) -> System.Threading.Tasks.Task<ulong>
Expand Down Expand Up @@ -776,8 +771,6 @@ static RabbitMQ.Stream.Client.LeaderLocator.ClientLocal.get -> RabbitMQ.Stream.C
static RabbitMQ.Stream.Client.LeaderLocator.LeastLeaders.get -> RabbitMQ.Stream.Client.LeaderLocator
static RabbitMQ.Stream.Client.LeaderLocator.Random.get -> RabbitMQ.Stream.Client.LeaderLocator
static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.SequenceReader<byte> reader, uint len) -> RabbitMQ.Stream.Client.Message
static RabbitMQ.Stream.Client.RawSuperStreamConsumer.Create(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.IConsumer
static RabbitMQ.Stream.Client.RawSuperStreamProducer.Create(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.IProducer
static RabbitMQ.Stream.Client.Reliable.Consumer.Create(RabbitMQ.Stream.Client.Reliable.ConsumerConfig consumerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Consumer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.Consumer>
static RabbitMQ.Stream.Client.Reliable.Producer.Create(RabbitMQ.Stream.Client.Reliable.ProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.Producer>
static RabbitMQ.Stream.Client.StreamCompressionCodecs.GetCompressionCodec(RabbitMQ.Stream.Client.CompressionType compressionType) -> RabbitMQ.Stream.Client.ICompressionCodec
Expand Down
24 changes: 21 additions & 3 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ abstract RabbitMQ.Stream.Client.AbstractEntity.Close() -> System.Threading.Tasks
abstract RabbitMQ.Stream.Client.AbstractEntity.DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
abstract RabbitMQ.Stream.Client.AbstractEntity.DumpEntityConfiguration() -> string
abstract RabbitMQ.Stream.Client.AbstractEntity.GetStream() -> string
abstract RabbitMQ.Stream.Client.Reliable.ReliableBase.CreateNewEntity(bool boot) -> System.Threading.Tasks.Task
const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort
const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort
override RabbitMQ.Stream.Client.Broker.ToString() -> string
Expand Down Expand Up @@ -97,6 +98,7 @@ RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Me
RabbitMQ.Stream.Client.IClient.ClientId.get -> string
RabbitMQ.Stream.Client.IClient.ClientId.init -> void
RabbitMQ.Stream.Client.IClient.Consumers.get -> System.Collections.Generic.IDictionary<byte, (string, RabbitMQ.Stream.Client.ConsumerEvents)>
RabbitMQ.Stream.Client.IClient.IsClosed.get -> bool
RabbitMQ.Stream.Client.IClient.Publishers.get -> System.Collections.Generic.IDictionary<byte, (string, (System.Action<System.ReadOnlyMemory<ulong>>, System.Action<(ulong, RabbitMQ.Stream.Client.ResponseCode)[]>))>
RabbitMQ.Stream.Client.IClosable
RabbitMQ.Stream.Client.IClosable.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
Expand Down Expand Up @@ -139,6 +141,10 @@ RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.IRouting
RabbitMQ.Stream.Client.IRouting.CreateClient(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.Broker metaInfoBroker, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.ISuperStreamConsumer
RabbitMQ.Stream.Client.ISuperStreamConsumer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.ISuperStreamProducer
RabbitMQ.Stream.Client.ISuperStreamProducer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.KeyRoutingStrategy
RabbitMQ.Stream.Client.KeyRoutingStrategy.KeyRoutingStrategy(System.Func<RabbitMQ.Stream.Client.Message, string> routingKeyExtractor, System.Func<string, string, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>> routingKeyQFunc, string superStream) -> void
RabbitMQ.Stream.Client.KeyRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
Expand All @@ -159,11 +165,19 @@ RabbitMQ.Stream.Client.PublishFilter.PublishFilter(byte publisherId, System.Coll
RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.RawConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.RawConsumerConfig.ConnectionClosedHandler.get -> System.Func<string, System.Threading.Tasks.Task>
RabbitMQ.Stream.Client.RawConsumerConfig.ConnectionClosedHandler.set -> void
RabbitMQ.Stream.Client.RawProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.RawSuperStreamConsumer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.ConnectionClosedHandler.get -> System.Func<string, string, System.Threading.Tasks.Task>
RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.ConnectionClosedHandler.set -> void
RabbitMQ.Stream.Client.RawSuperStreamProducer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
RabbitMQ.Stream.Client.RawSuperStreamProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
RabbitMQ.Stream.Client.RawSuperStreamProducer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.get -> System.Func<string, string, System.Threading.Tasks.Task>
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.set -> void
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy
Expand All @@ -189,10 +203,9 @@ RabbitMQ.Stream.Client.Reliable.Producer.Info.get -> RabbitMQ.Stream.Client.Prod
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
RabbitMQ.Stream.Client.Reliable.ReliableBase.CheckIfStreamIsAvailable(string stream, RabbitMQ.Stream.Client.StreamSystem system) -> System.Threading.Tasks.Task<bool>
RabbitMQ.Stream.Client.Reliable.ProducerFactory.CreateProducer(bool boot) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
RabbitMQ.Stream.Client.Reliable.ProducerFactory._producer -> RabbitMQ.Stream.Client.IProducer
RabbitMQ.Stream.Client.Reliable.ReliableBase.CompareStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus toTest) -> bool
RabbitMQ.Stream.Client.Reliable.ReliableBase.IsValidStatus() -> bool
RabbitMQ.Stream.Client.Reliable.ReliableBase.MaybeReconnect() -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus status) -> void
RabbitMQ.Stream.Client.Reliable.ReliableBase._status -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ReconnectStrategy.get -> RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy
Expand Down Expand Up @@ -230,6 +243,9 @@ RabbitMQ.Stream.Client.StreamStatsResponse.Statistic.get -> System.Collections.G
RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse() -> void
RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode, System.Collections.Generic.IDictionary<string, long> statistic) -> void
RabbitMQ.Stream.Client.StreamStatsResponse.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.StreamSystem.CreateRawSuperStreamProducer(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ISuperStreamProducer>
RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamConsumer(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ISuperStreamConsumer>
RabbitMQ.Stream.Client.StreamSystem.StreamInfo(string streamName) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamInfo>
RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamStats>
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void
Expand All @@ -245,6 +261,8 @@ static RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, Sy
static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.ReadOnlySequence<byte> seq, uint len) -> RabbitMQ.Stream.Client.Message
static RabbitMQ.Stream.Client.RawConsumer.Create(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.RawConsumerConfig config, RabbitMQ.Stream.Client.StreamInfo metaStreamInfo, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IConsumer>
static RabbitMQ.Stream.Client.RawProducer.Create(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.RawProducerConfig config, RabbitMQ.Stream.Client.StreamInfo metaStreamInfo, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
static RabbitMQ.Stream.Client.RawSuperStreamConsumer.Create(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamConsumer
static RabbitMQ.Stream.Client.RawSuperStreamProducer.Create(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamProducer
static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer>
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupLeaderConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupRandomConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
Loading