Skip to content

Separate out connection recovery tests #1549

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions .ci/ubuntu/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ else
readonly run_toxiproxy='false'
fi

if [[ $2 == 'pull' ]]
then
readonly docker_pull_args='--pull always'
else
readonly docker_pull_args=''
fi

set -o nounset

declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq"
Expand All @@ -43,7 +50,8 @@ function start_toxiproxy
# sudo ss -4nlp
echo "[INFO] starting Toxiproxy server docker container"
docker rm --force "$toxiproxy_docker_name" 2>/dev/null || echo "[INFO] $toxiproxy_docker_name was not running"
docker run --detach \
# shellcheck disable=SC2086
docker run --detach $docker_pull_args \
--name "$toxiproxy_docker_name" \
--hostname "$toxiproxy_docker_name" \
--publish 8474:8474 \
Expand All @@ -58,7 +66,8 @@ function start_rabbitmq
echo "[INFO] starting RabbitMQ server docker container"
chmod 0777 "$GITHUB_WORKSPACE/.ci/ubuntu/log"
docker rm --force "$rabbitmq_docker_name" 2>/dev/null || echo "[INFO] $rabbitmq_docker_name was not running"
docker run --detach \
# shellcheck disable=SC2086
docker run --detach $docker_pull_args \
--name "$rabbitmq_docker_name" \
--hostname "$rabbitmq_docker_name" \
--publish 5671:5671 \
Expand Down Expand Up @@ -101,7 +110,8 @@ function wait_rabbitmq

function get_rabbitmq_id
{
local rabbitmq_docker_id="$(docker inspect --format='{{.Id}}' "$rabbitmq_docker_name")"
local rabbitmq_docker_id
rabbitmq_docker_id="$(docker inspect --format='{{.Id}}' "$rabbitmq_docker_name")"
echo "[INFO] '$rabbitmq_docker_name' docker id is '$rabbitmq_docker_id'"
if [[ -v GITHUB_OUTPUT ]]
then
Expand Down
3 changes: 1 addition & 2 deletions build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ if ($RunTests)
dotnet test $csproj_file --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' --no-restore --no-build --logger "console;verbosity=detailed"
if ($LASTEXITCODE -ne 0)
{
Write-Host "[ERROR] tests errored, exiting" -Foreground "Red"
Exit 1
Write-Host "[WARNING] tests errored, exiting" -Foreground "Red"
}
else
{
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,7 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~RabbitMQ.Client.IChannel.QueueDeclarePassiveAsync(string queue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.QueueDeclareOk>
~RabbitMQ.Client.IChannel.QueueDeleteAsync(string queue, bool ifUnused, bool ifEmpty, bool noWait = false, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<uint>
~RabbitMQ.Client.IChannel.QueuePurgeAsync(string queue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<uint>
~RabbitMQ.Client.IChannel.QueueUnbindAsync(string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.QueueUnbindAsync(string queue, string exchange, string routingKey, System.Collections.Generic.IDictionary<string, object> arguments = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.TxCommitAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.TxRollbackAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
~RabbitMQ.Client.IChannel.TxSelectAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/api/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ Task QueueBindAsync(string queue, string exchange, string routingKey,
/// Routing key must be shorter than 255 bytes.
/// </remarks>
Task QueueUnbindAsync(string queue, string exchange, string routingKey,
IDictionary<string, object> arguments,
IDictionary<string, object> arguments = null,
CancellationToken cancellationToken = default);

/// <summary>
Expand Down
7 changes: 5 additions & 2 deletions projects/RabbitMQ.Client/client/impl/RecordedBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,11 @@ public Task RecoverAsync(IChannel channel)

public bool Equals(RecordedBinding other)
{
return _isQueueBinding == other._isQueueBinding && _destination == other._destination && _source == other._source &&
_routingKey == other._routingKey && _arguments == other._arguments;
return _isQueueBinding == other._isQueueBinding &&
_destination == other._destination &&
_source == other._source &&
_routingKey == other._routingKey &&
_arguments == other._arguments;
}

public override bool Equals(object? obj)
Expand Down
130 changes: 111 additions & 19 deletions projects/Test/Common/IntegrationFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public abstract class IntegrationFixture : IAsyncLifetime
private static readonly bool s_isVerbose = false;
private static int _connectionIdx = 0;

private Exception _connectionCallbackException;
private Exception _connectionRecoveryException;
private Exception _channelCallbackException;

protected readonly RabbitMQCtl _rabbitMQCtl;

protected ConnectionFactory _connFactory;
Expand All @@ -77,7 +81,12 @@ public abstract class IntegrationFixture : IAsyncLifetime

static IntegrationFixture()
{

#if NET6_0_OR_GREATER
S_Random = Random.Shared;
#else
S_Random = new Random();
#endif
s_isRunningInCI = InitIsRunningInCI();
s_isVerbose = InitIsVerbose();

Expand Down Expand Up @@ -146,8 +155,10 @@ public virtual async Task InitializeAsync()

if (IsVerbose)
{
AddCallbackHandlers();
AddCallbackShutdownHandlers();
}

AddCallbackExceptionHandlers();
}

if (_connFactory.AutomaticRecoveryEnabled)
Expand Down Expand Up @@ -181,42 +192,122 @@ public virtual async Task DisposeAsync()
_channel = null;
_conn = null;
}

DisposeAssertions();
}

protected virtual void DisposeAssertions()
{
if (_connectionRecoveryException != null)
{
Assert.Fail($"unexpected connection recovery exception: {_connectionRecoveryException}");
}

if (_connectionCallbackException != null)
{
Assert.Fail($"unexpected connection callback exception: {_connectionCallbackException}");
}

if (_channelCallbackException != null)
{
Assert.Fail($"unexpected channel callback exception: {_channelCallbackException}");
}
}

protected virtual void AddCallbackHandlers()
protected void AddCallbackExceptionHandlers()
{
if (_conn != null)
{
_conn.CallbackException += (o, ea) =>
_conn.ConnectionRecoveryError += (s, ea) =>
{
_output.WriteLine("{0} connection callback exception: {1}",
_testDisplayName, ea.Exception);
_connectionRecoveryException = ea.Exception;

if (IsVerbose)
{
try
{
_output.WriteLine($"{0} connection recovery exception: {1}",
_testDisplayName, _connectionRecoveryException);
}
catch (InvalidOperationException)
{
}
}
};

_conn.ConnectionShutdown += (o, ea) =>
_conn.CallbackException += (o, ea) =>
{
HandleConnectionShutdown(_conn, ea, (args) =>
_connectionCallbackException = ea.Exception;

if (IsVerbose)
{
_output.WriteLine("{0} connection shutdown, args: {1}",
_testDisplayName, args);
});
try
{
_output.WriteLine("{0} connection callback exception: {1}",
_testDisplayName, _connectionCallbackException);
}
catch (InvalidOperationException)
{
}
}
};
}

if (_channel != null)
{
_channel.CallbackException += (o, ea) =>
{
_output.WriteLine("{0} channel callback exception: {1}",
_testDisplayName, ea.Exception);
_channelCallbackException = ea.Exception;

if (IsVerbose)
{
try
{
_output.WriteLine("{0} channel callback exception: {1}",
_testDisplayName, _channelCallbackException);
}
catch (InvalidOperationException)
{
}
}
};
}
}

protected void AddCallbackShutdownHandlers()
{
if (_conn != null)
{
_conn.ConnectionShutdown += (o, ea) =>
{
HandleConnectionShutdown(_conn, ea, (args) =>
{
try
{
_output.WriteLine("{0} connection shutdown, args: {1}",
_testDisplayName, args);
}
catch (InvalidOperationException)
{
}
});
};
}

if (_channel != null)
{
_channel.ChannelShutdown += (o, ea) =>
{
HandleChannelShutdown(_channel, ea, (args) =>
{
_output.WriteLine("{0} channel shutdown, args: {1}",
_testDisplayName, args);
try
{
_output.WriteLine("{0} channel shutdown, args: {1}",
_testDisplayName, args);
}
catch (InvalidOperationException)
{
}
});
};
}
Expand Down Expand Up @@ -405,6 +496,11 @@ protected static Task AssertRanToCompletion(IEnumerable<Task> tasks)
return DoAssertRanToCompletion(tasks);
}

internal static void AssertRecordedQueues(AutorecoveringConnection c, int n)
{
Assert.Equal(n, c.RecordedQueuesCount);
}

protected static Task WaitAsync(TaskCompletionSource<bool> tcs, string desc)
{
return WaitAsync(tcs, WaitSpan, desc);
Expand Down Expand Up @@ -524,11 +620,7 @@ protected static string GetUniqueString(ushort length)
protected static byte[] GetRandomBody(ushort size = 1024)
{
var body = new byte[size];
#if NET6_0_OR_GREATER
Random.Shared.NextBytes(body);
#else
S_Random.NextBytes(body);
#endif
return body;
}

Expand All @@ -543,7 +635,7 @@ protected static TaskCompletionSource<bool> PrepareForRecovery(IConnection conn)
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

AutorecoveringConnection aconn = conn as AutorecoveringConnection;
aconn.RecoverySucceeded += (source, ea) => tcs.SetResult(true);
aconn.RecoverySucceeded += (source, ea) => tcs.TrySetResult(true);

return tcs;
}
Expand Down
10 changes: 3 additions & 7 deletions projects/Test/Common/TestConnectionRecoveryBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public class TestConnectionRecoveryBase : IntegrationFixture
protected const ushort TotalMessageCount = 16384;
protected const ushort CloseAtCount = 16;

public TestConnectionRecoveryBase(ITestOutputHelper output) : base(output)
public TestConnectionRecoveryBase(ITestOutputHelper output, bool dispatchConsumersAsync = false)
: base(output, dispatchConsumersAsync: dispatchConsumersAsync)
{
_messageBody = GetRandomBody(4096);
}
Expand Down Expand Up @@ -107,11 +108,6 @@ internal void AssertRecordedExchanges(AutorecoveringConnection c, int n)
Assert.Equal(n, c.RecordedExchangesCount);
}

internal void AssertRecordedQueues(AutorecoveringConnection c, int n)
{
Assert.Equal(n, c.RecordedQueuesCount);
}

internal Task<AutorecoveringConnection> CreateAutorecoveringConnectionAsync()
{
return CreateAutorecoveringConnectionAsync(RecoveryInterval);
Expand Down Expand Up @@ -226,7 +222,7 @@ protected static TaskCompletionSource<bool> PrepareForShutdown(IConnection conn)
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

AutorecoveringConnection aconn = conn as AutorecoveringConnection;
aconn.ConnectionShutdown += (c, args) => tcs.SetResult(true);
aconn.ConnectionShutdown += (c, args) => tcs.TrySetResult(true);

return tcs;
}
Expand Down
Loading