Skip to content

Commit ae0ea1c

Browse files
committed
restore internal close
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent bb5af4c commit ae0ea1c

File tree

2 files changed

+8
-6
lines changed

2 files changed

+8
-6
lines changed

RabbitMQ.Stream.Client/Client.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -396,15 +396,17 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
396396
if (response.ResponseCode == ResponseCode.Ok)
397397
return (subscriptionId, response);
398398

399-
ClientExceptions.MaybeThrowException(response.ResponseCode, $"Error while creating consumer for stream {config.Stream}");
399+
ClientExceptions.MaybeThrowException(response.ResponseCode,
400+
$"Error while creating consumer for stream {config.Stream}");
400401
}
401402
catch (Exception e)
402403
{
403404
// if the response code is not ok we need to remove the subscription
404405
// and close the connection if necessary.
405406
consumers.Remove(subscriptionId);
406407
await MaybeClose("Create Consumer Exception", config.Stream, config.Pool).ConfigureAwait(false);
407-
throw new CreateConsumerException($"Error while creating consumer for stream {config.Stream}, error: {e.Message}");
408+
throw new CreateConsumerException(
409+
$"Error while creating consumer for stream {config.Stream}, error: {e.Message}");
408410
}
409411

410412
return (subscriptionId, new SubscribeResponse(subscriptionId, ResponseCode.InternalError));
@@ -724,7 +726,7 @@ private async ValueTask<bool> SendHeartBeat()
724726
private void InternalClose()
725727
{
726728
_heartBeatHandler.Close();
727-
// IsClosed = true;
729+
IsClosed = true;
728730
}
729731

730732
private bool HasEntities()
@@ -747,6 +749,7 @@ public async Task<CloseResponse> Close(string reason)
747749
return new CloseResponse(0, ResponseCode.Ok);
748750
}
749751

752+
InternalClose();
750753
try
751754
{
752755
var result =
@@ -769,8 +772,6 @@ public async Task<CloseResponse> Close(string reason)
769772
connection.Dispose();
770773
}
771774

772-
InternalClose();
773-
774775
return new CloseResponse(0, ResponseCode.Ok);
775776
}
776777

RabbitMQ.Stream.Client/Reliable/ReliableBase.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,12 @@ private async Task Init(bool boot, IReconnectStrategy reconnectStrategy)
6060
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
6161
try
6262
{
63-
await CreateNewEntity(boot).ConfigureAwait(false);
6463
lock (_lock)
6564
{
6665
_isOpen = true;
6766
}
67+
68+
await CreateNewEntity(boot).ConfigureAwait(false);
6869
}
6970

7071
catch (Exception e)

0 commit comments

Comments
 (0)