Skip to content

Address GHA test flakes #97

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/windows/versions.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"erlang": "27.1.2",
"rabbitmq": "4.0.3"
"rabbitmq": "4.0.4"
}
4 changes: 2 additions & 2 deletions RabbitMQ.AMQP.Client/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static Task<T> WaitAsync<T>(this Task<T> task, CancellationToken cancella

private static async Task DoWaitAsync(this Task task, CancellationToken cancellationToken)
{
var cancellationTokenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
TaskCompletionSource<bool> cancellationTokenTcs = Utils.CreateTaskCompletionSource<bool>();

using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
state: cancellationTokenTcs, useSynchronizationContext: false))
Expand All @@ -73,7 +73,7 @@ private static async Task DoWaitAsync(this Task task, CancellationToken cancella

private static async Task<T0> DoWaitGenericAsync<T0>(this Task<T0> task, CancellationToken cancellationToken)
{
var cancellationTokenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
TaskCompletionSource<bool> cancellationTokenTcs = Utils.CreateTaskCompletionSource<bool>();

using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
state: cancellationTokenTcs, useSynchronizationContext: false))
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ internal void RemoveConsumer(Guid id)
}

private readonly TaskCompletionSource<bool> _connectionClosedTcs =
new(TaskCreationOptions.RunContinuationsAsynchronously);
Utils.CreateTaskCompletionSource<bool>();

public IRpcServerBuilder RpcServerBuilder()
{
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public override async Task OpenAsync()
try
{
TaskCompletionSource<ReceiverLink> attachCompletedTcs =
new(TaskCreationOptions.RunContinuationsAsynchronously);
Utils.CreateTaskCompletionSource<ReceiverLink>();

// this is an event to get the filters to the listener context
// it _must_ be here because in case of reconnect the original filters could be not valid anymore
Expand Down
12 changes: 6 additions & 6 deletions RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class AmqpManagement : AbstractLifeCycle, IManagement, IManagementTopolog
private const string ReplyTo = "$me";

protected readonly TaskCompletionSource<bool> _managementSessionClosedTcs =
new(TaskCreationOptions.RunContinuationsAsynchronously);
Utils.CreateTaskCompletionSource<bool>();

internal AmqpManagement(AmqpManagementParameters amqpManagementParameters)
{
Expand Down Expand Up @@ -269,7 +269,7 @@ private async Task EnsureReceiverLinkAsync()
new Target() { Address = ManagementNodeAddress, ExpiryPolicy = new Symbol("SESSION_END"), },
};

var tcs = new TaskCompletionSource<ReceiverLink>(TaskCreationOptions.RunContinuationsAsynchronously);
TaskCompletionSource<ReceiverLink> tcs = Utils.CreateTaskCompletionSource<ReceiverLink>();
var tmpReceiverLink = new ReceiverLink(
_managementSession, LinkPairName, receiveAttach, (ILink link, Attach attach) =>
{
Expand Down Expand Up @@ -328,7 +328,7 @@ private async Task EnsureSenderLinkAsync()
},
};

var tcs = new TaskCompletionSource<SenderLink>(TaskCreationOptions.RunContinuationsAsynchronously);
TaskCompletionSource<SenderLink> tcs = Utils.CreateTaskCompletionSource<SenderLink>();
var tmpSenderLink = new SenderLink(
_managementSession, LinkPairName, senderAttach, (ILink link, Attach attach) =>
{
Expand Down Expand Up @@ -425,10 +425,10 @@ internal async Task<Message> RequestAsync(Message message, int[] expectedRespons
// TODO: make the timeout configurable
TimeSpan timeout = argTimeout ?? TimeSpan.FromSeconds(30);

TaskCompletionSource<Message> mre = new(TaskCreationOptions.RunContinuationsAsynchronously);
TaskCompletionSource<Message> tcs = Utils.CreateTaskCompletionSource<Message>();

// Add TaskCompletionSource to the dictionary it will be used to set the result of the request
if (false == _requests.TryAdd(message.Properties.MessageId, mre))
if (false == _requests.TryAdd(message.Properties.MessageId, tcs))
{
// TODO what to do in this error case?
}
Expand Down Expand Up @@ -461,7 +461,7 @@ void RequestTimeoutAction()

// The response is handled in a separate thread, see ProcessResponses method in the Init method
// TODO timeout & token
Message result = await mre.Task.WaitAsync(linkedCts.Token)
Message result = await tcs.Task.WaitAsync(linkedCts.Token)
.ConfigureAwait(false);

await sendTask.WaitAsync(linkedCts.Token)
Expand Down
5 changes: 3 additions & 2 deletions RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public override async Task OpenAsync()
try
{
TaskCompletionSource<SenderLink> attachCompletedTcs =
new(TaskCreationOptions.RunContinuationsAsynchronously);
Utils.CreateTaskCompletionSource<SenderLink>();

Attach attach = Utils.CreateAttach(_address, DeliveryMode.AtLeastOnce, _id);

Expand Down Expand Up @@ -122,7 +122,8 @@ public async Task<PublishResult> PublishAsync(IMessage message, CancellationToke
try
{
TaskCompletionSource<PublishOutcome> messagePublishedTcs =
new(TaskCreationOptions.RunContinuationsAsynchronously);
Utils.CreateTaskCompletionSource<PublishOutcome>();

Message nativeMessage = ((AmqpMessage)message).NativeMessage;

void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state)
Expand Down
19 changes: 15 additions & 4 deletions RabbitMQ.AMQP.Client/Impl/AmqpQueueSpecification.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,16 @@ internal DefaultQueueInfo(string queueName)

internal DefaultQueueInfo(Map response)
{
_name = (string)response["name"];
if (response["name"] is string name)
{
_name = name;
}
else
{
// TODO error?
_name = string.Empty;
}

_durable = (bool)response["durable"];
_autoDelete = (bool)response["auto_delete"];
_exclusive = (bool)response["exclusive"];
Expand All @@ -48,10 +57,12 @@ internal DefaultQueueInfo(Map response)

_leader = (string)response["leader"];

string[]? replicas = (string[])response["replicas"];
if (replicas.Length > 0)
if (response["replicas"] is string[] queueReplicas)
{
_replicas.AddRange(replicas);
if (queueReplicas.Length > 0)
{
_replicas.AddRange(queueReplicas);
}
}

_messageCount = (ulong)response["message_count"];
Expand Down
4 changes: 2 additions & 2 deletions RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public override async Task OpenAsync()
.Queue(_configuration.ReplyToQueue)
.MessageHandler((context, message) =>
{
// TODO MessageHandler funcs should catch all exceptions
context.Accept();
object correlationId = ExtractCorrelationId(message);
if (_pendingRequests.TryGetValue(correlationId, out TaskCompletionSource<IMessage>? request))
Expand Down Expand Up @@ -198,8 +199,7 @@ public async Task<IMessage> PublishAsync(IMessage message, CancellationToken can
{
object correlationId = CorrelationIdSupplier();
message = RequestPostProcess(message, correlationId);
_pendingRequests.TryAdd(correlationId,
new TaskCompletionSource<IMessage>(TaskCreationOptions.RunContinuationsAsynchronously));
_pendingRequests.TryAdd(correlationId, Utils.CreateTaskCompletionSource<IMessage>());
if (_publisher != null)
{
PublishResult pr = await _publisher.PublishAsync(
Expand Down
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/Impl/AmqpSessionManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ internal async Task<Session> GetOrCreateSessionAsync()
}
else
{
TaskCompletionSource<ISession> sessionBeginTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
TaskCompletionSource<ISession> sessionBeginTcs = Utils.CreateTaskCompletionSource<ISession>();
void OnBegin(ISession session, Begin peerBegin)
{
sessionBeginTcs.SetResult(session);
Expand Down
7 changes: 7 additions & 0 deletions RabbitMQ.AMQP.Client/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
Expand Down Expand Up @@ -39,6 +40,12 @@ internal static int RandomNext(int minValue = 0, int maxValue = 1024)
#endif
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static TaskCompletionSource<T> CreateTaskCompletionSource<T>()
{
return new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
}

internal static string GenerateQueueName()
{
return GenerateName(DefaultPrefix);
Expand Down
4 changes: 2 additions & 2 deletions Tests/AnonymousPublisherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ await publisher.PublishAsync(new AmqpMessage("Hello, World!").ToAddress().Queue(
.Build());
Assert.Equal(OutcomeState.Accepted, pr.Outcome.State);
await Task.Delay(200);
await SystemUtils.WaitUntilQueueMessageCount(_queueName, 1);
await SystemUtils.WaitUntilQueueMessageCount(_queueName + "2", 0);
await WaitUntilQueueMessageCount(_queueName, 1);
await WaitUntilQueueMessageCount(_queueName + "2", 0);
await _management.Queue(_queueName + "2").Quorum().Queue().DeleteAsync();
}
}
Expand Down
52 changes: 26 additions & 26 deletions Tests/BindingsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ public async Task SimpleBindingsBetweenExchangeAndQueue(string sourceExchange, s
.Key("key");
await bindingSpec.BindAsync();

await SystemUtils.WaitUntilExchangeExistsAsync(sourceExchangeSpec);
await WaitUntilExchangeExistsAsync(sourceExchangeSpec);

await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistAsync(sourceExchangeSpec, destinationQueueSpec);
await WaitUntilBindingsBetweenExchangeAndQueueExistAsync(sourceExchangeSpec, destinationQueueSpec);

await bindingSpec.UnbindAsync();

await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(sourceExchangeSpec,
await WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(sourceExchangeSpec,
destinationQueueSpec);

/*
Expand All @@ -50,8 +50,8 @@ await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(sourceE
await destinationQueueSpec.DeleteAsync();
await _connection.CloseAsync();

await SystemUtils.WaitUntilExchangeDeletedAsync(sourceExchangeSpec);
await SystemUtils.WaitUntilQueueDeletedAsync(destinationQueueSpec);
await WaitUntilExchangeDeletedAsync(sourceExchangeSpec);
await WaitUntilQueueDeletedAsync(destinationQueueSpec);
}

[Fact]
Expand All @@ -78,23 +78,23 @@ public async Task BindBetweenExchangeAndQueueTwoTimes()
await firstBindingSpec.BindAsync();
await secondBindingSpec.BindAsync();

await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistAsync(exchangeSpec, queueSpec);
await WaitUntilBindingsBetweenExchangeAndQueueExistAsync(exchangeSpec, queueSpec);

await firstBindingSpec.UnbindAsync();

await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistAsync(exchangeSpec, queueSpec);
await WaitUntilBindingsBetweenExchangeAndQueueExistAsync(exchangeSpec, queueSpec);

await secondBindingSpec.UnbindAsync();

await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(exchangeSpec, queueSpec);
await WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(exchangeSpec, queueSpec);

await exchangeSpec.DeleteAsync();
await queueSpec.DeleteAsync();

await _connection.CloseAsync();

await SystemUtils.WaitUntilExchangeDeletedAsync(exchangeSpec);
await SystemUtils.WaitUntilQueueDeletedAsync(queueSpec);
await WaitUntilExchangeDeletedAsync(exchangeSpec);
await WaitUntilQueueDeletedAsync(queueSpec);
}

[Theory]
Expand Down Expand Up @@ -128,21 +128,21 @@ await WhenAllComplete(

await bindingSpecification.BindAsync();

await SystemUtils.WaitUntilExchangeExistsAsync(sourceExchangeSpec);
await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeExistAsync(sourceExchangeSpec,
await WaitUntilExchangeExistsAsync(sourceExchangeSpec);
await WaitUntilBindingsBetweenExchangeAndExchangeExistAsync(sourceExchangeSpec,
destinationExchangeSpec);

await bindingSpecification.UnbindAsync();

await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync(sourceExchangeSpec,
await WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync(sourceExchangeSpec,
destinationExchangeSpec);

await sourceExchangeSpec.DeleteAsync();
await destinationExchangeSpec.DeleteAsync();
await _connection.CloseAsync();

await SystemUtils.WaitUntilExchangeDeletedAsync(sourceExchangeName);
await SystemUtils.WaitUntilExchangeDeletedAsync(destinationExchangeName);
await WaitUntilExchangeDeletedAsync(sourceExchangeName);
await WaitUntilExchangeDeletedAsync(destinationExchangeName);
}

[Theory]
Expand All @@ -163,8 +163,8 @@ public async Task BindingsBetweenExchangeAndQueuesWithArgumentsDifferentValues(s
await exchangeSpec.DeclareAsync();
await queueSpec.DeclareAsync();

await SystemUtils.WaitUntilExchangeExistsAsync(exchangeSpec);
await SystemUtils.WaitUntilQueueExistsAsync(queueSpec);
await WaitUntilExchangeExistsAsync(exchangeSpec);
await WaitUntilQueueExistsAsync(queueSpec);

var arguments = new Dictionary<string, object> { { key1, value1 }, { key2, value2 } };
IBindingSpecification bindingSpecification = _management.Binding()
Expand All @@ -174,24 +174,24 @@ public async Task BindingsBetweenExchangeAndQueuesWithArgumentsDifferentValues(s
.Arguments(arguments);
await bindingSpecification.BindAsync();

await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(
await WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(
exchangeSpec,
queueSpec, arguments);

await bindingSpecification.UnbindAsync();

await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(
await WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(
exchangeSpec,
queueSpec, arguments);

await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(exchangeSpec, queueSpec);
await WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(exchangeSpec, queueSpec);

await exchangeSpec.DeleteAsync();
await queueSpec.DeleteAsync();
await _connection.CloseAsync();

await SystemUtils.WaitUntilExchangeDeletedAsync(exchangeSpec);
await SystemUtils.WaitUntilQueueDeletedAsync(queueSpec);
await WaitUntilExchangeDeletedAsync(exchangeSpec);
await WaitUntilQueueDeletedAsync(queueSpec);
}

// TODO: test with multi-bindings with parameters with list as value
Expand Down Expand Up @@ -236,19 +236,19 @@ public async Task MultiBindingsBetweenExchangeAndQueuesWithArgumentsDifferentVal
.Arguments(specialBindArgs);
await specialBindSpec.BindAsync();

await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(exchangeSpec, queueSpec,
await WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(exchangeSpec, queueSpec,
specialBindArgs);

await specialBindSpec.UnbindAsync();

await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(exchangeSpec, queueSpec,
await WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(exchangeSpec, queueSpec,
specialBindArgs);

for (int i = 0; i < 10; i++)
{
var bindArgs = new Dictionary<string, object>() { { $"是英国v_{i}", $"p_{i}" } };

await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(exchangeSpec, queueSpec,
await WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(exchangeSpec, queueSpec,
bindArgs);

await _management.Binding()
Expand All @@ -258,7 +258,7 @@ await _management.Binding()
.Arguments(bindArgs)
.UnbindAsync();

await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(exchangeSpec, queueSpec,
await WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(exchangeSpec, queueSpec,
bindArgs);
}
await exchangeSpec.DeleteAsync();
Expand Down
Loading
Loading