Skip to content

Enable rabbitmq-client event logging when tests are verbose #1559

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 33 additions & 15 deletions projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ await ReceiveLoopAsync(mainLoopToken)
{
// Possible heartbeat exception
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
0, "End of stream",
0,
"End of stream",
exception: eose);
HandleMainLoopException(ea);
}
Expand All @@ -73,15 +74,31 @@ await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken)
* Ensure that these exceptions eventually make it to application code
*/
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
Constants.InternalError, fileLoadException.Message,
Constants.InternalError,
fileLoadException.Message,
exception: fileLoadException);
HandleMainLoopException(ea);
}
catch (OperationCanceledException ocex)
{
if (mainLoopToken.IsCancellationRequested)
{
// ignore
}
else
{
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
Constants.InternalError,
ocex.Message,
exception: ocex);
HandleMainLoopException(ea);
}
}
catch (Exception ex)
{
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
Constants.InternalError,
$"Unexpected Exception: {ex.Message}",
ex.Message,
exception: ex);
HandleMainLoopException(ea);
}
Expand All @@ -91,24 +108,24 @@ await FinishCloseAsync(cts.Token)
.ConfigureAwait(false);
}

private async Task ReceiveLoopAsync(CancellationToken mainLoopCancelllationToken)
private async Task ReceiveLoopAsync(CancellationToken mainLoopCancellationToken)
{
while (false == _closed)
{
mainLoopCancelllationToken.ThrowIfCancellationRequested();
mainLoopCancellationToken.ThrowIfCancellationRequested();

while (_frameHandler.TryReadFrame(out InboundFrame frame))
{
NotifyHeartbeatListener();
await ProcessFrameAsync(frame, mainLoopCancelllationToken)
await ProcessFrameAsync(frame, mainLoopCancellationToken)
.ConfigureAwait(false);
}

// Done reading frames synchronously, go async
InboundFrame asyncFrame = await _frameHandler.ReadFrameAsync(mainLoopCancelllationToken)
InboundFrame asyncFrame = await _frameHandler.ReadFrameAsync(mainLoopCancellationToken)
.ConfigureAwait(false);
NotifyHeartbeatListener();
await ProcessFrameAsync(asyncFrame, mainLoopCancelllationToken)
await ProcessFrameAsync(asyncFrame, mainLoopCancellationToken)
.ConfigureAwait(false);
}
}
Expand Down Expand Up @@ -180,17 +197,18 @@ private void HandleMainLoopException(ShutdownEventArgs reason)
string message = reason.GetLogMessage();
if (false == SetCloseReason(reason))
{
LogCloseError($"Unexpected Main Loop Exception while closing: {message}", reason.Exception);
LogCloseError($"unexpected main loop exception while closing: {message}", reason.Exception);
return;
}

_channel0.MaybeSetConnectionStartException(reason.Exception);

OnShutdown(reason);
LogCloseError($"Unexpected connection closure: {message}", reason.Exception);
LogCloseError($"unexpected connection closure: {message}", reason.Exception);
}

private async Task HardProtocolExceptionHandlerAsync(HardProtocolException hpe, CancellationToken cancellationToken)
private async Task HardProtocolExceptionHandlerAsync(HardProtocolException hpe,
CancellationToken mainLoopCancellationToken)
{
if (SetCloseReason(hpe.ShutdownReason))
{
Expand All @@ -200,11 +218,11 @@ await _session0.SetSessionClosingAsync(false)
try
{
var cmd = new ConnectionClose(hpe.ShutdownReason.ReplyCode, hpe.ShutdownReason.ReplyText, 0, 0);
await _session0.TransmitAsync(in cmd, cancellationToken)
await _session0.TransmitAsync(in cmd, mainLoopCancellationToken)
.ConfigureAwait(false);
if (hpe.CanShutdownCleanly)
{
await ClosingLoopAsync(cancellationToken)
await ClosingLoopAsync(mainLoopCancellationToken)
.ConfigureAwait(false);
}
}
Expand All @@ -222,13 +240,13 @@ await ClosingLoopAsync(cancellationToken)
///<remarks>
/// Loop only used while quiescing. Use only to cleanly close connection
///</remarks>
private async Task ClosingLoopAsync(CancellationToken cancellationToken)
private async Task ClosingLoopAsync(CancellationToken mainLoopCancellationToken)
{
try
{
_frameHandler.ReadTimeout = default;
// Wait for response/socket closure or timeout
await ReceiveLoopAsync(cancellationToken)
await ReceiveLoopAsync(mainLoopCancellationToken)
.ConfigureAwait(false);
}
catch (ObjectDisposedException ode)
Expand Down
7 changes: 4 additions & 3 deletions projects/RabbitMQ.Client/client/impl/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,10 @@ private static void ProcessProtocolHeader(ReadOnlySequence<byte> buffer)
}
}

internal static async ValueTask<InboundFrame> ReadFromPipeAsync(PipeReader reader, uint maxMessageSize, CancellationToken cancellationToken)
internal static async ValueTask<InboundFrame> ReadFromPipeAsync(PipeReader reader, uint maxMessageSize,
CancellationToken mainLoopCancellationToken)
{
ReadResult result = await reader.ReadAsync(cancellationToken)
ReadResult result = await reader.ReadAsync(mainLoopCancellationToken)
.ConfigureAwait(false);

ReadOnlySequence<byte> buffer = result.Buffer;
Expand All @@ -270,7 +271,7 @@ internal static async ValueTask<InboundFrame> ReadFromPipeAsync(PipeReader reade
reader.AdvanceTo(buffer.Start, buffer.End);

// Not enough data, read a bit more
result = await reader.ReadAsync(cancellationToken)
result = await reader.ReadAsync(mainLoopCancellationToken)
.ConfigureAwait(false);

MaybeThrowEndOfStream(result, buffer);
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,9 @@ await _pipeReader.CompleteAsync()
}
}

public ValueTask<InboundFrame> ReadFrameAsync(CancellationToken cancellationToken)
public ValueTask<InboundFrame> ReadFrameAsync(CancellationToken mainLoopCancellationToken)
{
return InboundFrame.ReadFromPipeAsync(_pipeReader, _amqpTcpEndpoint.MaxMessageSize, cancellationToken);
return InboundFrame.ReadFromPipeAsync(_pipeReader, _amqpTcpEndpoint.MaxMessageSize, mainLoopCancellationToken);
}

public bool TryReadFrame(out InboundFrame frame)
Expand Down
12 changes: 7 additions & 5 deletions projects/RabbitMQ.Client/client/logging/ESLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;

namespace RabbitMQ.Client.Logging
{
internal static class ESLog
{
public static void Info(string message)
{
Logging.RabbitMqClientEventSource.Log.Info(message);
RabbitMqClientEventSource.Log.Info(message);
}

public static void Info(string message, params object[] args)
Expand All @@ -46,7 +48,7 @@ public static void Info(string message, params object[] args)

public static void Warn(string message)
{
Logging.RabbitMqClientEventSource.Log.Warn(message);
RabbitMqClientEventSource.Log.Warn(message);
}

public static void Warn(string message, params object[] args)
Expand All @@ -55,12 +57,12 @@ public static void Warn(string message, params object[] args)
Warn(msg);
}

public static void Error(string message, System.Exception ex)
public static void Error(string message, Exception ex)
{
Logging.RabbitMqClientEventSource.Log.Error(message, ex);
RabbitMqClientEventSource.Log.Error(message, ex);
}

public static void Error(string message, System.Exception ex, params object[] args)
public static void Error(string message, Exception ex, params object[] args)
{
string msg = string.Format(message, args);
Error(msg, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,93 +29,63 @@
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;
using System.Diagnostics.Tracing;
using System.Threading;

namespace RabbitMQ.Client.Logging
{
#nullable enable
internal sealed partial class RabbitMqClientEventSource
{
private static int ConnectionsOpened;
private static int ConnectionsClosed;
private static int ChannelsOpened;
private static int ChannelsClosed;
private static long BytesSent;
private static long BytesReceived;
private static long CommandsSent;
private static long CommandsReceived;
private static int s_connectionsOpened;
private static int s_connectionsClosed;
private static int s_channelsOpened;
private static int s_channelsClosed;
private static long s_bytesSent;
private static long s_bytesReceived;
private static long s_commandsSent;
private static long s_commandsReceived;

#if NET6_0_OR_GREATER
private PollingCounter? _connectionOpenedCounter;
private PollingCounter? _openConnectionCounter;
private PollingCounter? _channelOpenedCounter;
private PollingCounter? _openChannelCounter;
private IncrementingPollingCounter? _bytesSentCounter;
private IncrementingPollingCounter? _bytesReceivedCounter;
private IncrementingPollingCounter? _commandSentCounter;
private IncrementingPollingCounter? _commandReceivedCounter;

protected override void OnEventCommand(EventCommandEventArgs command)
{
if (command.Command == EventCommand.Enable)
{
_connectionOpenedCounter ??= new PollingCounter("total-connections-opened", this, () => ConnectionsOpened) { DisplayName = "Total connections opened" };
_openConnectionCounter ??= new PollingCounter("current-open-connections", this, () => ConnectionsOpened - ConnectionsClosed) { DisplayName = "Current open connections count" };

_channelOpenedCounter ??= new PollingCounter("total-channels-opened", this, () => ChannelsOpened) { DisplayName = "Total channels opened" };
_openChannelCounter ??= new PollingCounter("current-open-channels", this, () => ChannelsOpened - ChannelsClosed) { DisplayName = "Current open channels count" };

_bytesSentCounter ??= new IncrementingPollingCounter("bytes-sent-rate", this, () => Interlocked.Read(ref BytesSent)) { DisplayName = "Byte sending rate", DisplayUnits = "B", DisplayRateTimeScale = new TimeSpan(0, 0, 1) };
_bytesReceivedCounter ??= new IncrementingPollingCounter("bytes-received-rate", this, () => Interlocked.Read(ref BytesReceived)) { DisplayName = "Byte receiving rate", DisplayUnits = "B", DisplayRateTimeScale = new TimeSpan(0, 0, 1) };

_commandSentCounter ??= new IncrementingPollingCounter("AMQP-method-sent-rate", this, () => Interlocked.Read(ref CommandsSent)) { DisplayName = "AMQP method sending rate", DisplayUnits = "B", DisplayRateTimeScale = new TimeSpan(0, 0, 1) };
_commandReceivedCounter ??= new IncrementingPollingCounter("AMQP-method-received-rate", this, () => Interlocked.Read(ref CommandsReceived)) { DisplayName = "AMQP method receiving rate", DisplayUnits = "B", DisplayRateTimeScale = new TimeSpan(0, 0, 1) };
}
}
#endif
[NonEvent]
public void ConnectionOpened()
{
Interlocked.Increment(ref ConnectionsOpened);
Interlocked.Increment(ref s_connectionsOpened);
}

[NonEvent]
public void ConnectionClosed()
{
Interlocked.Increment(ref ConnectionsClosed);
Interlocked.Increment(ref s_connectionsClosed);
}

[NonEvent]
public void ChannelOpened()
{
Interlocked.Increment(ref ChannelsOpened);
Interlocked.Increment(ref s_channelsOpened);
}

[NonEvent]
public void ChannelClosed()
{
Interlocked.Increment(ref ChannelsClosed);
Interlocked.Increment(ref s_channelsClosed);
}

[NonEvent]
public void DataReceived(int byteCount)
{
Interlocked.Add(ref BytesReceived, byteCount);
Interlocked.Add(ref s_bytesReceived, byteCount);
}

[NonEvent]
public void CommandSent(int byteCount)
{
Interlocked.Increment(ref CommandsSent);
Interlocked.Add(ref BytesSent, byteCount);
Interlocked.Increment(ref s_commandsSent);
Interlocked.Add(ref s_bytesSent, byteCount);
}

[NonEvent]
public void CommandReceived()
{
Interlocked.Increment(ref CommandsReceived);
Interlocked.Increment(ref s_commandsReceived);
}
}
}
Loading