Skip to content

Commit c584782

Browse files
committed
Refactor RawProducer and RawConsumer classes
Remove duplication code between the Raw* classes and moved to AbstractEntity class. Fix a rare condition where the producer still receive confirmation but the producer was removed from the internal list due of cancellation. Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 987e432 commit c584782

File tree

10 files changed

+227
-158
lines changed

10 files changed

+227
-158
lines changed

RabbitMQ.Stream.Client/AbstractEntity.cs

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@
99

1010
namespace RabbitMQ.Stream.Client
1111
{
12+
13+
public abstract record EntityCommonConfig
14+
{
15+
internal ConnectionsPool Pool { get; set; }
16+
}
17+
1218
internal enum EntityStatus
1319
{
1420
Open,
@@ -18,28 +24,73 @@ internal enum EntityStatus
1824

1925
public interface IClosable
2026
{
21-
public Task<ResponseCode> Close(bool ignoreIfClosed = false);
27+
public Task<ResponseCode> Close();
2228
}
2329

2430
public abstract class AbstractEntity : IClosable
2531
{
2632
private readonly CancellationTokenSource _cancelTokenSource = new();
2733
protected CancellationToken Token => _cancelTokenSource.Token;
28-
34+
protected ILogger Logger { get; init; }
2935
internal EntityStatus _status = EntityStatus.Closed;
3036

37+
protected byte EntityId { get; set; }
38+
protected abstract string GetStream();
39+
protected abstract string DumpEntityConfiguration();
40+
3141
// here the _cancelTokenSource is disposed and the token is cancelled
3242
// in producer is used to cancel the send task
3343
// in consumer is used to cancel the receive task
34-
protected void MaybeCancelToken()
44+
private void MaybeCancelToken()
3545
{
3646
if (!_cancelTokenSource.IsCancellationRequested)
3747
_cancelTokenSource.Cancel();
3848
}
3949

40-
public abstract Task<ResponseCode> Close(bool ignoreIfClosed = false);
50+
public abstract Task<ResponseCode> Close();
51+
52+
/// <summary>
53+
/// Remove the producer or consumer from the server
54+
/// </summary>
55+
/// <param name="ignoreIfAlreadyDeleted"> In case the producer or consumer is already removed from the server.
56+
/// ex: metadata update </param>
57+
/// <returns></returns>
58+
protected abstract Task<ResponseCode> DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false);
59+
60+
/// <summary>
61+
/// Internal close method. It is called by the public Close method.
62+
/// Set the status to closed and remove the producer or consumer from the server ( if it is not already removed )
63+
/// Close the TCP connection if it is not already closed or it is needed.
64+
/// </summary>
65+
/// <param name="config">The connection pool instance</param>
66+
/// <param name="ignoreIfAlreadyDeleted"></param>
67+
/// <returns></returns>
68+
protected async Task<ResponseCode> Shutdown(EntityCommonConfig config, bool ignoreIfAlreadyDeleted = false)
69+
{
70+
MaybeCancelToken();
71+
72+
if (!IsOpen()) // the client is already closed
73+
{
74+
return ResponseCode.Ok;
75+
}
76+
77+
_status = EntityStatus.Closed;
78+
var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false);
79+
80+
if (_client is { IsClosed: true })
81+
{
82+
return result;
83+
}
84+
85+
var closed = await _client.MaybeClose($"closing: {EntityId}",
86+
GetStream(), config.Pool)
87+
.ConfigureAwait(false);
88+
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-Entity: {EntityId}");
89+
Logger.LogDebug("{EntityInfo} is closed", DumpEntityConfiguration());
90+
return result;
91+
}
4192

42-
protected void Dispose(bool disposing, string entityInfo, ILogger logger)
93+
protected void Dispose(bool disposing)
4394
{
4495
if (!disposing)
4596
{
@@ -56,12 +107,12 @@ protected void Dispose(bool disposing, string entityInfo, ILogger logger)
56107
var closeTask = Close();
57108
if (!closeTask.Wait(Consts.MidWait))
58109
{
59-
logger.LogWarning("Failed to close {EntityInfo} in time", entityInfo);
110+
Logger?.LogWarning("Failed to close {EntityInfo} in time", DumpEntityConfiguration());
60111
}
61112
}
62113
catch (Exception e)
63114
{
64-
logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", entityInfo, e.Message);
115+
Logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", DumpEntityConfiguration(), e.Message);
65116
}
66117
finally
67118
{

RabbitMQ.Stream.Client/Client.cs

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -331,22 +331,28 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
331331
return (publisherId, response);
332332
}
333333

334-
public async Task<DeletePublisherResponse> DeletePublisher(byte publisherId)
334+
public async Task<DeletePublisherResponse> DeletePublisher(byte publisherId,
335+
bool ignoreIfAlreadyRemoved = false)
335336
{
336337
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
337338
try
338339
{
339-
var result =
340-
await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
341-
new DeletePublisherRequest(corr, publisherId)).ConfigureAwait(false);
340+
if (!ignoreIfAlreadyRemoved)
341+
{
342+
var result =
343+
await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
344+
new DeletePublisherRequest(corr, publisherId)).ConfigureAwait(false);
342345

343-
return result;
346+
return result;
347+
}
344348
}
345349
finally
346350
{
347351
publishers.Remove(publisherId);
348352
_poolSemaphore.Release();
349353
}
354+
355+
return new DeletePublisherResponse();
350356
}
351357

352358
public async Task<(byte, SubscribeResponse)> Subscribe(string stream, IOffsetType offsetType,
@@ -492,12 +498,25 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
492498
case PublishConfirm.Key:
493499
PublishConfirm.Read(frame, out var confirm);
494500
confirmFrames += 1;
495-
var (confirmCallback, _) = publishers[confirm.PublisherId];
496-
confirmCallback(confirm.PublishingIds);
497-
if (MemoryMarshal.TryGetArray(confirm.PublishingIds, out var confirmSegment))
501+
if (publishers.TryGetValue(confirm.PublisherId, out var publisherConf))
498502
{
499-
if (confirmSegment.Array != null)
500-
ArrayPool<ulong>.Shared.Return(confirmSegment.Array);
503+
var (confirmCallback, _) = publisherConf;
504+
confirmCallback(confirm.PublishingIds);
505+
if (MemoryMarshal.TryGetArray(confirm.PublishingIds, out var confirmSegment))
506+
{
507+
if (confirmSegment.Array != null)
508+
ArrayPool<ulong>.Shared.Return(confirmSegment.Array);
509+
}
510+
}
511+
else
512+
{
513+
// the producer is not found, this can happen when the producer is closing
514+
// and there are still confirmation on the wire
515+
// we can ignore the error since the producer does not exists anymore
516+
_logger?.LogDebug(
517+
"Could not find stream producer {ID} or producer is closing." +
518+
"A possible cause it that the producer was closed and the are still confirmation on the wire. ",
519+
confirm.PublisherId);
501520
}
502521

503522
break;
@@ -522,8 +541,22 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
522541
break;
523542
case PublishError.Key:
524543
PublishError.Read(frame, out var error);
525-
var (_, errorCallback) = publishers[error.PublisherId];
526-
errorCallback(error.PublishingErrors);
544+
if (publishers.TryGetValue(error.PublisherId, out var publisher))
545+
{
546+
var (_, errorCallback) = publisher;
547+
errorCallback(error.PublishingErrors);
548+
}
549+
else
550+
{
551+
// the producer is not found, this can happen when the producer is closing
552+
// and there are still confirmation on the wire
553+
// we can ignore the error since the producer does not exists anymore
554+
_logger?.LogDebug(
555+
"Could not find stream producer {ID} or producer is closing." +
556+
"A possible cause it that the producer was closed and the are still confirmation on the wire. ",
557+
error.PublisherId);
558+
}
559+
527560
break;
528561
case MetaDataUpdate.Key:
529562
MetaDataUpdate.Read(frame, out var metaDataUpdate);

RabbitMQ.Stream.Client/IConsumer.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,10 @@ public interface IConsumer : IClosable
1515
public ConsumerInfo Info { get; }
1616
}
1717

18-
public record IConsumerConfig : INamedEntity
18+
public record IConsumerConfig : EntityCommonConfig, INamedEntity
1919
{
2020
private ushort _initialCredits = Consts.ConsumerInitialCredits;
2121

22-
internal ConnectionsPool Pool { get; set; }
23-
2422
// StoredOffsetSpec configuration it is needed to keep the offset spec.
2523
// since the offset can be decided from the ConsumerConfig.OffsetSpec.
2624
// and from ConsumerConfig.ConsumerUpdateListener.

RabbitMQ.Stream.Client/IProducer.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,9 @@ public record ProducerFilter
8181
public Func<Message, string> FilterValue { get; set; } = null;
8282
}
8383

84-
public record IProducerConfig : INamedEntity
84+
public record IProducerConfig : EntityCommonConfig, INamedEntity
8585
{
8686

87-
internal ConnectionsPool Pool { get; set; }
88-
8987
public string Reference { get; set; }
9088
public int MaxInFlight { get; set; } = 1_000;
9189
public string ClientProvidedName { get; set; } = "dotnet-stream-raw-producer";

RabbitMQ.Stream.Client/PublicAPI.Shipped.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,6 @@ RabbitMQ.Stream.Client.Client.ConnectionCloseHandler
169169
RabbitMQ.Stream.Client.Client.ConnectionProperties.get -> System.Collections.Generic.IDictionary<string, string>
170170
RabbitMQ.Stream.Client.Client.CreateStream(string stream, System.Collections.Generic.IDictionary<string, string> args) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.CreateResponse>
171171
RabbitMQ.Stream.Client.Client.Credit(byte subscriptionId, ushort credit) -> System.Threading.Tasks.ValueTask<bool>
172-
RabbitMQ.Stream.Client.Client.DeletePublisher(byte publisherId) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.DeletePublisherResponse>
173172
RabbitMQ.Stream.Client.Client.DeleteStream(string stream) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.DeleteResponse>
174173
RabbitMQ.Stream.Client.Client.IncomingFrames.get -> int
175174
RabbitMQ.Stream.Client.Client.IsClosed.get -> bool

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
1-
abstract RabbitMQ.Stream.Client.AbstractEntity.Close(bool ignoreIfClosed = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
1+
abstract RabbitMQ.Stream.Client.AbstractEntity.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
2+
abstract RabbitMQ.Stream.Client.AbstractEntity.DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
3+
abstract RabbitMQ.Stream.Client.AbstractEntity.DumpEntityConfiguration() -> string
4+
abstract RabbitMQ.Stream.Client.AbstractEntity.GetStream() -> string
25
const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort
36
const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort
47
override RabbitMQ.Stream.Client.Broker.ToString() -> string
5-
override RabbitMQ.Stream.Client.RawConsumer.Close(bool ignoreIfClosed = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
6-
override RabbitMQ.Stream.Client.RawProducer.Close(bool ignoreIfClosed = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
7-
RabbitMQ.Stream.Client.AbstractEntity.Dispose(bool disposing, string entityInfo, Microsoft.Extensions.Logging.ILogger logger) -> void
8+
override RabbitMQ.Stream.Client.RawConsumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
9+
override RabbitMQ.Stream.Client.RawProducer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
10+
RabbitMQ.Stream.Client.AbstractEntity.Dispose(bool disposing) -> void
11+
RabbitMQ.Stream.Client.AbstractEntity.EntityId.get -> byte
12+
RabbitMQ.Stream.Client.AbstractEntity.EntityId.set -> void
813
RabbitMQ.Stream.Client.AbstractEntity.IsOpen() -> bool
9-
RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void
14+
RabbitMQ.Stream.Client.AbstractEntity.Logger.get -> Microsoft.Extensions.Logging.ILogger
15+
RabbitMQ.Stream.Client.AbstractEntity.Logger.init -> void
16+
RabbitMQ.Stream.Client.AbstractEntity.Shutdown(RabbitMQ.Stream.Client.EntityCommonConfig config, bool ignoreIfAlreadyDeleted = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
1017
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken
1118
RabbitMQ.Stream.Client.AuthMechanism
1219
RabbitMQ.Stream.Client.AuthMechanism.External = 1 -> RabbitMQ.Stream.Client.AuthMechanism
@@ -19,6 +26,7 @@ RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
1926
RabbitMQ.Stream.Client.Client.ClientId.get -> string
2027
RabbitMQ.Stream.Client.Client.ClientId.init -> void
2128
RabbitMQ.Stream.Client.Client.DeclarePublisher(string publisherRef, string stream, System.Action<System.ReadOnlyMemory<ulong>> confirmCallback, System.Action<(ulong, RabbitMQ.Stream.Client.ResponseCode)[]> errorCallback, RabbitMQ.Stream.Client.ConnectionsPool pool = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.DeclarePublisherResponse)>
29+
RabbitMQ.Stream.Client.Client.DeletePublisher(byte publisherId, bool ignoreIfAlreadyRemoved = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.DeletePublisherResponse>
2230
RabbitMQ.Stream.Client.Client.ExchangeVersions() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CommandVersionsResponse>
2331
RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>
2432
RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.StreamStatsResponse>
@@ -63,11 +71,12 @@ RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference
6371
RabbitMQ.Stream.Client.ConsumerInfo.Reference.get -> string
6472
RabbitMQ.Stream.Client.CrcException
6573
RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
74+
RabbitMQ.Stream.Client.EntityCommonConfig
6675
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
6776
RabbitMQ.Stream.Client.IClient.ClientId.get -> string
6877
RabbitMQ.Stream.Client.IClient.ClientId.init -> void
6978
RabbitMQ.Stream.Client.IClosable
70-
RabbitMQ.Stream.Client.IClosable.Close(bool ignoreIfClosed = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
79+
RabbitMQ.Stream.Client.IClosable.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
7180
RabbitMQ.Stream.Client.IConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
7281
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
7382
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.set -> void
@@ -128,9 +137,9 @@ RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
128137
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
129138
RabbitMQ.Stream.Client.RawConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
130139
RabbitMQ.Stream.Client.RawProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
131-
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Close(bool ignoreIfClosed = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
140+
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
132141
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
133-
RabbitMQ.Stream.Client.RawSuperStreamProducer.Close(bool ignoreIfClosed = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
142+
RabbitMQ.Stream.Client.RawSuperStreamProducer.Close() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
134143
RabbitMQ.Stream.Client.RawSuperStreamProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
135144
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
136145
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void

0 commit comments

Comments
 (0)