Skip to content

Commit cf8c927

Browse files
authored
Improve the rawProducer and rawSuperStreamProducer status (#337)
* Improve the rawProducer and rawSuperStreamProducer status * Part of #336 the aim is to standardise the behaviour. * Add AlreadyClosedException when a producer or super stream producer is closed In the same way as AMQP does with channels * Producer: Flush the pending messages when closed. * Producer: Add AlreadyClosedException when closed. So it is the same as: AMQP and RawProducer and RawSuperStreamProducer * Improve IsAKnownException function with the AggregateException that can contain known exceptions needed to reconnect the client * Add EntitiesStateTests to test all the status and verify that all the entities have the same behaviour * Assert the AlreadyClosedException Exception --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent d5cdce4 commit cf8c927

12 files changed

+234
-74
lines changed

RabbitMQ.Stream.Client/AbstractEntity.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@ public abstract class AbstractEntity : IClosable
3838
protected abstract string GetStream();
3939
protected abstract string DumpEntityConfiguration();
4040

41+
protected void ThrowIfClosed()
42+
{
43+
if (!IsOpen())
44+
{
45+
throw new AlreadyClosedException($"{DumpEntityConfiguration()} is closed.");
46+
}
47+
}
48+
4149
// here the _cancelTokenSource is disposed and the token is cancelled
4250
// in producer is used to cancel the send task
4351
// in consumer is used to cancel the receive task

RabbitMQ.Stream.Client/ClientExceptions.cs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,42 @@
33
// Copyright (c) 2007-2023 VMware, Inc.
44

55
using System;
6+
using System.Linq;
7+
using System.Net.Sockets;
68

79
namespace RabbitMQ.Stream.Client
810
{
911
internal static class ClientExceptions
1012
{
13+
14+
// <summary>
15+
/// IsAKnownException returns true if the exception is a known exception
16+
/// We need it to reconnect when the producer/consumer.
17+
/// - LeaderNotFoundException is a temporary exception
18+
/// It means that the leader is not available and the client can't reconnect.
19+
/// Especially the Producer that needs to know the leader.
20+
/// - SocketException
21+
/// Client is trying to connect in a not ready endpoint.
22+
/// It is usually a temporary situation.
23+
/// - TimeoutException
24+
/// Network call timed out. It is often a temporary situation and we should retry.
25+
/// In this case we can try to reconnect.
26+
///
27+
/// For the other kind of exception, we just throw back the exception.
28+
//</summary>
29+
internal static bool IsAKnownException(Exception exception)
30+
{
31+
if (exception is AggregateException aggregateException)
32+
{
33+
var x = aggregateException.InnerExceptions.Select(x =>
34+
x.GetType() == typeof(SocketException) || x.GetType() == typeof(TimeoutException) ||
35+
x.GetType() == typeof(LeaderNotFoundException));
36+
return x.Any();
37+
}
38+
39+
return exception is (SocketException or TimeoutException or LeaderNotFoundException);
40+
}
41+
1142
public static void MaybeThrowException(ResponseCode responseCode, string message)
1243
{
1344
if (responseCode is ResponseCode.Ok)
@@ -27,6 +58,14 @@ public static void MaybeThrowException(ResponseCode responseCode, string message
2758
}
2859
}
2960

61+
public class AlreadyClosedException : Exception
62+
{
63+
public AlreadyClosedException(string s)
64+
: base(s)
65+
{
66+
}
67+
}
68+
3069
public class ProtocolException : Exception
3170
{
3271
protected ProtocolException(string s)

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ RabbitMQ.Stream.Client.AbstractEntity.IsOpen() -> bool
1414
RabbitMQ.Stream.Client.AbstractEntity.Logger.get -> Microsoft.Extensions.Logging.ILogger
1515
RabbitMQ.Stream.Client.AbstractEntity.Logger.init -> void
1616
RabbitMQ.Stream.Client.AbstractEntity.Shutdown(RabbitMQ.Stream.Client.EntityCommonConfig config, bool ignoreIfAlreadyDeleted = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
17+
RabbitMQ.Stream.Client.AbstractEntity.ThrowIfClosed() -> void
1718
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken
19+
RabbitMQ.Stream.Client.AlreadyClosedException
20+
RabbitMQ.Stream.Client.AlreadyClosedException.AlreadyClosedException(string s) -> void
1821
RabbitMQ.Stream.Client.AuthMechanism
1922
RabbitMQ.Stream.Client.AuthMechanism.External = 1 -> RabbitMQ.Stream.Client.AuthMechanism
2023
RabbitMQ.Stream.Client.AuthMechanism.Plain = 0 -> RabbitMQ.Stream.Client.AuthMechanism

RabbitMQ.Stream.Client/RawProducer.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() =>
215215
/// <param name="compressionType">No Compression, Gzip Compression. Other types are not provided by default</param>
216216
public async ValueTask Send(ulong publishingId, List<Message> subEntryMessages, CompressionType compressionType)
217217
{
218+
ThrowIfClosed();
218219
if (subEntryMessages.Count != 0)
219220
{
220221
await SemaphoreAwaitAsync().ConfigureAwait(false);
@@ -239,6 +240,7 @@ private async Task SemaphoreAwaitAsync()
239240
/// <param name="messages"></param>
240241
public async ValueTask Send(List<(ulong, Message)> messages)
241242
{
243+
ThrowIfClosed();
242244
PreValidateBatch(messages);
243245
await InternalBatchSend(messages).ConfigureAwait(false);
244246
}
@@ -275,6 +277,7 @@ internal void PreValidateBatch(List<(ulong, Message)> messages)
275277

276278
private async Task SendMessages(List<(ulong, Message)> messages, bool clearMessagesList = true)
277279
{
280+
ThrowIfClosed();
278281
if (IsFilteringEnabled)
279282
{
280283
await _client.Publish(new PublishFilter(EntityId, messages, _config.Filter.FilterValue,
@@ -322,6 +325,7 @@ public async Task<ulong> GetLastPublishingId()
322325
[MethodImpl(MethodImplOptions.AggressiveInlining)]
323326
public async ValueTask Send(ulong publishingId, Message message)
324327
{
328+
ThrowIfClosed();
325329
if (message.Size > _client.MaxFrameSize)
326330
{
327331
throw new InvalidOperationException($"Message size is to big. " +

RabbitMQ.Stream.Client/RawSuperStreamProducer.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,14 @@ private async Task<IProducer> InitProducer(string stream)
136136
return p;
137137
}
138138

139+
protected void ThrowIfClosed()
140+
{
141+
if (!IsOpen())
142+
{
143+
throw new AlreadyClosedException($"Super stream {_config.SuperStream} is closed.");
144+
}
145+
}
146+
139147
private async Task<IProducer> GetProducer(string stream)
140148
{
141149
if (!_producers.ContainsKey(stream))
@@ -170,12 +178,14 @@ private async Task<IProducer> GetProducerForMessage(Message message)
170178

171179
public async ValueTask Send(ulong publishingId, Message message)
172180
{
181+
ThrowIfClosed();
173182
var producer = await GetProducerForMessage(message).ConfigureAwait(false);
174183
await producer.Send(publishingId, message).ConfigureAwait(false);
175184
}
176185

177186
public async ValueTask Send(List<(ulong, Message)> messages)
178187
{
188+
ThrowIfClosed();
179189
var aggregate = new List<(IProducer, List<(ulong, Message)>)>();
180190

181191
// this part is not super-optimized
@@ -203,6 +213,7 @@ public async ValueTask Send(List<(ulong, Message)> messages)
203213

204214
public async ValueTask Send(ulong publishingId, List<Message> subEntryMessages, CompressionType compressionType)
205215
{
216+
ThrowIfClosed();
206217
var aggregate = new List<(IProducer, List<Message>)>();
207218

208219
// this part is not super-optimized

RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ internal void Start()
113113

114114
internal void Stop()
115115
{
116+
FlushPendingMessages();
116117
_invalidateTimer.Enabled = false;
117118
_waitForConfirmationActionBlock.Complete();
118119
}
@@ -129,6 +130,15 @@ await RemoveUnConfirmedMessage(ConfirmationStatus.ClientTimeoutError, pair.Value
129130
}
130131
}
131132

133+
private async void FlushPendingMessages()
134+
{
135+
foreach (var pair in _waitForConfirmation)
136+
{
137+
await RemoveUnConfirmedMessage(ConfirmationStatus.ClientTimeoutError, pair.Value.PublishingId, null)
138+
.ConfigureAwait(false);
139+
}
140+
}
141+
132142
internal void AddUnConfirmedMessage(ulong publishingId, Message message)
133143
{
134144
AddUnConfirmedMessage(publishingId, new List<Message> { message });

RabbitMQ.Stream.Client/Reliable/Producer.cs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,14 @@ private Producer(ProducerConfig producerConfig, ILogger<Producer> logger = null)
142142
_logger = logger ?? NullLogger<Producer>.Instance;
143143
}
144144

145+
private void ThrowIfClosed()
146+
{
147+
if (!_isOpen)
148+
{
149+
throw new AlreadyClosedException("Producer is closed");
150+
}
151+
}
152+
145153
/// <summary>
146154
/// Create a new Producer.
147155
/// <param name="producerConfig">Producer Configuration. Where StreamSystem and Stream are mandatory.</param>
@@ -241,6 +249,7 @@ internal async Task<ulong> GetLastPublishingId()
241249

242250
internal async ValueTask SendInternal(ulong publishingId, Message message)
243251
{
252+
ThrowIfClosed();
244253
_confirmationPipe.AddUnConfirmedMessage(publishingId, message);
245254
try
246255
{
@@ -249,9 +258,16 @@ internal async ValueTask SendInternal(ulong publishingId, Message message)
249258
// In this case it skips the publish until
250259
// the producer is connected. Messages are safe since are stored
251260
// on the _waitForConfirmation list. The user will get Timeout Error
252-
if (!(_inReconnection))
261+
if (!_inReconnection)
253262
{
254-
await _producer.Send(publishingId, message).ConfigureAwait(false);
263+
if (_producer.IsOpen())
264+
{
265+
await _producer.Send(publishingId, message).ConfigureAwait(false);
266+
}
267+
else
268+
{
269+
_logger?.LogDebug("The internal producer is closed. Message will be timed out");
270+
}
255271
}
256272
}
257273

@@ -284,14 +300,22 @@ internal async ValueTask SendInternal(ulong publishingId, Message message)
284300
/// In case of error the messages are considered as timed out, you will receive a confirmation with the status TimedOut.
285301
public async ValueTask Send(List<Message> messages, CompressionType compressionType)
286302
{
303+
ThrowIfClosed();
287304
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
288305
Interlocked.Increment(ref _publishingId);
289306
_confirmationPipe.AddUnConfirmedMessage(_publishingId, messages);
290307
try
291308
{
292309
if (!_inReconnection)
293310
{
294-
await _producer.Send(_publishingId, messages, compressionType).ConfigureAwait(false);
311+
if (_producer.IsOpen())
312+
{
313+
await _producer.Send(_publishingId, messages, compressionType).ConfigureAwait(false);
314+
}
315+
else
316+
{
317+
_logger?.LogDebug("The internal producer is closed. Message will be timed out");
318+
}
295319
}
296320
}
297321

@@ -330,6 +354,7 @@ public override string ToString()
330354
/// In case of error the messages are considered as timed out, you will receive a confirmation with the status TimedOut.
331355
public async ValueTask Send(List<Message> messages)
332356
{
357+
ThrowIfClosed();
333358
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
334359
var messagesToSend = new List<(ulong, Message)>();
335360
foreach (var message in messages)
@@ -352,7 +377,14 @@ public async ValueTask Send(List<Message> messages)
352377
// on the _waitForConfirmation list. The user will get Timeout Error
353378
if (!(_inReconnection))
354379
{
355-
await _producer.Send(messagesToSend).ConfigureAwait(false);
380+
if (_producer.IsOpen())
381+
{
382+
await _producer.Send(messagesToSend).ConfigureAwait(false);
383+
}
384+
else
385+
{
386+
_logger?.LogDebug("The internal producer is closed. Message will be timed out");
387+
}
356388
}
357389
}
358390

RabbitMQ.Stream.Client/Reliable/ReliableBase.cs

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
// Copyright (c) 2007-2023 VMware, Inc.
44

55
using System;
6-
using System.Net.Sockets;
6+
77
using System.Threading;
88
using System.Threading.Tasks;
99
using Microsoft.Extensions.Logging;
@@ -67,7 +67,7 @@ private async Task Init(bool boot, IReconnectStrategy reconnectStrategy)
6767

6868
catch (Exception e)
6969
{
70-
reconnect = IsAKnownException(e);
70+
reconnect = ClientExceptions.IsAKnownException(e);
7171
LogException(e);
7272
if (!reconnect)
7373
{
@@ -143,7 +143,8 @@ internal async Task HandleMetaDataMaybeReconnect(string stream, StreamSystem sys
143143
await Task.Delay(500).ConfigureAwait(false);
144144
if (await system.StreamExists(stream).ConfigureAwait(false))
145145
{
146-
BaseLogger.LogInformation("Meta data update stream: {StreamIdentifier}. The stream still exists. Client: {Identity}",
146+
BaseLogger.LogInformation(
147+
"Meta data update stream: {StreamIdentifier}. The stream still exists. Client: {Identity}",
147148
stream,
148149
ToString()
149150
);
@@ -164,31 +165,11 @@ internal async Task HandleMetaDataMaybeReconnect(string stream, StreamSystem sys
164165
}
165166
}
166167

167-
// <summary>
168-
/// IsAKnownException returns true if the exception is a known exception
169-
/// We need it to reconnect when the producer/consumer.
170-
/// - LeaderNotFoundException is a temporary exception
171-
/// It means that the leader is not available and the client can't reconnect.
172-
/// Especially the Producer that needs to know the leader.
173-
/// - SocketException
174-
/// Client is trying to connect in a not ready endpoint.
175-
/// It is usually a temporary situation.
176-
/// - TimeoutException
177-
/// Some call went in timeout. Maybe a temporary DNS problem.
178-
/// In this case we can try to reconnect.
179-
///
180-
/// For the other kind of exception, we just throw back the exception.
181-
//</summary>
182-
internal static bool IsAKnownException(Exception exception)
183-
{
184-
return exception is (SocketException or TimeoutException or LeaderNotFoundException);
185-
}
186-
187168
private void LogException(Exception exception)
188169
{
189170
const string KnownExceptionTemplate = "{Identity} trying to reconnect due to exception";
190171
const string UnknownExceptionTemplate = "{Identity} received an exception during initialization";
191-
if (IsAKnownException(exception))
172+
if (ClientExceptions.IsAKnownException(exception))
192173
{
193174
BaseLogger.LogError(exception, KnownExceptionTemplate, ToString());
194175
}

0 commit comments

Comments
 (0)