Skip to content

Commit 80d759d

Browse files
committed
work in progress
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 4cefb84 commit 80d759d

File tree

6 files changed

+93
-71
lines changed

6 files changed

+93
-71
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ public record ClientParameters
5454
public EndPoint Endpoint { get; set; } = new IPEndPoint(IPAddress.Loopback, 5552);
5555

5656
public Action<MetaDataUpdate> MetadataHandler { get; set; } = _ => { };
57+
58+
59+
public delegate void MetadataUpdateHandler(MetaDataUpdate update);
60+
61+
public event MetadataUpdateHandler OnMetadataUpdate;
5762
public Action<Exception> UnhandledExceptionHandler { get; set; } = _ => { };
5863
public TimeSpan Heartbeat { get; set; } = TimeSpan.FromMinutes(1);
5964

@@ -71,6 +76,11 @@ public string ClientProvidedName
7176
public AddressResolver AddressResolver { get; set; } = null;
7277

7378
public AuthMechanism AuthMechanism { get; set; } = AuthMechanism.Plain;
79+
80+
internal void FireMetadataUpdate(MetaDataUpdate metaDataUpdate)
81+
{
82+
OnMetadataUpdate?.Invoke(metaDataUpdate);
83+
}
7484
}
7585

7686
internal readonly struct OutgoingMsg : ICommand
@@ -213,7 +223,8 @@ await client
213223
.ConfigureAwait(false);
214224
logger?.LogDebug("Sasl mechanism: {Mechanisms}", saslHandshakeResponse.Mechanisms);
215225

216-
var isValid = saslHandshakeResponse.Mechanisms.Contains(parameters.AuthMechanism.ToString().ToUpperInvariant(),
226+
var isValid = saslHandshakeResponse.Mechanisms.Contains(
227+
parameters.AuthMechanism.ToString().ToUpperInvariant(),
217228
StringComparer.OrdinalIgnoreCase);
218229
if (!isValid)
219230
{
@@ -225,7 +236,8 @@ await client
225236
var authResponse =
226237
await client
227238
.Request<SaslAuthenticateRequest, SaslAuthenticateResponse>(corr =>
228-
new SaslAuthenticateRequest(corr, parameters.AuthMechanism.ToString().ToUpperInvariant(), saslData))
239+
new SaslAuthenticateRequest(corr, parameters.AuthMechanism.ToString().ToUpperInvariant(),
240+
saslData))
229241
.ConfigureAwait(false);
230242
ClientExceptions.MaybeThrowException(authResponse.ResponseCode, parameters.UserName);
231243

@@ -345,7 +357,7 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
345357
Dictionary<string, string> properties, Func<Deliver, Task> deliverHandler,
346358
Func<bool, Task<IOffsetType>> consumerUpdateHandler = null)
347359
{
348-
return await Subscribe(new RawConsumerConfig(stream) { OffsetSpec = offsetType },
360+
return await Subscribe(new RawConsumerConfig(stream) {OffsetSpec = offsetType},
349361
initialCredit,
350362
properties,
351363
deliverHandler,
@@ -512,7 +524,8 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
512524
break;
513525
case MetaDataUpdate.Key:
514526
MetaDataUpdate.Read(frame, out var metaDataUpdate);
515-
Parameters.MetadataHandler(metaDataUpdate);
527+
// Parameters.MetadataHandler(metaDataUpdate);
528+
Parameters.FireMetadataUpdate(metaDataUpdate);
516529
break;
517530
case TuneResponse.Key:
518531
TuneResponse.Read(frame, out var tuneResponse);
@@ -785,9 +798,9 @@ public async ValueTask<MetaDataResponse> QueryMetadata(string[] streams)
785798

786799
public async Task<bool> StreamExists(string stream)
787800
{
788-
var streams = new[] { stream };
801+
var streams = new[] {stream};
789802
var response = await QueryMetadata(streams).ConfigureAwait(false);
790-
return response.StreamInfos is { Count: >= 1 } &&
803+
return response.StreamInfos is {Count: >= 1} &&
791804
response.StreamInfos[stream].ResponseCode == ResponseCode.Ok;
792805
}
793806

@@ -834,7 +847,7 @@ public static ManualResetValueTaskSource<T> Rent()
834847
}
835848
else
836849
{
837-
return new ManualResetValueTaskSource<T>() { RunContinuationsAsynchronously = true };
850+
return new ManualResetValueTaskSource<T>() {RunContinuationsAsynchronously = true};
838851
}
839852
}
840853

RabbitMQ.Stream.Client/PublicAPI.Shipped.txt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,6 @@ RabbitMQ.Stream.Client.ICompressionCodec.UnCompress(System.Buffers.ReadOnlySeque
335335
RabbitMQ.Stream.Client.ICompressionCodec.UnCompressedSize.get -> int
336336
RabbitMQ.Stream.Client.ICompressionCodec.Write(System.Span<byte> span) -> int
337337
RabbitMQ.Stream.Client.IConsumer
338-
RabbitMQ.Stream.Client.IConsumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
339338
RabbitMQ.Stream.Client.IConsumer.Dispose() -> void
340339
RabbitMQ.Stream.Client.IConsumer.StoreOffset(ulong offset) -> System.Threading.Tasks.Task
341340
RabbitMQ.Stream.Client.IConsumerConfig
@@ -357,7 +356,6 @@ RabbitMQ.Stream.Client.IOffsetType.OffsetType.get -> RabbitMQ.Stream.Client.Offs
357356
RabbitMQ.Stream.Client.IOffsetType.Size.get -> int
358357
RabbitMQ.Stream.Client.IOffsetType.Write(System.Span<byte> span) -> int
359358
RabbitMQ.Stream.Client.IProducer
360-
RabbitMQ.Stream.Client.IProducer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
361359
RabbitMQ.Stream.Client.IProducer.ConfirmFrames.get -> int
362360
RabbitMQ.Stream.Client.IProducer.Dispose() -> void
363361
RabbitMQ.Stream.Client.IProducer.GetLastPublishingId() -> System.Threading.Tasks.Task<ulong>
@@ -559,7 +557,6 @@ RabbitMQ.Stream.Client.RawProducerConfig.MetadataHandler.set -> void
559557
RabbitMQ.Stream.Client.RawProducerConfig.RawProducerConfig(string stream) -> void
560558
RabbitMQ.Stream.Client.RawProducerConfig.Stream.get -> string
561559
RabbitMQ.Stream.Client.RawSuperStreamConsumer
562-
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
563560
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Dispose() -> void
564561
RabbitMQ.Stream.Client.RawSuperStreamConsumer.StoreOffset(ulong offset) -> System.Threading.Tasks.Task
565562
RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig
@@ -570,7 +567,6 @@ RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.OffsetSpec.set -> void
570567
RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.RawSuperStreamConsumerConfig(string superStream) -> void
571568
RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.SuperStream.get -> string
572569
RabbitMQ.Stream.Client.RawSuperStreamProducer
573-
RabbitMQ.Stream.Client.RawSuperStreamProducer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
574570
RabbitMQ.Stream.Client.RawSuperStreamProducer.ConfirmFrames.get -> int
575571
RabbitMQ.Stream.Client.RawSuperStreamProducer.Dispose() -> void
576572
RabbitMQ.Stream.Client.RawSuperStreamProducer.GetLastPublishingId() -> System.Threading.Tasks.Task<ulong>

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey)
2424
RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.StreamStatsResponse>
2525
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
2626
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void
27+
RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler
28+
RabbitMQ.Stream.Client.ClientParameters.OnMetadataUpdate -> RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler
2729
RabbitMQ.Stream.Client.ConnectionItem
2830
RabbitMQ.Stream.Client.ConnectionItem.Available.get -> bool
2931
RabbitMQ.Stream.Client.ConnectionItem.BrokerInfo.get -> string
@@ -63,6 +65,7 @@ RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
6365
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
6466
RabbitMQ.Stream.Client.IClient.ClientId.get -> string
6567
RabbitMQ.Stream.Client.IClient.ClientId.init -> void
68+
RabbitMQ.Stream.Client.IConsumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
6669
RabbitMQ.Stream.Client.IConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
6770
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
6871
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.set -> void
@@ -96,6 +99,7 @@ RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
9699
RabbitMQ.Stream.Client.Info
97100
RabbitMQ.Stream.Client.Info.Info(string stream) -> void
98101
RabbitMQ.Stream.Client.Info.Stream.get -> string
102+
RabbitMQ.Stream.Client.IProducer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
99103
RabbitMQ.Stream.Client.IProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
100104
RabbitMQ.Stream.Client.IProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
101105
RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
@@ -123,7 +127,9 @@ RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
123127
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
124128
RabbitMQ.Stream.Client.RawConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
125129
RabbitMQ.Stream.Client.RawProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
130+
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
126131
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
132+
RabbitMQ.Stream.Client.RawSuperStreamProducer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
127133
RabbitMQ.Stream.Client.RawSuperStreamProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
128134
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
129135
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,14 @@ internal void Validate()
8787

8888
switch (ConsumerFilter)
8989
{
90-
case { PostFilter: null }:
90+
case {PostFilter: null}:
9191
throw new ArgumentException("PostFilter must be provided when Filter is set");
92-
case { Values.Count: 0 }:
92+
case {Values.Count: 0}:
9393
throw new ArgumentException("Values must be provided when Filter is set");
9494
}
9595
}
9696

97-
internal bool IsFiltering => ConsumerFilter is { Values.Count: > 0 };
97+
internal bool IsFiltering => ConsumerFilter is {Values.Count: > 0};
9898

9999
// it is needed to be able to add the subscriptions arguments
100100
// see consumerProperties["super-stream"] = SuperStream;
@@ -402,7 +402,6 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) //
402402
// we request the credit before process the check to keep the network busy
403403
try
404404
{
405-
406405
await _client.Credit(_subscriberId, 1).ConfigureAwait(false);
407406
}
408407
catch (InvalidOperationException)
@@ -460,20 +459,9 @@ private async Task Init()
460459
{
461460
_config.Validate();
462461

463-
_client.ConnectionClosed += async reason =>
464-
{
465-
_config.Pool.Remove(_client.ClientId);
466-
await Close().ConfigureAwait(false);
467-
if (_config.ConnectionClosedHandler != null)
468-
{
469-
await _config.ConnectionClosedHandler(reason).ConfigureAwait(false);
470-
}
471-
};
462+
_client.ConnectionClosed += OnConnectionClosed();
463+
_client.Parameters.OnMetadataUpdate += OnMetadataUpdate();
472464

473-
if (_config.MetadataHandler != null)
474-
{
475-
_client.Parameters.MetadataHandler += _config.MetadataHandler;
476-
}
477465

478466
var consumerProperties = new Dictionary<string, string>();
479467

@@ -580,6 +568,24 @@ private async Task Init()
580568
throw new CreateConsumerException($"consumer could not be created code: {response.ResponseCode}");
581569
}
582570

571+
private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() =>
572+
data =>
573+
{
574+
Close().ConfigureAwait(false);
575+
_config.MetadataHandler?.Invoke(data);
576+
};
577+
578+
private Client.ConnectionCloseHandler OnConnectionClosed() =>
579+
async reason =>
580+
{
581+
_config.Pool.Remove(_client.ClientId);
582+
await Close().ConfigureAwait(false);
583+
if (_config.ConnectionClosedHandler != null)
584+
{
585+
await _config.ConnectionClosedHandler(reason).ConfigureAwait(false);
586+
}
587+
};
588+
583589
public override async Task<ResponseCode> Close()
584590
{
585591
// this unlock the consumer if it is waiting for a message
@@ -591,7 +597,11 @@ public override async Task<ResponseCode> Close()
591597
}
592598

593599
_status = EntityStatus.Closed;
600+
_client.ConnectionClosed -= OnConnectionClosed();
601+
_client.Parameters.OnMetadataUpdate -= OnMetadataUpdate();
602+
594603
var result = ResponseCode.Ok;
604+
595605
try
596606
{
597607
var unsubscribeResponse =
@@ -611,7 +621,7 @@ public override async Task<ResponseCode> Close()
611621
{
612622
_logger.LogError(e,
613623
"Error removing {ConsumerInfo} from the server",
614-
ConsumerInfo());
624+
ConsumerInfo());
615625
}
616626

617627
var closed = await _client.MaybeClose($"_client-close-subscriber: {_subscriberId}",

RabbitMQ.Stream.Client/RawProducer.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public RawProducerConfig(string stream)
4848

4949
internal void Validate()
5050
{
51-
if (Filter is { FilterValue: not null } && !AvailableFeaturesSingleton.Instance.PublishFilter)
51+
if (Filter is {FilterValue: not null} && !AvailableFeaturesSingleton.Instance.PublishFilter)
5252
{
5353
throw new UnsupportedOperationException(Consts.FilterNotSupported);
5454
}
@@ -141,9 +141,7 @@ private async Task Init()
141141
{
142142
_config.ConfirmHandler(new Confirmation
143143
{
144-
PublishingId = id,
145-
Code = ResponseCode.Ok,
146-
Stream = _config.Stream
144+
PublishingId = id, Code = ResponseCode.Ok, Stream = _config.Stream
147145
});
148146
}
149147
catch (Exception e)
@@ -163,7 +161,7 @@ private async Task Init()
163161
{
164162
foreach (var (id, code) in errors)
165163
{
166-
_config.ConfirmHandler(new Confirmation { PublishingId = id, Code = code, });
164+
_config.ConfirmHandler(new Confirmation {PublishingId = id, Code = code,});
167165
}
168166

169167
_semaphore.Release(errors.Length);
@@ -178,7 +176,7 @@ private async Task Init()
178176
throw new CreateProducerException($"producer could not be created code: {response.ResponseCode}");
179177
}
180178

181-
private bool IsFilteringEnabled => _config.Filter is { FilterValue: not null };
179+
private bool IsFilteringEnabled => _config.Filter is {FilterValue: not null};
182180

183181
/// <summary>
184182
/// SubEntry Batch send: Aggregate more messages under the same publishingId.
@@ -358,6 +356,8 @@ public override async Task<ResponseCode> Close()
358356

359357
_status = EntityStatus.Closed;
360358
var result = ResponseCode.Ok;
359+
360+
361361
try
362362
{
363363
// The default timeout is usually 10 seconds

Tests/ConnectionsPoolTests.cs

Lines changed: 33 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -704,42 +704,39 @@ public async void TheProducerConsumerPoolShouldBeConsistentInMultiThreadCreateDe
704704
// // this test doesn't work since the client parameters metadata handler is not an event
705705
// // for the moment I won't change the code. Introduced a new event is a breaking change
706706
//
707-
// // [Fact]
708-
// // public async void TheProducerPoolShouldBeConsistentWhenAStreamIsDeleted()
709-
// // {
710-
// // var client = await Client.Create(new ClientParameters() { });
711-
// // const string Stream1 = "pool_test_stream_1_multi_thread_producer";
712-
// // await client.CreateStream(Stream1, new Dictionary<string, string>());
713-
// // const int IdsPerConnection = 2;
714-
// // var pool = new ConnectionsPool(0, IdsPerConnection);
715-
// // var metaDataInfo = await client.QueryMetadata(new[] {Stream1});
716-
// // var producerList = new ConcurrentDictionary<string, IProducer>();
717-
// //
718-
// // var tasksP = new List<Task>();
719-
// // for (var i = 0; i < (IdsPerConnection * 1); i++)
720-
// // {
721-
// // tasksP.Add(Task.Run(async () =>
722-
// // {
723-
// // var p = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream1)
724-
// // {
725-
// // Pool = pool,
726-
// // },
727-
// // metaDataInfo.StreamInfos[Stream1]);
728-
// // producerList.TryAdd(Guid.NewGuid().ToString(), p);
729-
// // }));
730-
// // }
731-
// //
732-
// // await Task.WhenAll(tasksP);
733-
// //
734-
// // Assert.Equal(2, pool.Count);
735-
// // Assert.Equal(IdsPerConnection, pool.Connections.Values.First().ActiveIds);
736-
// // Assert.Equal(IdsPerConnection, pool.Connections.Values.Skip(1).First().ActiveIds);
737-
// //
738-
// // await client.DeleteStream(Stream1);
739-
// //
740-
// // SystemUtils.WaitUntil(() => pool.Count == 0);
741-
// // Assert.Equal(0, pool.Count);
742-
// // }
707+
[Fact]
708+
public async void TheProducerPoolShouldBeConsistentWhenAStreamIsDeleted()
709+
{
710+
var client = await Client.Create(new ClientParameters() { });
711+
const string Stream1 = "pool_test_stream_1_delete_consumer";
712+
await client.CreateStream(Stream1, new Dictionary<string, string>());
713+
const int IdsPerConnection = 2;
714+
var pool = new ConnectionsPool(0, IdsPerConnection);
715+
var metaDataInfo = await client.QueryMetadata(new[] {Stream1});
716+
var iConsumers = new ConcurrentDictionary<string, IConsumer>();
717+
718+
var tasksP = new List<Task>();
719+
for (var i = 0; i < (IdsPerConnection * 1); i++)
720+
{
721+
tasksP.Add(Task.Run(async () =>
722+
{
723+
var p = await RawConsumer.Create(client.Parameters, new RawConsumerConfig(Stream1)
724+
{
725+
Pool = pool,
726+
},
727+
metaDataInfo.StreamInfos[Stream1]);
728+
iConsumers.TryAdd(Guid.NewGuid().ToString(), p);
729+
}));
730+
}
731+
732+
await Task.WhenAll(tasksP);
733+
734+
Assert.Equal(1, pool.ConnectionsCount);
735+
736+
await client.DeleteStream(Stream1);
737+
738+
SystemUtils.WaitUntil(() => pool.ConnectionsCount == 0);
739+
}
743740
//
744741
/// <summary>
745742
/// The pool has 13 ids per connection.

0 commit comments

Comments
 (0)