Skip to content

Commit d5cdce4

Browse files
authored
New event to handle Metadata update (#332)
With this PR #328 the client can handle multi-producers and consumers per connection. This PR removes MetadataHandler and introduces OnMetadataUpdate event. The event can handle multiple Metadata updates coming from the server. Metadata update is raised when a stream is deleted, or a replica is removed. The server automatically removes the producers and consumers linked to the connection, here we need to remove these entities from the internal pool to be consistent. - Refactor RawConsumer and RawProducer. Remove duplication code. Move the common code to the AbstractEntity Class --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 4cefb84 commit d5cdce4

14 files changed

+463
-242
lines changed

.github/workflows/build-test.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ jobs:
6767
key: ${{ runner.os }}-v2-nuget-${{ hashFiles('**/*.csproj') }}
6868
restore-keys: |
6969
${{ runner.os }}-v2-nuget-
70+
- name: Wait RabbitMQ is Up
71+
run: docker exec ${{ job.services.rabbitmq.id }} rabbitmqctl wait --pid 1 --timeout 60
7072
- name: Enable RabbitMQ Plugins
7173
run: docker exec ${{ job.services.rabbitmq.id }} rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management rabbitmq_amqp1_0
7274
- name: Restore

RabbitMQ.Stream.Client/AbstractEntity.cs

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,31 +10,87 @@
1010
namespace RabbitMQ.Stream.Client
1111
{
1212

13+
public abstract record EntityCommonConfig
14+
{
15+
internal ConnectionsPool Pool { get; set; }
16+
}
17+
1318
internal enum EntityStatus
1419
{
1520
Open,
1621
Closed,
1722
Disposed
1823
}
19-
public abstract class AbstractEntity
24+
25+
public interface IClosable
26+
{
27+
public Task<ResponseCode> Close();
28+
}
29+
30+
public abstract class AbstractEntity : IClosable
2031
{
2132
private readonly CancellationTokenSource _cancelTokenSource = new();
2233
protected CancellationToken Token => _cancelTokenSource.Token;
23-
34+
protected ILogger Logger { get; init; }
2435
internal EntityStatus _status = EntityStatus.Closed;
36+
37+
protected byte EntityId { get; set; }
38+
protected abstract string GetStream();
39+
protected abstract string DumpEntityConfiguration();
40+
2541
// here the _cancelTokenSource is disposed and the token is cancelled
2642
// in producer is used to cancel the send task
2743
// in consumer is used to cancel the receive task
28-
protected void MaybeCancelToken()
44+
private void MaybeCancelToken()
2945
{
30-
3146
if (!_cancelTokenSource.IsCancellationRequested)
3247
_cancelTokenSource.Cancel();
3348
}
3449

3550
public abstract Task<ResponseCode> Close();
3651

37-
protected void Dispose(bool disposing, string entityInfo, ILogger logger)
52+
/// <summary>
53+
/// Remove the producer or consumer from the server
54+
/// </summary>
55+
/// <param name="ignoreIfAlreadyDeleted"> In case the producer or consumer is already removed from the server.
56+
/// ex: metadata update </param>
57+
/// <returns></returns>
58+
protected abstract Task<ResponseCode> DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false);
59+
60+
/// <summary>
61+
/// Internal close method. It is called by the public Close method.
62+
/// Set the status to closed and remove the producer or consumer from the server ( if it is not already removed )
63+
/// Close the TCP connection if it is not already closed or it is needed.
64+
/// </summary>
65+
/// <param name="config">The connection pool instance</param>
66+
/// <param name="ignoreIfAlreadyDeleted"></param>
67+
/// <returns></returns>
68+
protected async Task<ResponseCode> Shutdown(EntityCommonConfig config, bool ignoreIfAlreadyDeleted = false)
69+
{
70+
MaybeCancelToken();
71+
72+
if (!IsOpen()) // the client is already closed
73+
{
74+
return ResponseCode.Ok;
75+
}
76+
77+
_status = EntityStatus.Closed;
78+
var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false);
79+
80+
if (_client is { IsClosed: true })
81+
{
82+
return result;
83+
}
84+
85+
var closed = await _client.MaybeClose($"closing: {EntityId}",
86+
GetStream(), config.Pool)
87+
.ConfigureAwait(false);
88+
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-Entity: {EntityId}");
89+
Logger.LogDebug("{EntityInfo} is closed", DumpEntityConfiguration());
90+
return result;
91+
}
92+
93+
protected void Dispose(bool disposing)
3894
{
3995
if (!disposing)
4096
{
@@ -51,12 +107,12 @@ protected void Dispose(bool disposing, string entityInfo, ILogger logger)
51107
var closeTask = Close();
52108
if (!closeTask.Wait(Consts.MidWait))
53109
{
54-
logger.LogWarning("Failed to close {EntityInfo} in time", entityInfo);
110+
Logger?.LogWarning("Failed to close {EntityInfo} in time", DumpEntityConfiguration());
55111
}
56112
}
57113
catch (Exception e)
58114
{
59-
logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", entityInfo, e.Message);
115+
Logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", DumpEntityConfiguration(), e.Message);
60116
}
61117
finally
62118
{
@@ -70,6 +126,5 @@ public bool IsOpen()
70126
}
71127

72128
internal Client _client;
73-
74129
}
75130
}

RabbitMQ.Stream.Client/Client.cs

Lines changed: 74 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ public record ClientParameters
5353
public string VirtualHost { get; set; } = "/";
5454
public EndPoint Endpoint { get; set; } = new IPEndPoint(IPAddress.Loopback, 5552);
5555

56-
public Action<MetaDataUpdate> MetadataHandler { get; set; } = _ => { };
56+
public delegate void MetadataUpdateHandler(MetaDataUpdate update);
57+
58+
public event MetadataUpdateHandler OnMetadataUpdate;
5759
public Action<Exception> UnhandledExceptionHandler { get; set; } = _ => { };
5860
public TimeSpan Heartbeat { get; set; } = TimeSpan.FromMinutes(1);
5961

@@ -71,6 +73,11 @@ public string ClientProvidedName
7173
public AddressResolver AddressResolver { get; set; } = null;
7274

7375
public AuthMechanism AuthMechanism { get; set; } = AuthMechanism.Plain;
76+
77+
internal void FireMetadataUpdate(MetaDataUpdate metaDataUpdate)
78+
{
79+
OnMetadataUpdate?.Invoke(metaDataUpdate);
80+
}
7481
}
7582

7683
internal readonly struct OutgoingMsg : ICommand
@@ -213,7 +220,8 @@ await client
213220
.ConfigureAwait(false);
214221
logger?.LogDebug("Sasl mechanism: {Mechanisms}", saslHandshakeResponse.Mechanisms);
215222

216-
var isValid = saslHandshakeResponse.Mechanisms.Contains(parameters.AuthMechanism.ToString().ToUpperInvariant(),
223+
var isValid = saslHandshakeResponse.Mechanisms.Contains(
224+
parameters.AuthMechanism.ToString().ToUpperInvariant(),
217225
StringComparer.OrdinalIgnoreCase);
218226
if (!isValid)
219227
{
@@ -225,7 +233,8 @@ await client
225233
var authResponse =
226234
await client
227235
.Request<SaslAuthenticateRequest, SaslAuthenticateResponse>(corr =>
228-
new SaslAuthenticateRequest(corr, parameters.AuthMechanism.ToString().ToUpperInvariant(), saslData))
236+
new SaslAuthenticateRequest(corr, parameters.AuthMechanism.ToString().ToUpperInvariant(),
237+
saslData))
229238
.ConfigureAwait(false);
230239
ClientExceptions.MaybeThrowException(authResponse.ResponseCode, parameters.UserName);
231240

@@ -322,22 +331,28 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
322331
return (publisherId, response);
323332
}
324333

325-
public async Task<DeletePublisherResponse> DeletePublisher(byte publisherId)
334+
public async Task<DeletePublisherResponse> DeletePublisher(byte publisherId,
335+
bool ignoreIfAlreadyRemoved = false)
326336
{
327337
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
328338
try
329339
{
330-
var result =
331-
await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
332-
new DeletePublisherRequest(corr, publisherId)).ConfigureAwait(false);
340+
if (!ignoreIfAlreadyRemoved)
341+
{
342+
var result =
343+
await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
344+
new DeletePublisherRequest(corr, publisherId)).ConfigureAwait(false);
333345

334-
return result;
346+
return result;
347+
}
335348
}
336349
finally
337350
{
338351
publishers.Remove(publisherId);
339352
_poolSemaphore.Release();
340353
}
354+
355+
return new DeletePublisherResponse();
341356
}
342357

343358
public async Task<(byte, SubscribeResponse)> Subscribe(string stream, IOffsetType offsetType,
@@ -386,20 +401,24 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
386401
return (subscriptionId, response);
387402
}
388403

389-
public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId)
404+
public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false)
390405
{
391406
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
392407
try
393408
{
394-
// here we reduce a bit the timeout to avoid waiting too much
395-
// if the client is busy with read operations it can take time to process the unsubscribe
396-
// but the subscribe is removed.
397-
var result =
398-
await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
399-
new UnsubscribeRequest(corr, subscriptionId), TimeSpan.FromSeconds(5)).ConfigureAwait(false);
400-
_logger.LogDebug("Unsubscribe request : {SubscriptionId}", subscriptionId);
401-
402-
return result;
409+
if (!ignoreIfAlreadyRemoved)
410+
{
411+
// here we reduce a bit the timeout to avoid waiting too much
412+
// if the client is busy with read operations it can take time to process the unsubscribe
413+
// but the subscribe is removed.
414+
var result =
415+
await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
416+
new UnsubscribeRequest(corr, subscriptionId),
417+
TimeSpan.FromSeconds(5)).ConfigureAwait(false);
418+
_logger.LogDebug("Unsubscribe request : {SubscriptionId}", subscriptionId);
419+
420+
return result;
421+
}
403422
}
404423
finally
405424
{
@@ -408,6 +427,8 @@ await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
408427
consumers.Remove(subscriptionId);
409428
_poolSemaphore.Release();
410429
}
430+
431+
return new UnsubscribeResponse();
411432
}
412433

413434
public async Task<PartitionsQueryResponse> QueryPartition(string superStream)
@@ -477,12 +498,25 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
477498
case PublishConfirm.Key:
478499
PublishConfirm.Read(frame, out var confirm);
479500
confirmFrames += 1;
480-
var (confirmCallback, _) = publishers[confirm.PublisherId];
481-
confirmCallback(confirm.PublishingIds);
482-
if (MemoryMarshal.TryGetArray(confirm.PublishingIds, out var confirmSegment))
501+
if (publishers.TryGetValue(confirm.PublisherId, out var publisherConf))
483502
{
484-
if (confirmSegment.Array != null)
485-
ArrayPool<ulong>.Shared.Return(confirmSegment.Array);
503+
var (confirmCallback, _) = publisherConf;
504+
confirmCallback(confirm.PublishingIds);
505+
if (MemoryMarshal.TryGetArray(confirm.PublishingIds, out var confirmSegment))
506+
{
507+
if (confirmSegment.Array != null)
508+
ArrayPool<ulong>.Shared.Return(confirmSegment.Array);
509+
}
510+
}
511+
else
512+
{
513+
// the producer is not found, this can happen when the producer is closing
514+
// and there are still confirmation on the wire
515+
// we can ignore the error since the producer does not exists anymore
516+
_logger?.LogDebug(
517+
"Could not find stream producer {ID} or producer is closing." +
518+
"A possible cause it that the producer was closed and the are still confirmation on the wire. ",
519+
confirm.PublisherId);
486520
}
487521

488522
break;
@@ -507,12 +541,26 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
507541
break;
508542
case PublishError.Key:
509543
PublishError.Read(frame, out var error);
510-
var (_, errorCallback) = publishers[error.PublisherId];
511-
errorCallback(error.PublishingErrors);
544+
if (publishers.TryGetValue(error.PublisherId, out var publisher))
545+
{
546+
var (_, errorCallback) = publisher;
547+
errorCallback(error.PublishingErrors);
548+
}
549+
else
550+
{
551+
// the producer is not found, this can happen when the producer is closing
552+
// and there are still confirmation on the wire
553+
// we can ignore the error since the producer does not exists anymore
554+
_logger?.LogDebug(
555+
"Could not find stream producer {ID} or producer is closing." +
556+
"A possible cause it that the producer was closed and the are still confirmation on the wire. ",
557+
error.PublisherId);
558+
}
559+
512560
break;
513561
case MetaDataUpdate.Key:
514562
MetaDataUpdate.Read(frame, out var metaDataUpdate);
515-
Parameters.MetadataHandler(metaDataUpdate);
563+
Parameters.FireMetadataUpdate(metaDataUpdate);
516564
break;
517565
case TuneResponse.Key:
518566
TuneResponse.Read(frame, out var tuneResponse);

RabbitMQ.Stream.Client/IConsumer.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,18 @@
77

88
namespace RabbitMQ.Stream.Client;
99

10-
public interface IConsumer
10+
public interface IConsumer : IClosable
1111
{
1212
public Task StoreOffset(ulong offset);
13-
public Task<ResponseCode> Close();
1413
public void Dispose();
1514

1615
public ConsumerInfo Info { get; }
1716
}
1817

19-
public record IConsumerConfig : INamedEntity
18+
public record IConsumerConfig : EntityCommonConfig, INamedEntity
2019
{
2120
private ushort _initialCredits = Consts.ConsumerInitialCredits;
2221

23-
internal ConnectionsPool Pool { get; set; }
24-
2522
// StoredOffsetSpec configuration it is needed to keep the offset spec.
2623
// since the offset can be decided from the ConsumerConfig.OffsetSpec.
2724
// and from ConsumerConfig.ConsumerUpdateListener.

RabbitMQ.Stream.Client/IProducer.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ namespace RabbitMQ.Stream.Client;
1515
// - Super-Stream producer
1616
// </summary>
1717

18-
public interface IProducer
18+
public interface IProducer : IClosable
1919
{
2020
/// <summary>
2121
/// Send the message to the stream in asynchronous mode.
@@ -49,8 +49,6 @@ public interface IProducer
4949
/// <returns></returns>
5050
public ValueTask Send(ulong publishingId, List<Message> subEntryMessages, CompressionType compressionType);
5151

52-
public Task<ResponseCode> Close();
53-
5452
/// <summary>
5553
/// Return the last publishing id.
5654
/// </summary>
@@ -83,11 +81,9 @@ public record ProducerFilter
8381
public Func<Message, string> FilterValue { get; set; } = null;
8482
}
8583

86-
public record IProducerConfig : INamedEntity
84+
public record IProducerConfig : EntityCommonConfig, INamedEntity
8785
{
8886

89-
internal ConnectionsPool Pool { get; set; }
90-
9187
public string Reference { get; set; }
9288
public int MaxInFlight { get; set; } = 1_000;
9389
public string ClientProvidedName { get; set; } = "dotnet-stream-raw-producer";

0 commit comments

Comments
 (0)