Skip to content

Commit 6bab211

Browse files
committed
Address GHA test flakes
* Start by addressing a potential `NullReferenceException` * Try to address `TimeoutException` by using `WhenTcsCompetes` * Standardize in tests on using `WhenTcsCompletes` * Move all of `SystemUtils` into `IntegrationTest.Static.cs` * Consolidate creation of TaskCompletionSource objects to ensure correct TaskCreationOptions are used. * Bump RMQ to 4.0.4 on Windows
1 parent b1c3a58 commit 6bab211

30 files changed

+353
-283
lines changed

.ci/windows/versions.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"erlang": "27.1.2",
3-
"rabbitmq": "4.0.3"
3+
"rabbitmq": "4.0.4"
44
}

RabbitMQ.AMQP.Client/Extensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public static Task<T> WaitAsync<T>(this Task<T> task, CancellationToken cancella
5656

5757
private static async Task DoWaitAsync(this Task task, CancellationToken cancellationToken)
5858
{
59-
var cancellationTokenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
59+
TaskCompletionSource<bool> cancellationTokenTcs = Utils.CreateTaskCompletionSource<bool>();
6060

6161
using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
6262
state: cancellationTokenTcs, useSynchronizationContext: false))
@@ -73,7 +73,7 @@ private static async Task DoWaitAsync(this Task task, CancellationToken cancella
7373

7474
private static async Task<T0> DoWaitGenericAsync<T0>(this Task<T0> task, CancellationToken cancellationToken)
7575
{
76-
var cancellationTokenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
76+
TaskCompletionSource<bool> cancellationTokenTcs = Utils.CreateTaskCompletionSource<bool>();
7777

7878
using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
7979
state: cancellationTokenTcs, useSynchronizationContext: false))

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ internal void RemoveConsumer(Guid id)
9696
}
9797

9898
private readonly TaskCompletionSource<bool> _connectionClosedTcs =
99-
new(TaskCreationOptions.RunContinuationsAsynchronously);
99+
Utils.CreateTaskCompletionSource<bool>();
100100

101101
public IRpcServerBuilder RpcServerBuilder()
102102
{

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public override async Task OpenAsync()
4646
try
4747
{
4848
TaskCompletionSource<ReceiverLink> attachCompletedTcs =
49-
new(TaskCreationOptions.RunContinuationsAsynchronously);
49+
Utils.CreateTaskCompletionSource<ReceiverLink>();
5050

5151
// this is an event to get the filters to the listener context
5252
// it _must_ be here because in case of reconnect the original filters could be not valid anymore

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class AmqpManagement : AbstractLifeCycle, IManagement, IManagementTopolog
4747
private const string ReplyTo = "$me";
4848

4949
protected readonly TaskCompletionSource<bool> _managementSessionClosedTcs =
50-
new(TaskCreationOptions.RunContinuationsAsynchronously);
50+
Utils.CreateTaskCompletionSource<bool>();
5151

5252
internal AmqpManagement(AmqpManagementParameters amqpManagementParameters)
5353
{
@@ -269,7 +269,7 @@ private async Task EnsureReceiverLinkAsync()
269269
new Target() { Address = ManagementNodeAddress, ExpiryPolicy = new Symbol("SESSION_END"), },
270270
};
271271

272-
var tcs = new TaskCompletionSource<ReceiverLink>(TaskCreationOptions.RunContinuationsAsynchronously);
272+
TaskCompletionSource<ReceiverLink> tcs = Utils.CreateTaskCompletionSource<ReceiverLink>();
273273
var tmpReceiverLink = new ReceiverLink(
274274
_managementSession, LinkPairName, receiveAttach, (ILink link, Attach attach) =>
275275
{
@@ -328,7 +328,7 @@ private async Task EnsureSenderLinkAsync()
328328
},
329329
};
330330

331-
var tcs = new TaskCompletionSource<SenderLink>(TaskCreationOptions.RunContinuationsAsynchronously);
331+
TaskCompletionSource<SenderLink> tcs = Utils.CreateTaskCompletionSource<SenderLink>();
332332
var tmpSenderLink = new SenderLink(
333333
_managementSession, LinkPairName, senderAttach, (ILink link, Attach attach) =>
334334
{
@@ -425,10 +425,10 @@ internal async Task<Message> RequestAsync(Message message, int[] expectedRespons
425425
// TODO: make the timeout configurable
426426
TimeSpan timeout = argTimeout ?? TimeSpan.FromSeconds(30);
427427

428-
TaskCompletionSource<Message> mre = new(TaskCreationOptions.RunContinuationsAsynchronously);
428+
TaskCompletionSource<Message> tcs = Utils.CreateTaskCompletionSource<Message>();
429429

430430
// Add TaskCompletionSource to the dictionary it will be used to set the result of the request
431-
if (false == _requests.TryAdd(message.Properties.MessageId, mre))
431+
if (false == _requests.TryAdd(message.Properties.MessageId, tcs))
432432
{
433433
// TODO what to do in this error case?
434434
}
@@ -461,7 +461,7 @@ void RequestTimeoutAction()
461461

462462
// The response is handled in a separate thread, see ProcessResponses method in the Init method
463463
// TODO timeout & token
464-
Message result = await mre.Task.WaitAsync(linkedCts.Token)
464+
Message result = await tcs.Task.WaitAsync(linkedCts.Token)
465465
.ConfigureAwait(false);
466466

467467
await sendTask.WaitAsync(linkedCts.Token)

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public override async Task OpenAsync()
3535
try
3636
{
3737
TaskCompletionSource<SenderLink> attachCompletedTcs =
38-
new(TaskCreationOptions.RunContinuationsAsynchronously);
38+
Utils.CreateTaskCompletionSource<SenderLink>();
3939

4040
Attach attach = Utils.CreateAttach(_address, DeliveryMode.AtLeastOnce, _id);
4141

@@ -122,7 +122,8 @@ public async Task<PublishResult> PublishAsync(IMessage message, CancellationToke
122122
try
123123
{
124124
TaskCompletionSource<PublishOutcome> messagePublishedTcs =
125-
new(TaskCreationOptions.RunContinuationsAsynchronously);
125+
Utils.CreateTaskCompletionSource<PublishOutcome>();
126+
126127
Message nativeMessage = ((AmqpMessage)message).NativeMessage;
127128

128129
void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state)

RabbitMQ.AMQP.Client/Impl/AmqpQueueSpecification.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,16 @@ internal DefaultQueueInfo(string queueName)
3535

3636
internal DefaultQueueInfo(Map response)
3737
{
38-
_name = (string)response["name"];
38+
if (response["name"] is string name)
39+
{
40+
_name = name;
41+
}
42+
else
43+
{
44+
// TODO error?
45+
_name = string.Empty;
46+
}
47+
3948
_durable = (bool)response["durable"];
4049
_autoDelete = (bool)response["auto_delete"];
4150
_exclusive = (bool)response["exclusive"];
@@ -48,10 +57,12 @@ internal DefaultQueueInfo(Map response)
4857

4958
_leader = (string)response["leader"];
5059

51-
string[]? replicas = (string[])response["replicas"];
52-
if (replicas.Length > 0)
60+
if (response["replicas"] is string[] queueReplicas)
5361
{
54-
_replicas.AddRange(replicas);
62+
if (queueReplicas.Length > 0)
63+
{
64+
_replicas.AddRange(queueReplicas);
65+
}
5566
}
5667

5768
_messageCount = (ulong)response["message_count"];

RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ public override async Task OpenAsync()
158158
.Queue(_configuration.ReplyToQueue)
159159
.MessageHandler((context, message) =>
160160
{
161+
// TODO MessageHandler funcs should catch all exceptions
161162
context.Accept();
162163
object correlationId = ExtractCorrelationId(message);
163164
if (_pendingRequests.TryGetValue(correlationId, out TaskCompletionSource<IMessage>? request))
@@ -198,8 +199,7 @@ public async Task<IMessage> PublishAsync(IMessage message, CancellationToken can
198199
{
199200
object correlationId = CorrelationIdSupplier();
200201
message = RequestPostProcess(message, correlationId);
201-
_pendingRequests.TryAdd(correlationId,
202-
new TaskCompletionSource<IMessage>(TaskCreationOptions.RunContinuationsAsynchronously));
202+
_pendingRequests.TryAdd(correlationId, Utils.CreateTaskCompletionSource<IMessage>());
203203
if (_publisher != null)
204204
{
205205
PublishResult pr = await _publisher.PublishAsync(

RabbitMQ.AMQP.Client/Impl/AmqpSessionManagement.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ internal async Task<Session> GetOrCreateSessionAsync()
3434
}
3535
else
3636
{
37-
TaskCompletionSource<ISession> sessionBeginTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
37+
TaskCompletionSource<ISession> sessionBeginTcs = Utils.CreateTaskCompletionSource<ISession>();
3838
void OnBegin(ISession session, Begin peerBegin)
3939
{
4040
sessionBeginTcs.SetResult(session);

RabbitMQ.AMQP.Client/Utils.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
using System;
66
using System.Collections.Generic;
7+
using System.Runtime.CompilerServices;
78
using System.Security.Cryptography;
89
using System.Text;
910
using System.Threading.Tasks;
@@ -39,6 +40,12 @@ internal static int RandomNext(int minValue = 0, int maxValue = 1024)
3940
#endif
4041
}
4142

43+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
44+
internal static TaskCompletionSource<T> CreateTaskCompletionSource<T>()
45+
{
46+
return new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
47+
}
48+
4249
internal static string GenerateQueueName()
4350
{
4451
return GenerateName(DefaultPrefix);

Tests/AnonymousPublisherTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ await publisher.PublishAsync(new AmqpMessage("Hello, World!").ToAddress().Queue(
9292
.Build());
9393
Assert.Equal(OutcomeState.Accepted, pr.Outcome.State);
9494
await Task.Delay(200);
95-
await SystemUtils.WaitUntilQueueMessageCount(_queueName, 1);
96-
await SystemUtils.WaitUntilQueueMessageCount(_queueName + "2", 0);
95+
await WaitUntilQueueMessageCount(_queueName, 1);
96+
await WaitUntilQueueMessageCount(_queueName + "2", 0);
9797
await _management.Queue(_queueName + "2").Quorum().Queue().DeleteAsync();
9898
}
9999
}

Tests/BindingsTests.cs

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ public async Task SimpleBindingsBetweenExchangeAndQueue(string sourceExchange, s
3434
.Key("key");
3535
await bindingSpec.BindAsync();
3636

37-
await SystemUtils.WaitUntilExchangeExistsAsync(sourceExchangeSpec);
37+
await WaitUntilExchangeExistsAsync(sourceExchangeSpec);
3838

39-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistAsync(sourceExchangeSpec, destinationQueueSpec);
39+
await WaitUntilBindingsBetweenExchangeAndQueueExistAsync(sourceExchangeSpec, destinationQueueSpec);
4040

4141
await bindingSpec.UnbindAsync();
4242

43-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(sourceExchangeSpec,
43+
await WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(sourceExchangeSpec,
4444
destinationQueueSpec);
4545

4646
/*
@@ -50,8 +50,8 @@ await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(sourceE
5050
await destinationQueueSpec.DeleteAsync();
5151
await _connection.CloseAsync();
5252

53-
await SystemUtils.WaitUntilExchangeDeletedAsync(sourceExchangeSpec);
54-
await SystemUtils.WaitUntilQueueDeletedAsync(destinationQueueSpec);
53+
await WaitUntilExchangeDeletedAsync(sourceExchangeSpec);
54+
await WaitUntilQueueDeletedAsync(destinationQueueSpec);
5555
}
5656

5757
[Fact]
@@ -78,23 +78,23 @@ public async Task BindBetweenExchangeAndQueueTwoTimes()
7878
await firstBindingSpec.BindAsync();
7979
await secondBindingSpec.BindAsync();
8080

81-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistAsync(exchangeSpec, queueSpec);
81+
await WaitUntilBindingsBetweenExchangeAndQueueExistAsync(exchangeSpec, queueSpec);
8282

8383
await firstBindingSpec.UnbindAsync();
8484

85-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistAsync(exchangeSpec, queueSpec);
85+
await WaitUntilBindingsBetweenExchangeAndQueueExistAsync(exchangeSpec, queueSpec);
8686

8787
await secondBindingSpec.UnbindAsync();
8888

89-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(exchangeSpec, queueSpec);
89+
await WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(exchangeSpec, queueSpec);
9090

9191
await exchangeSpec.DeleteAsync();
9292
await queueSpec.DeleteAsync();
9393

9494
await _connection.CloseAsync();
9595

96-
await SystemUtils.WaitUntilExchangeDeletedAsync(exchangeSpec);
97-
await SystemUtils.WaitUntilQueueDeletedAsync(queueSpec);
96+
await WaitUntilExchangeDeletedAsync(exchangeSpec);
97+
await WaitUntilQueueDeletedAsync(queueSpec);
9898
}
9999

100100
[Theory]
@@ -128,21 +128,21 @@ await WhenAllComplete(
128128

129129
await bindingSpecification.BindAsync();
130130

131-
await SystemUtils.WaitUntilExchangeExistsAsync(sourceExchangeSpec);
132-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeExistAsync(sourceExchangeSpec,
131+
await WaitUntilExchangeExistsAsync(sourceExchangeSpec);
132+
await WaitUntilBindingsBetweenExchangeAndExchangeExistAsync(sourceExchangeSpec,
133133
destinationExchangeSpec);
134134

135135
await bindingSpecification.UnbindAsync();
136136

137-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync(sourceExchangeSpec,
137+
await WaitUntilBindingsBetweenExchangeAndExchangeDontExistAsync(sourceExchangeSpec,
138138
destinationExchangeSpec);
139139

140140
await sourceExchangeSpec.DeleteAsync();
141141
await destinationExchangeSpec.DeleteAsync();
142142
await _connection.CloseAsync();
143143

144-
await SystemUtils.WaitUntilExchangeDeletedAsync(sourceExchangeName);
145-
await SystemUtils.WaitUntilExchangeDeletedAsync(destinationExchangeName);
144+
await WaitUntilExchangeDeletedAsync(sourceExchangeName);
145+
await WaitUntilExchangeDeletedAsync(destinationExchangeName);
146146
}
147147

148148
[Theory]
@@ -163,8 +163,8 @@ public async Task BindingsBetweenExchangeAndQueuesWithArgumentsDifferentValues(s
163163
await exchangeSpec.DeclareAsync();
164164
await queueSpec.DeclareAsync();
165165

166-
await SystemUtils.WaitUntilExchangeExistsAsync(exchangeSpec);
167-
await SystemUtils.WaitUntilQueueExistsAsync(queueSpec);
166+
await WaitUntilExchangeExistsAsync(exchangeSpec);
167+
await WaitUntilQueueExistsAsync(queueSpec);
168168

169169
var arguments = new Dictionary<string, object> { { key1, value1 }, { key2, value2 } };
170170
IBindingSpecification bindingSpecification = _management.Binding()
@@ -174,24 +174,24 @@ public async Task BindingsBetweenExchangeAndQueuesWithArgumentsDifferentValues(s
174174
.Arguments(arguments);
175175
await bindingSpecification.BindAsync();
176176

177-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(
177+
await WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(
178178
exchangeSpec,
179179
queueSpec, arguments);
180180

181181
await bindingSpecification.UnbindAsync();
182182

183-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(
183+
await WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(
184184
exchangeSpec,
185185
queueSpec, arguments);
186186

187-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(exchangeSpec, queueSpec);
187+
await WaitUntilBindingsBetweenExchangeAndQueueDontExistAsync(exchangeSpec, queueSpec);
188188

189189
await exchangeSpec.DeleteAsync();
190190
await queueSpec.DeleteAsync();
191191
await _connection.CloseAsync();
192192

193-
await SystemUtils.WaitUntilExchangeDeletedAsync(exchangeSpec);
194-
await SystemUtils.WaitUntilQueueDeletedAsync(queueSpec);
193+
await WaitUntilExchangeDeletedAsync(exchangeSpec);
194+
await WaitUntilQueueDeletedAsync(queueSpec);
195195
}
196196

197197
// TODO: test with multi-bindings with parameters with list as value
@@ -236,19 +236,19 @@ public async Task MultiBindingsBetweenExchangeAndQueuesWithArgumentsDifferentVal
236236
.Arguments(specialBindArgs);
237237
await specialBindSpec.BindAsync();
238238

239-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(exchangeSpec, queueSpec,
239+
await WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(exchangeSpec, queueSpec,
240240
specialBindArgs);
241241

242242
await specialBindSpec.UnbindAsync();
243243

244-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(exchangeSpec, queueSpec,
244+
await WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(exchangeSpec, queueSpec,
245245
specialBindArgs);
246246

247247
for (int i = 0; i < 10; i++)
248248
{
249249
var bindArgs = new Dictionary<string, object>() { { $"是英国v_{i}", $"p_{i}" } };
250250

251-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(exchangeSpec, queueSpec,
251+
await WaitUntilBindingsBetweenExchangeAndQueueExistWithArgsAsync(exchangeSpec, queueSpec,
252252
bindArgs);
253253

254254
await _management.Binding()
@@ -258,7 +258,7 @@ await _management.Binding()
258258
.Arguments(bindArgs)
259259
.UnbindAsync();
260260

261-
await SystemUtils.WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(exchangeSpec, queueSpec,
261+
await WaitUntilBindingsBetweenExchangeAndQueueDontExistWithArgsAsync(exchangeSpec, queueSpec,
262262
bindArgs);
263263
}
264264
await exchangeSpec.DeleteAsync();

0 commit comments

Comments
 (0)