Skip to content

Commit 25093d6

Browse files
committed
Replace the metadata handler from action to
event. The event is needed to handle multi producers and multi consumers per connection. Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 80d759d commit 25093d6

13 files changed

+224
-95
lines changed

RabbitMQ.Stream.Client/AbstractEntity.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,30 +9,35 @@
99

1010
namespace RabbitMQ.Stream.Client
1111
{
12-
1312
internal enum EntityStatus
1413
{
1514
Open,
1615
Closed,
1716
Disposed
1817
}
19-
public abstract class AbstractEntity
18+
19+
public interface IClosable
20+
{
21+
public Task<ResponseCode> Close(bool ignoreIfClosed = false);
22+
}
23+
24+
public abstract class AbstractEntity : IClosable
2025
{
2126
private readonly CancellationTokenSource _cancelTokenSource = new();
2227
protected CancellationToken Token => _cancelTokenSource.Token;
2328

2429
internal EntityStatus _status = EntityStatus.Closed;
30+
2531
// here the _cancelTokenSource is disposed and the token is cancelled
2632
// in producer is used to cancel the send task
2733
// in consumer is used to cancel the receive task
2834
protected void MaybeCancelToken()
2935
{
30-
3136
if (!_cancelTokenSource.IsCancellationRequested)
3237
_cancelTokenSource.Cancel();
3338
}
3439

35-
public abstract Task<ResponseCode> Close();
40+
public abstract Task<ResponseCode> Close(bool ignoreIfClosed = false);
3641

3742
protected void Dispose(bool disposing, string entityInfo, ILogger logger)
3843
{
@@ -70,6 +75,5 @@ public bool IsOpen()
7075
}
7176

7277
internal Client _client;
73-
7478
}
7579
}

RabbitMQ.Stream.Client/Client.cs

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,6 @@ public record ClientParameters
5353
public string VirtualHost { get; set; } = "/";
5454
public EndPoint Endpoint { get; set; } = new IPEndPoint(IPAddress.Loopback, 5552);
5555

56-
public Action<MetaDataUpdate> MetadataHandler { get; set; } = _ => { };
57-
58-
5956
public delegate void MetadataUpdateHandler(MetaDataUpdate update);
6057

6158
public event MetadataUpdateHandler OnMetadataUpdate;
@@ -357,7 +354,7 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
357354
Dictionary<string, string> properties, Func<Deliver, Task> deliverHandler,
358355
Func<bool, Task<IOffsetType>> consumerUpdateHandler = null)
359356
{
360-
return await Subscribe(new RawConsumerConfig(stream) {OffsetSpec = offsetType},
357+
return await Subscribe(new RawConsumerConfig(stream) { OffsetSpec = offsetType },
361358
initialCredit,
362359
properties,
363360
deliverHandler,
@@ -398,20 +395,24 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
398395
return (subscriptionId, response);
399396
}
400397

401-
public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId)
398+
public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false)
402399
{
403400
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
404401
try
405402
{
406-
// here we reduce a bit the timeout to avoid waiting too much
407-
// if the client is busy with read operations it can take time to process the unsubscribe
408-
// but the subscribe is removed.
409-
var result =
410-
await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
411-
new UnsubscribeRequest(corr, subscriptionId), TimeSpan.FromSeconds(5)).ConfigureAwait(false);
412-
_logger.LogDebug("Unsubscribe request : {SubscriptionId}", subscriptionId);
413-
414-
return result;
403+
if (!ignoreIfAlreadyRemoved)
404+
{
405+
// here we reduce a bit the timeout to avoid waiting too much
406+
// if the client is busy with read operations it can take time to process the unsubscribe
407+
// but the subscribe is removed.
408+
var result =
409+
await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
410+
new UnsubscribeRequest(corr, subscriptionId),
411+
TimeSpan.FromSeconds(5)).ConfigureAwait(false);
412+
_logger.LogDebug("Unsubscribe request : {SubscriptionId}", subscriptionId);
413+
414+
return result;
415+
}
415416
}
416417
finally
417418
{
@@ -420,6 +421,8 @@ await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
420421
consumers.Remove(subscriptionId);
421422
_poolSemaphore.Release();
422423
}
424+
425+
return new UnsubscribeResponse();
423426
}
424427

425428
public async Task<PartitionsQueryResponse> QueryPartition(string superStream)
@@ -798,9 +801,9 @@ public async ValueTask<MetaDataResponse> QueryMetadata(string[] streams)
798801

799802
public async Task<bool> StreamExists(string stream)
800803
{
801-
var streams = new[] {stream};
804+
var streams = new[] { stream };
802805
var response = await QueryMetadata(streams).ConfigureAwait(false);
803-
return response.StreamInfos is {Count: >= 1} &&
806+
return response.StreamInfos is { Count: >= 1 } &&
804807
response.StreamInfos[stream].ResponseCode == ResponseCode.Ok;
805808
}
806809

@@ -847,7 +850,7 @@ public static ManualResetValueTaskSource<T> Rent()
847850
}
848851
else
849852
{
850-
return new ManualResetValueTaskSource<T>() {RunContinuationsAsynchronously = true};
853+
return new ManualResetValueTaskSource<T>() { RunContinuationsAsynchronously = true };
851854
}
852855
}
853856

RabbitMQ.Stream.Client/IConsumer.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@
77

88
namespace RabbitMQ.Stream.Client;
99

10-
public interface IConsumer
10+
public interface IConsumer : IClosable
1111
{
1212
public Task StoreOffset(ulong offset);
13-
public Task<ResponseCode> Close();
1413
public void Dispose();
1514

1615
public ConsumerInfo Info { get; }

RabbitMQ.Stream.Client/IProducer.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ namespace RabbitMQ.Stream.Client;
1515
// - Super-Stream producer
1616
// </summary>
1717

18-
public interface IProducer
18+
public interface IProducer : IClosable
1919
{
2020
/// <summary>
2121
/// Send the message to the stream in asynchronous mode.
@@ -49,8 +49,6 @@ public interface IProducer
4949
/// <returns></returns>
5050
public ValueTask Send(ulong publishingId, List<Message> subEntryMessages, CompressionType compressionType);
5151

52-
public Task<ResponseCode> Close();
53-
5452
/// <summary>
5553
/// Return the last publishing id.
5654
/// </summary>

RabbitMQ.Stream.Client/PublicAPI.Shipped.txt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,6 @@ RabbitMQ.Stream.Client.Client.StoreOffset(string reference, string stream, ulong
188188
RabbitMQ.Stream.Client.Client.StreamExists(string stream) -> System.Threading.Tasks.Task<bool>
189189
RabbitMQ.Stream.Client.Client.Subscribe(RabbitMQ.Stream.Client.RawConsumerConfig config, ushort initialCredit, System.Collections.Generic.Dictionary<string, string> properties, System.Func<RabbitMQ.Stream.Client.Deliver, System.Threading.Tasks.Task> deliverHandler, System.Func<bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>> consumerUpdateHandler) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
190190
RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IOffsetType offsetType, ushort initialCredit, System.Collections.Generic.Dictionary<string, string> properties, System.Func<RabbitMQ.Stream.Client.Deliver, System.Threading.Tasks.Task> deliverHandler, System.Func<bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>> consumerUpdateHandler = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
191-
RabbitMQ.Stream.Client.Client.Unsubscribe(byte subscriptionId) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.UnsubscribeResponse>
192191
RabbitMQ.Stream.Client.ClientParameters
193192
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.get -> RabbitMQ.Stream.Client.AddressResolver
194193
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.set -> void
@@ -198,8 +197,6 @@ RabbitMQ.Stream.Client.ClientParameters.Endpoint.get -> System.Net.EndPoint
198197
RabbitMQ.Stream.Client.ClientParameters.Endpoint.set -> void
199198
RabbitMQ.Stream.Client.ClientParameters.Heartbeat.get -> System.TimeSpan
200199
RabbitMQ.Stream.Client.ClientParameters.Heartbeat.set -> void
201-
RabbitMQ.Stream.Client.ClientParameters.MetadataHandler.get -> System.Action<RabbitMQ.Stream.Client.MetaDataUpdate>
202-
RabbitMQ.Stream.Client.ClientParameters.MetadataHandler.set -> void
203200
RabbitMQ.Stream.Client.ClientParameters.Password.get -> string
204201
RabbitMQ.Stream.Client.ClientParameters.Password.set -> void
205202
RabbitMQ.Stream.Client.ClientParameters.Properties.get -> System.Collections.Generic.IDictionary<string, string>

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
abstract RabbitMQ.Stream.Client.AbstractEntity.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
1+
abstract RabbitMQ.Stream.Client.AbstractEntity.Close(bool ignoreIfClosed = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
22
const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort
33
const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort
44
override RabbitMQ.Stream.Client.Broker.ToString() -> string
5-
override RabbitMQ.Stream.Client.RawConsumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
6-
override RabbitMQ.Stream.Client.RawProducer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
5+
override RabbitMQ.Stream.Client.RawConsumer.Close(bool ignoreIfClosed = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
6+
override RabbitMQ.Stream.Client.RawProducer.Close(bool ignoreIfClosed = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
77
RabbitMQ.Stream.Client.AbstractEntity.Dispose(bool disposing, string entityInfo, Microsoft.Extensions.Logging.ILogger logger) -> void
88
RabbitMQ.Stream.Client.AbstractEntity.IsOpen() -> bool
99
RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void
@@ -22,6 +22,7 @@ RabbitMQ.Stream.Client.Client.DeclarePublisher(string publisherRef, string strea
2222
RabbitMQ.Stream.Client.Client.ExchangeVersions() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CommandVersionsResponse>
2323
RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>
2424
RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.StreamStatsResponse>
25+
RabbitMQ.Stream.Client.Client.Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.UnsubscribeResponse>
2526
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
2627
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void
2728
RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler
@@ -65,7 +66,8 @@ RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
6566
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
6667
RabbitMQ.Stream.Client.IClient.ClientId.get -> string
6768
RabbitMQ.Stream.Client.IClient.ClientId.init -> void
68-
RabbitMQ.Stream.Client.IConsumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
69+
RabbitMQ.Stream.Client.IClosable
70+
RabbitMQ.Stream.Client.IClosable.Close(bool ignoreIfClosed = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
6971
RabbitMQ.Stream.Client.IConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
7072
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
7173
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.set -> void
@@ -99,7 +101,6 @@ RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
99101
RabbitMQ.Stream.Client.Info
100102
RabbitMQ.Stream.Client.Info.Info(string stream) -> void
101103
RabbitMQ.Stream.Client.Info.Stream.get -> string
102-
RabbitMQ.Stream.Client.IProducer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
103104
RabbitMQ.Stream.Client.IProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
104105
RabbitMQ.Stream.Client.IProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
105106
RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
@@ -127,9 +128,9 @@ RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
127128
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
128129
RabbitMQ.Stream.Client.RawConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
129130
RabbitMQ.Stream.Client.RawProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
130-
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
131+
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Close(bool ignoreIfClosed = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
131132
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
132-
RabbitMQ.Stream.Client.RawSuperStreamProducer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
133+
RabbitMQ.Stream.Client.RawSuperStreamProducer.Close(bool ignoreIfClosed = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
133134
RabbitMQ.Stream.Client.RawSuperStreamProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
134135
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
135136
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,14 @@ internal void Validate()
8787

8888
switch (ConsumerFilter)
8989
{
90-
case {PostFilter: null}:
90+
case { PostFilter: null }:
9191
throw new ArgumentException("PostFilter must be provided when Filter is set");
92-
case {Values.Count: 0}:
92+
case { Values.Count: 0 }:
9393
throw new ArgumentException("Values must be provided when Filter is set");
9494
}
9595
}
9696

97-
internal bool IsFiltering => ConsumerFilter is {Values.Count: > 0};
97+
internal bool IsFiltering => ConsumerFilter is { Values.Count: > 0 };
9898

9999
// it is needed to be able to add the subscriptions arguments
100100
// see consumerProperties["super-stream"] = SuperStream;
@@ -462,7 +462,6 @@ private async Task Init()
462462
_client.ConnectionClosed += OnConnectionClosed();
463463
_client.Parameters.OnMetadataUpdate += OnMetadataUpdate();
464464

465-
466465
var consumerProperties = new Dictionary<string, string>();
467466

468467
if (!string.IsNullOrEmpty(_config.Reference))
@@ -569,24 +568,40 @@ private async Task Init()
569568
}
570569

571570
private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() =>
572-
data =>
571+
metaDataUpdate =>
573572
{
574-
Close().ConfigureAwait(false);
575-
_config.MetadataHandler?.Invoke(data);
573+
// the connection can handle different streams
574+
// we need to check if the metadata update is for the stream
575+
// where the consumer is consuming else can ignore the update
576+
if (metaDataUpdate.Stream != _config.Stream)
577+
return;
578+
// at this point the server has removed the consumer from the list
579+
// and the unsubscribe is not needed anymore (ignoreIfClosed = true)
580+
// we call the Close to re-enter to the standard behavior
581+
// ignoreIfClosed is an optimization to avoid to send the unsubscribe
582+
_config.MetadataHandler?.Invoke(metaDataUpdate);
583+
Close(true).ConfigureAwait(false);
584+
585+
// remove the event since the consumer is closed
586+
// only if the stream is the valid
587+
_client.Parameters.OnMetadataUpdate -= OnMetadataUpdate();
576588
};
577589

578590
private Client.ConnectionCloseHandler OnConnectionClosed() =>
579591
async reason =>
580592
{
581593
_config.Pool.Remove(_client.ClientId);
582-
await Close().ConfigureAwait(false);
594+
await Close(true).ConfigureAwait(false);
583595
if (_config.ConnectionClosedHandler != null)
584596
{
585597
await _config.ConnectionClosedHandler(reason).ConfigureAwait(false);
586598
}
599+
600+
// remove the event since the connection is closed
601+
_client.ConnectionClosed -= OnConnectionClosed();
587602
};
588603

589-
public override async Task<ResponseCode> Close()
604+
public override async Task<ResponseCode> Close(bool ignoreIfClosed = false)
590605
{
591606
// this unlock the consumer if it is waiting for a message
592607
// see DispatchMessage method where the token is used
@@ -597,15 +612,13 @@ public override async Task<ResponseCode> Close()
597612
}
598613

599614
_status = EntityStatus.Closed;
600-
_client.ConnectionClosed -= OnConnectionClosed();
601-
_client.Parameters.OnMetadataUpdate -= OnMetadataUpdate();
602615

603616
var result = ResponseCode.Ok;
604617

605618
try
606619
{
607620
var unsubscribeResponse =
608-
await _client.Unsubscribe(_subscriberId).ConfigureAwait(false);
621+
await _client.Unsubscribe(_subscriberId, ignoreIfClosed).ConfigureAwait(false);
609622
result = unsubscribeResponse.ResponseCode;
610623
}
611624

0 commit comments

Comments
 (0)