Skip to content

Commit 2125dcd

Browse files
committed
Uniform the error or debug messages
in the rawconsumer. It gives all the information about the consumer. It is helpful in case of debugging Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 4be5169 commit 2125dcd

File tree

3 files changed

+69
-41
lines changed

3 files changed

+69
-41
lines changed

RabbitMQ.Stream.Client/Deliver.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,11 @@ internal static int Read(ReadOnlySequence<byte> frame, out Chunk chunk)
175175
$"Chunk: Not enough data, sourceLength: {reader.Length}, memoryLen: {memory.Length}, dataLen: {dataLen}");
176176
}
177177

178+
if (memory.Length != dataLen)
179+
{
180+
throw new Exception($"Chunk: Not enough data, memoryLen: {memory.Length}, dataLen: {dataLen}");
181+
}
182+
178183
chunk = new Chunk(magicVersion, numEntries, numRecords, timestamp, epoch, chunkId, crc, memory);
179184
return offset;
180185
}

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 63 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -122,14 +122,24 @@ public class RawConsumer : AbstractEntity, IConsumer, IDisposable
122122
private readonly Channel<Chunk> _chunksBuffer;
123123
private readonly ushort _initialCredits;
124124

125+
private string ConsumerInfo()
126+
{
127+
var superStream = string.IsNullOrEmpty(_config.SuperStream)
128+
? "No SuperStream"
129+
: $"SuperStream {_config.SuperStream}";
130+
return
131+
$"Consumer for stream: {_config.Stream}, reference: {_config.Reference}, OffsetSpec {_config.OffsetSpec} " +
132+
$"Client ProvidedName {_config.ClientProvidedName}, " +
133+
$"{superStream}, IsSingleActiveConsumer: {_config.IsSingleActiveConsumer}, " +
134+
$"Token IsCancellationRequested: {Token.IsCancellationRequested} ";
135+
}
136+
125137
private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = null)
126138
{
127139
_logger = logger ?? NullLogger.Instance;
128140
_initialCredits = config.InitialCredits;
129-
_logger.LogDebug("creating consumer {Consumer} with initial credits {InitialCredits}, " +
130-
"offset {OffsetSpec}, is single active consumer {IsSingleActiveConsumer}, super stream {SuperStream}, client provided name {ClientProvidedName}",
131-
config.Reference, _initialCredits, config.OffsetSpec, config.IsSingleActiveConsumer, config.SuperStream,
132-
config.ClientProvidedName);
141+
_config = config;
142+
_logger.LogDebug("Creating... {ConsumerInfo}", ConsumerInfo());
133143

134144
// _chunksBuffer is a channel that is used to buffer the chunks
135145
_chunksBuffer = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(_initialCredits)
@@ -141,7 +151,6 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
141151
});
142152
IsPromotedAsActive = true;
143153
_client = client;
144-
_config = config;
145154

146155
ProcessChunks();
147156
}
@@ -215,10 +224,18 @@ Message MessageFromSequence(ref ReadOnlySequence<byte> unCompressedData, ref int
215224
}
216225
catch (Exception e)
217226
{
218-
_logger.LogError(e,
219-
"Error while parsing message on the stream {Stream}, reference {Reference} The message will be skipped. " +
227+
if (Token.IsCancellationRequested)
228+
{
229+
_logger?.LogDebug(
230+
"Error while parsing message {ConsumerInfo}, Cancellation Requested, the consumer is closing. ",
231+
ConsumerInfo());
232+
return null;
233+
}
234+
235+
_logger?.LogError(e,
236+
"Error while parsing message {ConsumerInfo}. The message will be skipped. " +
220237
"Please report this issue to the RabbitMQ team on GitHub {Repo}",
221-
_config.Stream, _config.Reference, Consts.RabbitMQClientRepo);
238+
ConsumerInfo(), Consts.RabbitMQClientRepo);
222239
}
223240

224241
return null;
@@ -254,8 +271,9 @@ async Task DispatchMessage(Message message, ulong i)
254271
{
255272
_logger.LogError(e,
256273
"Error while filtering message. Message with offset {MessageOffset} won't be dispatched."
257-
+ "Suggestion: review the PostFilter value function",
258-
message.MessageOffset);
274+
+ "Suggestion: review the PostFilter value function"
275+
+ "{ConsumerInfo}",
276+
message.MessageOffset, ConsumerInfo());
259277
canDispatch = false;
260278
}
261279
}
@@ -271,8 +289,8 @@ await _config.MessageHandler(this,
271289
else
272290
{
273291
_logger?.LogDebug(
274-
"The consumer is not active for the stream {Stream}. message won't dispatched",
275-
_config.Stream);
292+
"{ConsumerInfo} is not active. message won't dispatched",
293+
ConsumerInfo());
276294
}
277295
}
278296
}
@@ -281,14 +299,14 @@ await _config.MessageHandler(this,
281299
catch (OperationCanceledException)
282300
{
283301
_logger?.LogWarning(
284-
"OperationCanceledException. The consumer id: {SubscriberId}, Stream:{Stream}, reference: {Reference} has been closed while consuming messages",
285-
_subscriberId, _config.Stream, _config.Reference);
302+
"OperationCanceledException. {ConsumerInfo} has been closed while consuming messages",
303+
ConsumerInfo());
286304
}
287305
catch (Exception e)
288306
{
289307
_logger?.LogError(e,
290-
"Error while processing chunk: {ChunkId} Stream:{Stream}, reference: {Reference}, Token IsCancellationRequested: {Token}",
291-
chunk.ChunkId, _config.Stream, _config.Reference, Token.IsCancellationRequested);
308+
"Error while Dispatching message, ChunkId : {ChunkId} {ConsumerInfo}",
309+
chunk.ChunkId, ConsumerInfo());
292310
}
293311
}
294312

@@ -345,8 +363,8 @@ await _config.MessageHandler(this,
345363
catch (Exception e)
346364
{
347365
_logger?.LogError(e,
348-
"Error while processing chunk: {ChunkId} Stream:{Stream}, reference: {Reference}, Token IsCancellationRequested: {Token}",
349-
chunk.ChunkId, _config.Stream, _config.Reference, Token.IsCancellationRequested);
366+
"Error while parsing chunk, ChunkId : {ChunkId} {ConsumerInfo}",
367+
chunk.ChunkId, ConsumerInfo());
350368
}
351369
}
352370

@@ -377,7 +395,9 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) //
377395
// The client will throw an InvalidOperationException
378396
// since the connection is closed
379397
// In this case we don't want to log the error to avoid log noise
380-
_logger?.LogDebug("Can't send the credit: The TCP client has been closed");
398+
_logger?.LogDebug(
399+
"Can't send the credit {ConsumerInfo}: The TCP client has been closed",
400+
ConsumerInfo());
381401
break;
382402
}
383403

@@ -394,8 +414,8 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) //
394414
}
395415

396416
_logger?.LogDebug(
397-
"The ProcessChunks for the stream: {Stream}, reference: {Reference} task has been closed normally",
398-
_config.Stream, _config.Reference);
417+
"The ProcessChunks {ConsumerInfo} task has been closed normally",
418+
ConsumerInfo());
399419
}
400420
catch (Exception e)
401421
{
@@ -405,14 +425,14 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) //
405425
if (Token.IsCancellationRequested)
406426
{
407427
_logger?.LogDebug(
408-
"The ProcessChunks task for the stream: {Stream}, reference: {Reference} has been closed due to cancellation",
409-
_config.Stream, _config.Reference);
428+
"The ProcessChunks task for the stream: {ConsumerInfo} has been closed due to cancellation",
429+
ConsumerInfo());
410430
return;
411431
}
412432

413433
_logger?.LogError(e,
414-
"Error while process chunks the stream: {Stream}, reference: {Reference}. The ProcessChunks task will be closed",
415-
_config.Stream, _config.Reference);
434+
"Error while process chunks the stream: {ConsumerInfo} The ProcessChunks task will be closed",
435+
ConsumerInfo());
416436
}
417437
}, Token);
418438
}
@@ -463,6 +483,7 @@ private async Task Init()
463483
}
464484
}
465485

486+
var chunkConsumed = 0;
466487
// this the default value for the consumer.
467488
_config.StoredOffsetSpec = _config.OffsetSpec;
468489
var (consumerId, response) = await _client.Subscribe(
@@ -471,6 +492,7 @@ private async Task Init()
471492
consumerProperties,
472493
async deliver =>
473494
{
495+
chunkConsumed++;
474496
// Send the chunk to the _chunksBuffer
475497
// in this way the chunks are processed in a separate thread
476498
// this wont' block the socket thread
@@ -480,9 +502,9 @@ private async Task Init()
480502
// the consumer is closing from the user but some chunks are still in the buffer
481503
// simply skip the chunk
482504
_logger?.LogTrace(
483-
"CancellationToken requested. The consumer is closing from the stream {Stream}, reference: {Reference}. " +
505+
"CancellationToken requested. The {ConsumerInfo} " +
484506
"The chunk won't be processed",
485-
_config.Stream, _config.Reference);
507+
ConsumerInfo());
486508
return;
487509
}
488510

@@ -494,12 +516,13 @@ private async Task Init()
494516
if (crcCalculated != deliver.Chunk.Crc)
495517
{
496518
_logger?.LogError(
497-
"CRC32 does not match, server crc: {ChunkCrc}, local crc: {CrcCalculated}, stream: {Stream}, token IsCancellationRequested: {Token}",
498-
deliver.Chunk.Crc, crcCalculated, _config.Stream, Token.IsCancellationRequested);
519+
"CRC32 does not match, server crc: {Crc}, local crc: {CrcCalculated}, {ConsumerInfo}, " +
520+
"Chunk Consumed {ChunkConsumed}", deliver.Chunk.Crc, crcCalculated, ConsumerInfo(),
521+
chunkConsumed);
499522

500523
throw new CrcException(
501-
$"CRC32 does not match, server crc {deliver.Chunk.Crc}, local crc {crcCalculated}, " +
502-
$"stream {_config.Stream}");
524+
$"CRC32 does not match, server crc: {deliver.Chunk.Crc}, local crc: {crcCalculated}, {ConsumerInfo()}, " +
525+
$"Chunk Consumed {chunkConsumed}");
503526
}
504527
}
505528

@@ -520,9 +543,9 @@ private async Task Init()
520543
// important for the dispatcher messages
521544
IsPromotedAsActive = promotedAsActive;
522545
_logger?.LogDebug(
523-
"The consumer active status is: {IsActive} for the stream: {Stream}, reference: {Reference}",
546+
"The consumer active status is: {IsActive} for {ConsumeInfo}",
524547
IsPromotedAsActive,
525-
_config.Stream, _config.Reference);
548+
ConsumerInfo());
526549
return _config.StoredOffsetSpec;
527550
}
528551
).ConfigureAwait(false);
@@ -557,21 +580,21 @@ public async Task<ResponseCode> Close()
557580
catch (TimeoutException)
558581
{
559582
_logger.LogError(
560-
"Timeout removing the consumer id: {SubscriberId}, stream {Stream}, reference {Reference} from the server. " +
583+
"Timeout removing the consumer id: {SubscriberId}, {ConsumerInfo} from the server. " +
561584
"The consumer will be closed anyway",
562-
_subscriberId, _config.Stream, _config.Reference);
585+
_subscriberId, ConsumerInfo());
563586
}
564587

565588
catch (Exception e)
566589
{
567590
_logger.LogError(e,
568-
"Error removing the consumer id: {SubscriberId}, stream {Stream}, reference {Reference} from the server",
569-
_subscriberId, _config.Stream, _config.Reference);
591+
"Error removing the consumer id: {SubscriberId}, {ConsumerInfo} from the server",
592+
_subscriberId, ConsumerInfo());
570593
}
571594

572595
var closed = await _client.MaybeClose($"_client-close-subscriber: {_subscriberId}").ConfigureAwait(false);
573596
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-subscriber: {_subscriberId}");
574-
_logger.LogDebug("Consumer {SubscriberId} closed", _subscriberId);
597+
_logger.LogDebug("{ConsumerInfo} is closed", ConsumerInfo());
575598

576599
return result;
577600
}
@@ -597,7 +620,7 @@ private void Dispose(bool disposing)
597620
}
598621

599622
ClientExceptions.MaybeThrowException(closeConsumer.Result,
600-
$"Error during remove producer. Subscriber: {_subscriberId}");
623+
$"Error during remove producer. {ConsumerInfo()}");
601624
}
602625
finally
603626
{
@@ -613,7 +636,7 @@ public void Dispose()
613636
}
614637
catch (Exception e)
615638
{
616-
_logger.LogError(e, "Error during disposing of consumer: {SubscriberId}", _subscriberId);
639+
_logger.LogError(e, "Error during disposing of {ConsumerInfo}", ConsumerInfo());
617640
}
618641
finally
619642
{

kubernetes/stream_cluster.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ metadata:
1010
namespace: stream-clients-test
1111
spec:
1212
replicas: 3
13-
image: rabbitmq:3.12.3-management
13+
image: rabbitmq:3.12.6-management
1414
service:
1515
type: LoadBalancer
1616
tls:

0 commit comments

Comments
 (0)