Skip to content

Improve test speed #308

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/install.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ $env:ERLANG_HOME = $erlang_home

Write-Host "[INFO] Setting RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS..."
$env:RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS = '-rabbitmq_stream advertised_host localhost'
[Environment]::SetEnvironmentVariable('RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS', '-rabbitmq_stream advertised_host localhost', 'Machine')
[Environment]::SetEnvironmentVariable('RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS', '-rabbitmq_stream advertised_host localhost -rabbit collect_statistics_interval 4', 'Machine')

Write-Host '[INFO] Downloading RabbitMQ...'

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
rabbitmq:
image: rabbitmq:3.13.0-beta.6-management
env:
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: -rabbitmq_stream advertised_host localhost
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: -rabbitmq_stream advertised_host localhost -rabbit collect_statistics_interval 4
ports:
- 5552:5552
- 5672:5672
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ build:
dotnet build $(CURDIR)/Build.csproj

test: build
dotnet test $(CURDIR)/Tests/Tests.csproj --no-build --logger "console;verbosity=detailed" /p:AltCover=true
dotnet test -c Debug $(CURDIR)/Tests/Tests.csproj --no-build --logger:"console;verbosity=detailed" /p:AltCover=true

rabbitmq-server:
docker run -it --rm --name rabbitmq-stream-docker \
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/RoutingClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public async Task<IClient> CreateClient(ClientParameters clientParameters, ILogg
/// </summary>
public static class RoutingHelper<T> where T : IRouting, new()
{
private static async Task<IClient> LookupConnection(
internal static async Task<IClient> LookupConnection(
ClientParameters clientParameters,
Broker broker,
int maxAttempts,
Expand Down
9 changes: 3 additions & 6 deletions Tests/ReliableTests.cs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather remove commented code, we can always revert the commit if we want to "recover" this code.

Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ public async void SendMessageAfterKillConnectionShouldContinueToWork()
await producer.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}")));
}

SystemUtils.Wait(TimeSpan.FromSeconds(6));
Assert.Equal(1, SystemUtils.HttpKillConnections(clientProvidedName).Result);
SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections(clientProvidedName).Result == 1);

await SystemUtils.HttpKillConnections(clientProvidedNameLocator);

for (var i = 0; i < 5; i++)
Expand Down Expand Up @@ -339,7 +339,6 @@ public async void FirstConsumeAfterKillConnectionShouldContinueToWork()
await Task.CompletedTask;
}
});
SystemUtils.Wait(TimeSpan.FromSeconds(1));
// in this case we kill the connection before consume consume any message
// so it should use the selected OffsetSpec in this case = new OffsetTypeFirst(),

Expand Down Expand Up @@ -387,13 +386,11 @@ await SystemUtils.PublishMessages(system, stream, NumberOfMessages,
await Task.CompletedTask;
}
});
SystemUtils.Wait(TimeSpan.FromSeconds(4));
// kill the first time
SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections(clientProviderName).Result == 1);
await SystemUtils.PublishMessages(system, stream, NumberOfMessages,
Guid.NewGuid().ToString(),
_testOutputHelper);
SystemUtils.Wait(TimeSpan.FromSeconds(4));
SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections(clientProviderName).Result == 1);
new Utils<bool>(_testOutputHelper).WaitUntilTaskCompletes(testPassed);
// after kill the consumer must be open
Expand All @@ -417,7 +414,7 @@ public async void ConsumerHandleDeleteStreamWithMetaDataUpdate()
// When the stream is deleted the consumer has to close the
// connection an become inactive.
await system.DeleteStream(stream);
SystemUtils.Wait(TimeSpan.FromSeconds(5));
SystemUtils.Wait(TimeSpan.FromSeconds(3));
Assert.False(consumer.IsOpen());
await system.Close();
}
Expand Down
97 changes: 96 additions & 1 deletion Tests/Resources/definition_test.json
Original file line number Diff line number Diff line change
@@ -1 +1,96 @@
{"rabbit_version":"3.12.0-alpha-stream.304","rabbitmq_version":"3.12.0-alpha-stream.304","product_name":"RabbitMQ","product_version":"3.12.0-alpha-stream.304","users":[{"name":"test","password_hash":"DGqKjRKV+0PuybgkIoOwVXBTHShsgd6ysuSbINGv29WfBi/s","hashing_algorithm":"rabbit_password_hashing_sha256","tags":["impersonator"],"limits":{}},{"name":"guest","password_hash":"JETOd3pk4cJRpcPtzyzgy8zODxzOxmRYsZzXQyZPpntD5Uti","hashing_algorithm":"rabbit_password_hashing_sha256","tags":["administrator"],"limits":{}}],"vhosts":[{"name":"/"},{"name":"vhost2"}],"permissions":[{"user":"test","vhost":"/","configure":"test_stream","write":"test_stream","read":"test_stream"},{"user":"guest","vhost":"vhost2","configure":".*","write":".*","read":".*"},{"user":"guest","vhost":"/","configure":".*","write":".*","read":".*"}],"topic_permissions":[],"parameters":[],"global_parameters":[{"name":"internal_cluster_id","value":"rabbitmq-cluster-id-_6hTHBLoOCqgQiJla6WCXw"}],"policies":[],"queues":[{"name":"test_stream","vhost":"/","durable":true,"auto_delete":false,"arguments":{"x-queue-type":"stream"}},{"name":"invoices-0","vhost":"/","durable":true,"auto_delete":false,"arguments":{"x-queue-type":"stream"}},{"name":"invoices-1","vhost":"/","durable":true,"auto_delete":false,"arguments":{"x-queue-type":"stream"}},{"name":"invoices-2","vhost":"/","durable":true,"auto_delete":false,"arguments":{"x-queue-type":"stream"}},{"name":"no_access_stream","vhost":"/","durable":true,"auto_delete":false,"arguments":{"x-queue-type":"stream"}}],"exchanges":[{"name":"invoices","vhost":"/","type":"direct","durable":true,"auto_delete":false,"internal":false,"arguments":{"x-super-stream":true}}],"bindings":[{"source":"invoices","vhost":"/","destination":"invoices-0","destination_type":"queue","routing_key":"0","arguments":{"x-stream-partition-order":0}},{"source":"invoices","vhost":"/","destination":"invoices-1","destination_type":"queue","routing_key":"1","arguments":{"x-stream-partition-order":1}},{"source":"invoices","vhost":"/","destination":"invoices-2","destination_type":"queue","routing_key":"2","arguments":{"x-stream-partition-order":2}}]}
{
"rabbit_version":"3.12.0-alpha-stream.304",
"rabbitmq_version":"3.12.0-alpha-stream.304",
"product_name":"RabbitMQ",
"product_version":"3.12.0-alpha-stream.304",
"users":[
{
"name":"test",
"password_hash":"DGqKjRKV+0PuybgkIoOwVXBTHShsgd6ysuSbINGv29WfBi/s",
"hashing_algorithm":"rabbit_password_hashing_sha256",
"tags":[
"impersonator"
],
"limits":{

}
},
{
"name":"guest",
"password_hash":"JETOd3pk4cJRpcPtzyzgy8zODxzOxmRYsZzXQyZPpntD5Uti",
"hashing_algorithm":"rabbit_password_hashing_sha256",
"tags":[
"administrator"
],
"limits":{

}
}
],
"vhosts":[
{
"name":"/"
},
{
"name":"vhost2"
}
],
"permissions":[
{
"user":"test",
"vhost":"/",
"configure":"test_stream",
"write":"test_stream",
"read":"test_stream"
},
{
"user":"guest",
"vhost":"vhost2",
"configure":".*",
"write":".*",
"read":".*"
},
{
"user":"guest",
"vhost":"/",
"configure":".*",
"write":".*",
"read":".*"
}
],
"topic_permissions":[

],
"parameters":[

],
"global_parameters":[
{
"name":"internal_cluster_id",
"value":"rabbitmq-cluster-id-_6hTHBLoOCqgQiJla6WCXw"
}
],
"policies":[

],
"queues":[
{
"name":"test_stream",
"vhost":"/",
"durable":true,
"auto_delete":false,
"arguments":{
"x-queue-type":"stream"
}
},
{
"name":"no_access_stream",
"vhost":"/",
"durable":true,
"auto_delete":false,
"arguments":{
"x-queue-type":"stream"
}
}
]
}
4 changes: 2 additions & 2 deletions Tests/UnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ public async Task RoutingHelperShouldThrowIfLoadBalancerIsMisconfigured()
new List<Broker>() { new Broker("replica", 5552) });

await Assert.ThrowsAsync<RoutingClientException>(
() => RoutingHelper<MisconfiguredLoadBalancerRouting>.LookupLeaderConnection(clientParameters,
metaDataInfo));
() => RoutingHelper<MisconfiguredLoadBalancerRouting>.LookupConnection(clientParameters,
metaDataInfo.Leader, 3));
}

[Fact]
Expand Down
70 changes: 49 additions & 21 deletions Tests/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.AMQP;
using Xunit;
Expand Down Expand Up @@ -128,7 +129,8 @@ public static async Task PublishMessages(StreamSystem system, string stream, int
public static async Task PublishMessages(StreamSystem system, string stream, int numberOfMessages,
string producerName, ITestOutputHelper testOutputHelper)
{
testOutputHelper.WriteLine("Publishing messages...");
testOutputHelper.WriteLine(
$"Publishing messages to the stream {stream} number of messages {numberOfMessages}");

var testPassed = new TaskCompletionSource<int>();
var count = 0;
Expand All @@ -155,11 +157,17 @@ public static async Task PublishMessages(StreamSystem system, string stream, int
await producer.Send(Convert.ToUInt64(i), message);
}

testOutputHelper.WriteLine($"Messages sent to the stream {stream} number of messages {numberOfMessages}");

testPassed.Task.Wait(TimeSpan.FromSeconds(10));
Assert.Equal(numberOfMessages, testPassed.Task.Result);
WaitUntil(() => producer.ConfirmFrames >= 1);
WaitUntil(() => producer.IncomingFrames >= 1);
WaitUntil(() => producer.PublishCommandsSent >= 1);

testOutputHelper.WriteLine(
$"Messages sent to the stream {stream} number of messages {numberOfMessages} " +
$"confirmed {producer.ConfirmFrames} incoming {producer.IncomingFrames} publish commands sent {producer.PublishCommandsSent}");
producer.Dispose();
}

Expand All @@ -180,7 +188,8 @@ public static async Task<ConcurrentDictionary<string, IOffsetType>> OffsetsForSu
public static async Task PublishMessagesSuperStream(StreamSystem system, string stream, int numberOfMessages,
string producerName, ITestOutputHelper testOutputHelper)
{
testOutputHelper.WriteLine("Publishing super stream messages...");
testOutputHelper.WriteLine($"Publishing super stream messages...to the stream {stream} " +
$"number of messages {numberOfMessages}");

var testPassed = new TaskCompletionSource<int>();
var count = 0;
Expand Down Expand Up @@ -210,11 +219,16 @@ public static async Task PublishMessagesSuperStream(StreamSystem system, string
await producer.Send(Convert.ToUInt64(i), message);
}

testOutputHelper.WriteLine($"Messages sent to the stream {stream} number of messages {numberOfMessages}");
testPassed.Task.Wait(TimeSpan.FromSeconds(10));
Assert.Equal(numberOfMessages, testPassed.Task.Result);
Assert.True(producer.ConfirmFrames >= 1);
Assert.True(producer.IncomingFrames >= 1);
Assert.True(producer.PublishCommandsSent >= 1);

testOutputHelper.WriteLine(
$"Messages sent to the stream {stream} number of messages {numberOfMessages} " +
$"confirmed {producer.ConfirmFrames} incoming {producer.IncomingFrames} publish commands sent {producer.PublishCommandsSent}");
producer.Dispose();
}

Expand Down Expand Up @@ -373,18 +387,6 @@ public static void HttpDeleteQueue(string queue)
}
}

private static void HttpDeleteExchange(string exchange)
{
var task = CreateHttpClient().DeleteAsync($"http://localhost:15672/api/exchanges/%2F/{exchange}");
task.Wait();
var result = task.Result;
if (!result.IsSuccessStatusCode && result.StatusCode != HttpStatusCode.NotFound)
{
throw new XunitException(string.Format("HTTP DELETE failed: {0} {1}", result.StatusCode,
result.ReasonPhrase));
}
}

public static byte[] GetFileContent(string fileName)
{
var codeBaseUrl = new Uri(Assembly.GetExecutingAssembly().Location);
Expand All @@ -403,14 +405,40 @@ public static byte[] GetFileContent(string fileName)

public static void ResetSuperStreams()
{
HttpDeleteExchange("invoices");
HttpDeleteQueue("invoices-0");
HttpDeleteQueue("invoices-1");
HttpDeleteQueue("invoices-2");
var factory = new ConnectionFactory();
using var connection = factory.CreateConnection();
var channel = connection.CreateModel();

channel.ExchangeDelete(InvoicesExchange);

channel.QueueDelete(InvoicesStream0);
channel.QueueDelete(InvoicesStream1);
channel.QueueDelete(InvoicesStream2);
Wait();

channel.ExchangeDeclare(InvoicesExchange, "direct", true, false,
new Dictionary<string, object>() { { "x-super-stream-enabled", "true" } });

channel.QueueDeclare(InvoicesStream0, true, false, false,
new Dictionary<string, object>() { { "x-queue-type", "stream" }, });

channel.QueueDeclare(InvoicesStream1, true, false, false,
new Dictionary<string, object>() { { "x-queue-type", "stream" }, });

channel.QueueDeclare(InvoicesStream2, true, false, false,
new Dictionary<string, object>() { { "x-queue-type", "stream" }, });
Wait();
HttpPost(
Encoding.Default.GetString(
GetFileContent("definition_test.json")), "definitions");

channel.QueueBind(InvoicesStream0, InvoicesExchange, "0",
new Dictionary<string, object>() { { "x-stream-partition-order", "0" } });

channel.QueueBind(InvoicesStream1, InvoicesExchange, "1",
new Dictionary<string, object>() { { "x-stream-partition-order", "1" } });

channel.QueueBind(InvoicesStream2, InvoicesExchange, "2",
new Dictionary<string, object>() { { "x-stream-partition-order", "2" } });

connection.Close();
}
}
}
3 changes: 2 additions & 1 deletion Tests/xunit.runner.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"parallelizeAssembly": false,
"parallelizeTestCollections": false
"parallelizeTestCollections": false,
"stopOnFail": true
}