Skip to content

Commit 083ff20

Browse files
authored
Improve the super stream reconnection (#344)
Fixes: #333 Fixes: #335 # Preface The commit is focused on making the client more stable during the server restart and better handling the metadata update event. Most of the changes are in the super stream producer and consumer. See the issue #333 # 1- What's changed in the super stream part The RawSuperStreamProducer and RawSuperStreamConsumer expose two callbacks: - `ConnectionClosedHandler (reason, stream)` - `MetadataHandler(update)` - `ISuperStreamProducer` and `ISuperStreamConsumer` interfaces ConnectionClosedHandler -- The callback is raised each time a partition is closed for some reason. You can check the reason with: ``` 1- public const string Normal = "TCP connection closed normal"; 2- public const string Unexpected = "TCP connection closed unexpected"; ``` 1- The super stream partition is closed by the `Close()` method, so there are no problems. It can be helpful in terms of alerting or logs. 2- The super stream partition is closed in an unexpected way like kill the connection, network problems broker restart etc.. In that case, you can reconnect the partition with the `ReconnectPartition` method. MetadataHandler --- It happens when there is a change in the stream topology. In this case, the server drops all the producers and consumers. So, you need to react to the Metadata Handler event. You can decide to reconnect the producer and consumer using the `ReconnectPartition` Producer and Consumer classes -- The `Producer` and `Consumer` classes handle automatically the `ConnectionClosedHandler` and the `MetadataHandler` events for stream and super stream. # 2- Fail fast The Producer and Consumer classes can restore the connection automatically when they are up and running. The classes will fail immediately if there are problems during the first connection. This behaviour is now like the Java Client's. --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 7fa33dd commit 083ff20

27 files changed

+611
-313
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -843,6 +843,13 @@ public async Task<bool> StreamExists(string stream)
843843
{
844844
var streams = new[] { stream };
845845
var response = await QueryMetadata(streams).ConfigureAwait(false);
846+
if (response.StreamInfos is { Count: >= 1 } &&
847+
response.StreamInfos[stream].ResponseCode == ResponseCode.StreamNotAvailable)
848+
{
849+
850+
ClientExceptions.MaybeThrowException(ResponseCode.StreamNotAvailable, stream);
851+
}
852+
846853
return response.StreamInfos is { Count: >= 1 } &&
847854
response.StreamInfos[stream].ResponseCode == ResponseCode.Ok;
848855
}

RabbitMQ.Stream.Client/ClientExceptions.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,15 @@ internal static bool IsAKnownException(Exception exception)
3030
if (exception is AggregateException aggregateException)
3131
{
3232
var x = aggregateException.InnerExceptions.Select(x =>
33-
x.GetType() == typeof(SocketException) || x.GetType() == typeof(TimeoutException) ||
34-
x.GetType() == typeof(LeaderNotFoundException) || x.GetType() == typeof(InvalidOperationException));
33+
x.GetType() == typeof(SocketException) ||
34+
x.GetType() == typeof(TimeoutException) ||
35+
x.GetType() == typeof(LeaderNotFoundException) ||
36+
x.GetType() == typeof(OperationCanceledException) ||
37+
x.GetType() == typeof(InvalidOperationException));
3538
return x.Any();
3639
}
3740

38-
return exception is (SocketException or TimeoutException or LeaderNotFoundException or InvalidOperationException) ||
41+
return exception is (SocketException or TimeoutException or LeaderNotFoundException or InvalidOperationException or OperationCanceledException) ||
3942
IsStreamNotAvailable(exception);
4043
}
4144

RabbitMQ.Stream.Client/ConnectionsPool.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,17 @@ internal async Task<IClient> GetOrCreateClient(string brokerInfo, Func<Task<ICli
154154
// TODO: we can improve this by getting the connection with the less active items
155155
var connectionItem = Connections.Values.First(x => x.BrokerInfo == brokerInfo && x.Available);
156156
connectionItem.LastUsed = DateTime.UtcNow;
157+
158+
if (connectionItem.Client is not { IsClosed: true })
159+
return connectionItem.Client;
160+
161+
// the connection is closed
162+
// let's remove it from the pool
163+
Connections.TryRemove(connectionItem.Client.ClientId, out _);
164+
// let's create a new one
165+
connectionItem = new ConnectionItem(brokerInfo, _idsPerConnection, await createClient().ConfigureAwait(false));
166+
Connections.TryAdd(connectionItem.Client.ClientId, connectionItem);
167+
157168
return connectionItem.Client;
158169
}
159170

@@ -174,7 +185,6 @@ internal async Task<IClient> GetOrCreateClient(string brokerInfo, Func<Task<ICli
174185
_semaphoreSlim.Release();
175186
}
176187
}
177-
178188
public void Remove(string clientId)
179189
{
180190
_semaphoreSlim.Wait();

RabbitMQ.Stream.Client/IClient.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,7 @@ public interface IClient
2727

2828
IDictionary<byte, (string, (Action<ReadOnlyMemory<ulong>>, Action<(ulong, ResponseCode)[]>))> Publishers { get; }
2929
IDictionary<byte, (string, ConsumerEvents)> Consumers { get; }
30+
31+
public bool IsClosed { get; }
3032
}
3133
}

RabbitMQ.Stream.Client/IConsumer.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77

88
namespace RabbitMQ.Stream.Client;
99

10+
public interface ISuperStreamConsumer : IConsumer
11+
{
12+
public Task ReconnectPartition(StreamInfo streamInfo);
13+
}
14+
1015
public interface IConsumer : IClosable
1116
{
1217
public Task StoreOffset(ulong offset);
@@ -42,8 +47,6 @@ public record IConsumerConfig : EntityCommonConfig, INamedEntity
4247

4348
public string Reference { get; set; }
4449

45-
public Func<string, Task> ConnectionClosedHandler { get; set; }
46-
4750
public ConsumerFilter ConsumerFilter { get; set; } = null;
4851

4952
// InitialCredits is the initial credits to be used for the consumer.

RabbitMQ.Stream.Client/IProducer.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88

99
namespace RabbitMQ.Stream.Client;
1010

11+
public interface ISuperStreamProducer : IProducer
12+
{
13+
public Task ReconnectPartition(StreamInfo streamInfo);
14+
}
1115
// <summary>
1216
// Producer interface for sending messages to a stream.
1317
// There are different types of producers:
@@ -83,7 +87,6 @@ public record ProducerFilter
8387

8488
public record IProducerConfig : EntityCommonConfig, INamedEntity
8589
{
86-
8790
public string Reference { get; set; }
8891
public int MaxInFlight { get; set; } = 1_000;
8992
public string ClientProvidedName { get; set; } = "dotnet-stream-raw-producer";

RabbitMQ.Stream.Client/PublicAPI.Shipped.txt

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -335,8 +335,6 @@ RabbitMQ.Stream.Client.IConsumer.StoreOffset(ulong offset) -> System.Threading.T
335335
RabbitMQ.Stream.Client.IConsumerConfig
336336
RabbitMQ.Stream.Client.IConsumerConfig.ClientProvidedName.get -> string
337337
RabbitMQ.Stream.Client.IConsumerConfig.ClientProvidedName.set -> void
338-
RabbitMQ.Stream.Client.IConsumerConfig.ConnectionClosedHandler.get -> System.Func<string, System.Threading.Tasks.Task>
339-
RabbitMQ.Stream.Client.IConsumerConfig.ConnectionClosedHandler.set -> void
340338
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerUpdateListener.get -> System.Func<string, string, bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>>
341339
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerUpdateListener.set -> void
342340
RabbitMQ.Stream.Client.IConsumerConfig.IsSingleActiveConsumer.get -> bool
@@ -636,7 +634,6 @@ RabbitMQ.Stream.Client.Reliable.ProducerConfig.SuperStreamConfig.set -> void
636634
RabbitMQ.Stream.Client.Reliable.ProducerConfig.TimeoutMessageAfter.get -> System.TimeSpan
637635
RabbitMQ.Stream.Client.Reliable.ProducerConfig.TimeoutMessageAfter.init -> void
638636
RabbitMQ.Stream.Client.Reliable.ProducerFactory
639-
RabbitMQ.Stream.Client.Reliable.ProducerFactory.CreateProducer() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
640637
RabbitMQ.Stream.Client.Reliable.ProducerFactory.ProducerFactory() -> void
641638
RabbitMQ.Stream.Client.Reliable.ProducerFactory._confirmationPipe -> RabbitMQ.Stream.Client.Reliable.ConfirmationPipe
642639
RabbitMQ.Stream.Client.Reliable.ProducerFactory._producerConfig -> RabbitMQ.Stream.Client.Reliable.ProducerConfig
@@ -723,9 +720,7 @@ RabbitMQ.Stream.Client.StreamSystem
723720
RabbitMQ.Stream.Client.StreamSystem.Close() -> System.Threading.Tasks.Task
724721
RabbitMQ.Stream.Client.StreamSystem.CreateRawConsumer(RabbitMQ.Stream.Client.RawConsumerConfig rawConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IConsumer>
725722
RabbitMQ.Stream.Client.StreamSystem.CreateRawProducer(RabbitMQ.Stream.Client.RawProducerConfig rawProducerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
726-
RabbitMQ.Stream.Client.StreamSystem.CreateRawSuperStreamProducer(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
727723
RabbitMQ.Stream.Client.StreamSystem.CreateStream(RabbitMQ.Stream.Client.StreamSpec spec) -> System.Threading.Tasks.Task
728-
RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamConsumer(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IConsumer>
729724
RabbitMQ.Stream.Client.StreamSystem.DeleteStream(string stream) -> System.Threading.Tasks.Task
730725
RabbitMQ.Stream.Client.StreamSystem.IsClosed.get -> bool
731726
RabbitMQ.Stream.Client.StreamSystem.QueryOffset(string reference, string stream) -> System.Threading.Tasks.Task<ulong>
@@ -776,8 +771,6 @@ static RabbitMQ.Stream.Client.LeaderLocator.ClientLocal.get -> RabbitMQ.Stream.C
776771
static RabbitMQ.Stream.Client.LeaderLocator.LeastLeaders.get -> RabbitMQ.Stream.Client.LeaderLocator
777772
static RabbitMQ.Stream.Client.LeaderLocator.Random.get -> RabbitMQ.Stream.Client.LeaderLocator
778773
static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.SequenceReader<byte> reader, uint len) -> RabbitMQ.Stream.Client.Message
779-
static RabbitMQ.Stream.Client.RawSuperStreamConsumer.Create(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.IConsumer
780-
static RabbitMQ.Stream.Client.RawSuperStreamProducer.Create(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.IProducer
781774
static RabbitMQ.Stream.Client.Reliable.Consumer.Create(RabbitMQ.Stream.Client.Reliable.ConsumerConfig consumerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Consumer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.Consumer>
782775
static RabbitMQ.Stream.Client.Reliable.Producer.Create(RabbitMQ.Stream.Client.Reliable.ProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.Producer>
783776
static RabbitMQ.Stream.Client.StreamCompressionCodecs.GetCompressionCodec(RabbitMQ.Stream.Client.CompressionType compressionType) -> RabbitMQ.Stream.Client.ICompressionCodec

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ abstract RabbitMQ.Stream.Client.AbstractEntity.Close() -> System.Threading.Tasks
22
abstract RabbitMQ.Stream.Client.AbstractEntity.DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
33
abstract RabbitMQ.Stream.Client.AbstractEntity.DumpEntityConfiguration() -> string
44
abstract RabbitMQ.Stream.Client.AbstractEntity.GetStream() -> string
5+
abstract RabbitMQ.Stream.Client.Reliable.ReliableBase.CreateNewEntity(bool boot) -> System.Threading.Tasks.Task
56
const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort
67
const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort
78
override RabbitMQ.Stream.Client.Broker.ToString() -> string
@@ -97,6 +98,7 @@ RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Me
9798
RabbitMQ.Stream.Client.IClient.ClientId.get -> string
9899
RabbitMQ.Stream.Client.IClient.ClientId.init -> void
99100
RabbitMQ.Stream.Client.IClient.Consumers.get -> System.Collections.Generic.IDictionary<byte, (string, RabbitMQ.Stream.Client.ConsumerEvents)>
101+
RabbitMQ.Stream.Client.IClient.IsClosed.get -> bool
100102
RabbitMQ.Stream.Client.IClient.Publishers.get -> System.Collections.Generic.IDictionary<byte, (string, (System.Action<System.ReadOnlyMemory<ulong>>, System.Action<(ulong, RabbitMQ.Stream.Client.ResponseCode)[]>))>
101103
RabbitMQ.Stream.Client.IClosable
102104
RabbitMQ.Stream.Client.IClosable.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
@@ -139,6 +141,10 @@ RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
139141
RabbitMQ.Stream.Client.IRouting
140142
RabbitMQ.Stream.Client.IRouting.CreateClient(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.Broker metaInfoBroker, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
141143
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
144+
RabbitMQ.Stream.Client.ISuperStreamConsumer
145+
RabbitMQ.Stream.Client.ISuperStreamConsumer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task
146+
RabbitMQ.Stream.Client.ISuperStreamProducer
147+
RabbitMQ.Stream.Client.ISuperStreamProducer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task
142148
RabbitMQ.Stream.Client.KeyRoutingStrategy
143149
RabbitMQ.Stream.Client.KeyRoutingStrategy.KeyRoutingStrategy(System.Func<RabbitMQ.Stream.Client.Message, string> routingKeyExtractor, System.Func<string, string, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>> routingKeyQFunc, string superStream) -> void
144150
RabbitMQ.Stream.Client.KeyRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
@@ -159,11 +165,19 @@ RabbitMQ.Stream.Client.PublishFilter.PublishFilter(byte publisherId, System.Coll
159165
RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
160166
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
161167
RabbitMQ.Stream.Client.RawConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
168+
RabbitMQ.Stream.Client.RawConsumerConfig.ConnectionClosedHandler.get -> System.Func<string, System.Threading.Tasks.Task>
169+
RabbitMQ.Stream.Client.RawConsumerConfig.ConnectionClosedHandler.set -> void
162170
RabbitMQ.Stream.Client.RawProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
163171
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
164172
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
173+
RabbitMQ.Stream.Client.RawSuperStreamConsumer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task
174+
RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.ConnectionClosedHandler.get -> System.Func<string, string, System.Threading.Tasks.Task>
175+
RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.ConnectionClosedHandler.set -> void
165176
RabbitMQ.Stream.Client.RawSuperStreamProducer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
166177
RabbitMQ.Stream.Client.RawSuperStreamProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
178+
RabbitMQ.Stream.Client.RawSuperStreamProducer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task
179+
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.get -> System.Func<string, string, System.Threading.Tasks.Task>
180+
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.set -> void
167181
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
168182
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
169183
RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy
@@ -189,10 +203,9 @@ RabbitMQ.Stream.Client.Reliable.Producer.Info.get -> RabbitMQ.Stream.Client.Prod
189203
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
190204
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
191205
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
192-
RabbitMQ.Stream.Client.Reliable.ReliableBase.CheckIfStreamIsAvailable(string stream, RabbitMQ.Stream.Client.StreamSystem system) -> System.Threading.Tasks.Task<bool>
206+
RabbitMQ.Stream.Client.Reliable.ProducerFactory.CreateProducer(bool boot) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
207+
RabbitMQ.Stream.Client.Reliable.ProducerFactory._producer -> RabbitMQ.Stream.Client.IProducer
193208
RabbitMQ.Stream.Client.Reliable.ReliableBase.CompareStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus toTest) -> bool
194-
RabbitMQ.Stream.Client.Reliable.ReliableBase.IsValidStatus() -> bool
195-
RabbitMQ.Stream.Client.Reliable.ReliableBase.MaybeReconnect() -> System.Threading.Tasks.Task
196209
RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus status) -> void
197210
RabbitMQ.Stream.Client.Reliable.ReliableBase._status -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
198211
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ReconnectStrategy.get -> RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy
@@ -230,6 +243,9 @@ RabbitMQ.Stream.Client.StreamStatsResponse.Statistic.get -> System.Collections.G
230243
RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse() -> void
231244
RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode, System.Collections.Generic.IDictionary<string, long> statistic) -> void
232245
RabbitMQ.Stream.Client.StreamStatsResponse.Write(System.Span<byte> span) -> int
246+
RabbitMQ.Stream.Client.StreamSystem.CreateRawSuperStreamProducer(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ISuperStreamProducer>
247+
RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamConsumer(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ISuperStreamConsumer>
248+
RabbitMQ.Stream.Client.StreamSystem.StreamInfo(string streamName) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamInfo>
233249
RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamStats>
234250
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
235251
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void
@@ -245,6 +261,8 @@ static RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, Sy
245261
static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.ReadOnlySequence<byte> seq, uint len) -> RabbitMQ.Stream.Client.Message
246262
static RabbitMQ.Stream.Client.RawConsumer.Create(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.RawConsumerConfig config, RabbitMQ.Stream.Client.StreamInfo metaStreamInfo, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IConsumer>
247263
static RabbitMQ.Stream.Client.RawProducer.Create(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.RawProducerConfig config, RabbitMQ.Stream.Client.StreamInfo metaStreamInfo, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
264+
static RabbitMQ.Stream.Client.RawSuperStreamConsumer.Create(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamConsumer
265+
static RabbitMQ.Stream.Client.RawSuperStreamProducer.Create(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamProducer
248266
static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer>
249267
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupLeaderConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
250268
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupRandomConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>

0 commit comments

Comments
 (0)