Skip to content

Commit 10a3f5c

Browse files
committed
remove lock for events
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent b75284f commit 10a3f5c

File tree

6 files changed

+14
-29
lines changed

6 files changed

+14
-29
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -205,26 +205,6 @@ private async Task OnConnectionClosed(string reason)
205205
}
206206
}
207207

208-
private readonly SemaphoreSlim _attachSemaphore = new(1, 1);
209-
210-
public void AttachEventsToTheClient(ConnectionCloseHandler connectionCloseHandler,
211-
ClientParameters.MetadataUpdateHandler metadataUpdateHandler)
212-
{
213-
_attachSemaphore.Wait();
214-
ConnectionClosed += connectionCloseHandler;
215-
Parameters.OnMetadataUpdate += metadataUpdateHandler;
216-
_attachSemaphore.Release();
217-
}
218-
219-
public void DetachEventsFromTheClient(ConnectionCloseHandler connectionCloseHandler,
220-
ClientParameters.MetadataUpdateHandler metadataUpdateHandler)
221-
{
222-
_attachSemaphore.Wait();
223-
ConnectionClosed -= connectionCloseHandler;
224-
Parameters.OnMetadataUpdate -= metadataUpdateHandler;
225-
_attachSemaphore.Release();
226-
}
227-
228208
public static async Task<Client> Create(ClientParameters parameters, ILogger logger = null)
229209
{
230210
var client = new Client(parameters, logger);

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,11 @@ RabbitMQ.Stream.Client.AuthMechanismNotSupportedException.AuthMechanismNotSuppor
2828
RabbitMQ.Stream.Client.Chunk.Crc.get -> uint
2929
RabbitMQ.Stream.Client.Chunk.Data.get -> System.ReadOnlyMemory<byte>
3030
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
31-
RabbitMQ.Stream.Client.Client.AttachEventsToTheClient(RabbitMQ.Stream.Client.Client.ConnectionCloseHandler connectionCloseHandler, RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler metadataUpdateHandler) -> void
3231
RabbitMQ.Stream.Client.Client.ClientId.get -> string
3332
RabbitMQ.Stream.Client.Client.ClientId.init -> void
3433
RabbitMQ.Stream.Client.Client.Consumers.get -> System.Collections.Generic.IDictionary<byte, (string, RabbitMQ.Stream.Client.ConsumerEvents)>
3534
RabbitMQ.Stream.Client.Client.DeclarePublisher(string publisherRef, string stream, System.Action<System.ReadOnlyMemory<ulong>> confirmCallback, System.Action<(ulong, RabbitMQ.Stream.Client.ResponseCode)[]> errorCallback, RabbitMQ.Stream.Client.ConnectionsPool pool = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.DeclarePublisherResponse)>
3635
RabbitMQ.Stream.Client.Client.DeletePublisher(byte publisherId, bool ignoreIfAlreadyRemoved = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.DeletePublisherResponse>
37-
RabbitMQ.Stream.Client.Client.DetachEventsFromTheClient(RabbitMQ.Stream.Client.Client.ConnectionCloseHandler connectionCloseHandler, RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler metadataUpdateHandler) -> void
3836
RabbitMQ.Stream.Client.Client.ExchangeVersions() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CommandVersionsResponse>
3937
RabbitMQ.Stream.Client.Client.Publishers.get -> System.Collections.Generic.IDictionary<byte, (string, (System.Action<System.ReadOnlyMemory<ulong>>, System.Action<(ulong, RabbitMQ.Stream.Client.ResponseCode)[]>))>
4038
RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,9 @@ internal async Task Init()
587587

588588
if (response.ResponseCode == ResponseCode.Ok)
589589
{
590-
_client.AttachEventsToTheClient(OnConnectionClosed(), OnMetadataUpdate());
590+
_client.ConnectionClosed += OnConnectionClosed();
591+
_client.Parameters.OnMetadataUpdate += OnMetadataUpdate();
592+
591593
_status = EntityStatus.Open;
592594
// the subscription is completed so the parsechunk can start to process the chunks
593595
_completeSubscription.SetResult();
@@ -609,7 +611,8 @@ private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() =>
609611
// remove the event since the consumer is closed
610612
// only if the stream is the valid
611613

612-
_client.DetachEventsFromTheClient(OnConnectionClosed(), OnMetadataUpdate());
614+
_client.ConnectionClosed -= OnConnectionClosed();
615+
_client.Parameters.OnMetadataUpdate -= OnMetadataUpdate();
613616

614617
// at this point the server has removed the consumer from the list
615618
// and the unsubscribe is not needed anymore (ignoreIfClosed = true)
@@ -623,7 +626,9 @@ private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() =>
623626
private Client.ConnectionCloseHandler OnConnectionClosed() =>
624627
async reason =>
625628
{
626-
_client.DetachEventsFromTheClient(OnConnectionClosed(), OnMetadataUpdate());
629+
_client.ConnectionClosed -= OnConnectionClosed();
630+
_client.Parameters.OnMetadataUpdate -= OnMetadataUpdate();
631+
627632
// remove the event since the connection is closed
628633
_config.Pool.Remove(_client.ClientId);
629634
UpdateStatusToClosed();

RabbitMQ.Stream.Client/RawProducer.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,8 @@ private async Task Init()
151151

152152
if (response.ResponseCode == ResponseCode.Ok)
153153
{
154-
_client.AttachEventsToTheClient(OnConnectionClosed(), OnMetadataUpdate());
154+
_client.ConnectionClosed += OnConnectionClosed();
155+
_client.Parameters.OnMetadataUpdate += OnMetadataUpdate();
155156
_status = EntityStatus.Open;
156157
return;
157158
}
@@ -183,7 +184,8 @@ private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() =>
183184
if (metaDataUpdate.Stream != _config.Stream)
184185
return;
185186

186-
_client.DetachEventsFromTheClient(OnConnectionClosed(), OnMetadataUpdate());
187+
_client.ConnectionClosed -= OnConnectionClosed();
188+
_client.Parameters.OnMetadataUpdate -= OnMetadataUpdate();
187189

188190
_config.Pool.RemoveProducerEntityFromStream(_client.ClientId, EntityId, _config.Stream);
189191

RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ private async Task<IConsumer> StandardConsumer(bool boot)
5757
{
5858
if (closeReason == ConnectionClosedReason.Normal)
5959
{
60-
BaseLogger.LogInformation("Reconnect is skipped. {Identity} is closed normally", ToString());
60+
BaseLogger.LogInformation("{Identity} is closed normally", ToString());
6161
return;
6262
}
6363

RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ private async Task<IProducer> StandardProducer()
7575
{
7676
if (closeReason == ConnectionClosedReason.Normal)
7777
{
78-
BaseLogger.LogInformation("Reconnect is skipped. {Identity} is closed normally", ToString());
78+
BaseLogger.LogInformation("{Identity} is closed normally", ToString());
7979
return;
8080
}
8181

0 commit comments

Comments
 (0)