Skip to content

Commit 162b410

Browse files
Stefán J. Sigurðarsonlukebakken
authored andcommitted
Fixing tests.
1 parent b5d507f commit 162b410

File tree

5 files changed

+91
-119
lines changed

5 files changed

+91
-119
lines changed

projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerWorkService.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,16 @@ async Task Loop()
6868
{
6969
while (tokenSource.IsCancellationRequested == false)
7070
{
71-
await semaphore.WaitAsync().ConfigureAwait(false);
72-
if (workQueue.TryDequeue(out Work work))
71+
try
72+
{
73+
await semaphore.WaitAsync(tokenSource.Token).ConfigureAwait(false);
74+
}
75+
catch (TaskCanceledException)
76+
{
77+
// Swallowing the task cancellation in case we are stopping work.
78+
}
79+
80+
if (!tokenSource.IsCancellationRequested && workQueue.TryDequeue(out Work work))
7381
{
7482
await work.Execute(model).ConfigureAwait(false);
7583
}
@@ -79,7 +87,6 @@ async Task Loop()
7987
public void Stop()
8088
{
8189
tokenSource.Cancel();
82-
task.Wait();
8390
}
8491
}
8592
}

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 51 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
using RabbitMQ.Client.Exceptions;
4343
using RabbitMQ.Client.Impl;
4444
using RabbitMQ.Util;
45+
4546
using System;
4647
using System.Collections.Generic;
4748
using System.IO;
@@ -98,10 +99,6 @@ public class Connection : IConnection
9899
private Timer _heartbeatReadTimer;
99100
private AutoResetEvent m_heartbeatRead = new AutoResetEvent(false);
100101

101-
private readonly object _heartBeatReadLock = new object();
102-
private readonly object _heartBeatWriteLock = new object();
103-
private bool m_hasDisposedHeartBeatReadTimer;
104-
private bool m_hasDisposedHeartBeatWriteTimer;
105102
private Task _mainLoopTask;
106103

107104
private static string version = typeof(Connection).Assembly
@@ -519,8 +516,6 @@ public void EnsureIsOpen()
519516
// Only call at the end of the Mainloop or HeartbeatLoop
520517
public void FinishClose()
521518
{
522-
// Notify hearbeat loops that they can leave
523-
m_heartbeatRead.Set();
524519
m_closed = true;
525520
MaybeStopHeartbeatTimers();
526521

@@ -948,19 +943,15 @@ public void MaybeStartHeartbeatTimers()
948943
{
949944
if (Heartbeat != 0)
950945
{
951-
m_hasDisposedHeartBeatReadTimer = false;
952-
m_hasDisposedHeartBeatWriteTimer = false;
953-
lock (_heartBeatWriteLock)
946+
if (_heartbeatWriteTimer == null)
954947
{
955948
_heartbeatWriteTimer = new Timer(HeartbeatWriteTimerCallback, null, Timeout.Infinite, Timeout.Infinite);
956-
957949
_heartbeatWriteTimer.Change(200, Timeout.Infinite);
958950
}
959951

960-
lock (_heartBeatReadLock)
952+
if (_heartbeatReadTimer == null)
961953
{
962954
_heartbeatReadTimer = new Timer(HeartbeatReadTimerCallback, null, Timeout.Infinite, Timeout.Infinite);
963-
964955
_heartbeatReadTimer.Change(300, Timeout.Infinite);
965956
}
966957
}
@@ -973,75 +964,69 @@ public void StartMainLoop(bool useBackgroundThread)
973964

974965
public void HeartbeatReadTimerCallback(object state)
975966
{
976-
lock (_heartBeatReadLock)
967+
if (_heartbeatReadTimer == null)
977968
{
978-
if (m_hasDisposedHeartBeatReadTimer)
979-
{
980-
return;
981-
}
969+
return;
970+
}
982971

983-
bool shouldTerminate = false;
972+
bool shouldTerminate = false;
984973

985-
try
974+
try
975+
{
976+
if (!m_closed)
986977
{
987-
if (!m_closed)
978+
if (!m_heartbeatRead.WaitOne(0))
988979
{
989-
if (!m_heartbeatRead.WaitOne(0))
990-
{
991-
m_missedHeartbeats++;
992-
}
993-
else
994-
{
995-
m_missedHeartbeats = 0;
996-
}
997-
998-
// We check against 8 = 2 * 4 because we need to wait for at
999-
// least two complete heartbeat setting intervals before
1000-
// complaining, and we've set the socket timeout to a quarter
1001-
// of the heartbeat setting in setHeartbeat above.
1002-
if (m_missedHeartbeats > 2 * 4)
1003-
{
1004-
String description = String.Format("Heartbeat missing with heartbeat == {0} seconds", m_heartbeat);
1005-
var eose = new EndOfStreamException(description);
1006-
ESLog.Error(description, eose);
1007-
m_shutdownReport.Add(new ShutdownReportEntry(description, eose));
1008-
HandleMainLoopException(
1009-
new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose));
1010-
shouldTerminate = true;
1011-
}
980+
m_missedHeartbeats++;
1012981
}
1013-
1014-
if (shouldTerminate)
982+
else
1015983
{
1016-
TerminateMainloop();
1017-
FinishClose();
984+
m_missedHeartbeats = 0;
1018985
}
1019-
else if (_heartbeatReadTimer != null)
986+
987+
// We check against 8 = 2 * 4 because we need to wait for at
988+
// least two complete heartbeat setting intervals before
989+
// complaining, and we've set the socket timeout to a quarter
990+
// of the heartbeat setting in setHeartbeat above.
991+
if (m_missedHeartbeats > 2 * 4)
1020992
{
1021-
_heartbeatReadTimer.Change(Heartbeat * 1000, Timeout.Infinite);
993+
String description = String.Format("Heartbeat missing with heartbeat == {0} seconds", m_heartbeat);
994+
var eose = new EndOfStreamException(description);
995+
ESLog.Error(description, eose);
996+
m_shutdownReport.Add(new ShutdownReportEntry(description, eose));
997+
HandleMainLoopException(
998+
new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose));
999+
shouldTerminate = true;
10221000
}
10231001
}
1024-
catch (ObjectDisposedException)
1002+
1003+
if (shouldTerminate)
10251004
{
1026-
// timer is already disposed,
1027-
// e.g. due to shutdown
1005+
TerminateMainloop();
1006+
FinishClose();
10281007
}
1029-
catch (NullReferenceException)
1008+
else if (_heartbeatReadTimer != null)
10301009
{
1031-
// timer has already been disposed from a different thread after null check
1032-
// this event should be rare
1010+
_heartbeatReadTimer.Change(Heartbeat * 1000, Timeout.Infinite);
10331011
}
10341012
}
1013+
catch (ObjectDisposedException)
1014+
{
1015+
// timer is already disposed,
1016+
// e.g. due to shutdown
1017+
}
1018+
catch (NullReferenceException)
1019+
{
1020+
// timer has already been disposed from a different thread after null check
1021+
// this event should be rare
1022+
}
10351023
}
10361024

10371025
public void HeartbeatWriteTimerCallback(object state)
10381026
{
1039-
lock (_heartBeatWriteLock)
1027+
if (_heartbeatWriteTimer == null)
10401028
{
1041-
if (m_hasDisposedHeartBeatWriteTimer)
1042-
{
1043-
return;
1044-
}
1029+
return;
10451030
}
10461031

10471032
try
@@ -1062,51 +1047,17 @@ public void HeartbeatWriteTimerCallback(object state)
10621047
// peer unavailability. See rabbitmq/rabbitmq-dotnet-client#638 for details.
10631048
}
10641049

1065-
lock(_heartBeatWriteLock)
1050+
if (m_closed == false)
10661051
{
1067-
if(m_closed == false && _heartbeatWriteTimer != null)
1068-
{
1069-
_heartbeatWriteTimer.Change((int)m_heartbeatTimeSpan.TotalMilliseconds, Timeout.Infinite);
1070-
}
1052+
_heartbeatWriteTimer?.Change((int)m_heartbeatTimeSpan.TotalMilliseconds, Timeout.Infinite);
10711053
}
10721054
}
10731055

10741056
void MaybeStopHeartbeatTimers()
10751057
{
1076-
lock (_heartBeatReadLock)
1077-
{
1078-
MaybeDisposeTimer(ref _heartbeatReadTimer);
1079-
m_hasDisposedHeartBeatReadTimer = true;
1080-
}
1081-
1082-
lock (_heartBeatWriteLock)
1083-
{
1084-
MaybeDisposeTimer(ref _heartbeatWriteTimer);
1085-
m_hasDisposedHeartBeatWriteTimer = true;
1086-
}
1087-
}
1088-
1089-
private void MaybeDisposeTimer(ref Timer timer)
1090-
{
1091-
// capture the timer to reduce chance of a null ref exception
1092-
var captured = timer;
1093-
if (captured != null)
1094-
{
1095-
try
1096-
{
1097-
captured.Change(Timeout.Infinite, Timeout.Infinite);
1098-
captured.Dispose();
1099-
timer = null;
1100-
}
1101-
catch (ObjectDisposedException)
1102-
{
1103-
// we are shutting down, ignore
1104-
}
1105-
catch (NullReferenceException)
1106-
{
1107-
// this should be very rare but could occur from a race condition
1108-
}
1109-
}
1058+
NotifyHeartbeatListener();
1059+
_heartbeatReadTimer?.Dispose();
1060+
_heartbeatWriteTimer?.Dispose();
11101061
}
11111062

11121063
///<remarks>
@@ -1231,9 +1182,8 @@ private void Dispose(bool disposing)
12311182
// dispose managed resources
12321183
try
12331184
{
1234-
_mainLoopTask.Wait();
1235-
MaybeStopHeartbeatTimers();
12361185
Abort();
1186+
_mainLoopTask.Wait();
12371187
}
12381188
catch (OperationInterruptedException)
12391189
{

projects/client/RabbitMQ.Client/src/client/impl/ConsumerWorkService.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,16 @@ async Task Loop()
6565
{
6666
while (tokenSource.IsCancellationRequested == false)
6767
{
68-
await semaphore.WaitAsync().ConfigureAwait(false);
68+
try
69+
{
70+
await semaphore.WaitAsync(tokenSource.Token).ConfigureAwait(false);
71+
}
72+
catch (TaskCanceledException)
73+
{
74+
// Swallowing the task cancellation exception for the semaphore in case we are stopping.
75+
}
6976

70-
if (actions.TryDequeue(out Action action))
77+
if (!tokenSource.IsCancellationRequested && actions.TryDequeue(out Action action))
7178
{
7279
try
7380
{
@@ -77,13 +84,13 @@ async Task Loop()
7784
{
7885
}
7986
}
87+
8088
}
8189
}
8290

8391
public void Stop()
8492
{
8593
tokenSource.Cancel();
86-
worker.Wait();
8794
}
8895
}
8996
}

projects/client/Unit/src/unit/TestConcurrentAccessWithSharedConnection.cs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,16 @@
4141
using NUnit.Framework;
4242

4343
using System;
44-
using System.Text;
4544
using System.Linq;
45+
using System.Text;
4646
using System.Threading;
47+
using System.Threading.Tasks;
4748

48-
using RabbitMQ.Client;
49-
50-
namespace RabbitMQ.Client.Unit {
49+
namespace RabbitMQ.Client.Unit
50+
{
5151
[TestFixture]
52-
public class TestConcurrentAccessWithSharedConnection : IntegrationFixture {
52+
public class TestConcurrentAccessWithSharedConnection : IntegrationFixture
53+
{
5354

5455
internal const int threads = 32;
5556
internal CountdownEvent latch;
@@ -59,6 +60,7 @@ public class TestConcurrentAccessWithSharedConnection : IntegrationFixture {
5960
public override void Init()
6061
{
6162
base.Init();
63+
ThreadPool.SetMinThreads(threads, threads);
6264
latch = new CountdownEvent(threads);
6365
}
6466

@@ -150,7 +152,8 @@ public void TestConcurrentChannelOpenAndPublishingCase12()
150152
[Test]
151153
public void TestConcurrentChannelOpenCloseLoop()
152154
{
153-
TestConcurrentChannelOperations((conn) => {
155+
TestConcurrentChannelOperations((conn) =>
156+
{
154157
var ch = conn.CreateModel();
155158
ch.Close();
156159
}, 50);
@@ -164,7 +167,7 @@ internal void TestConcurrentChannelOpenAndPublishingWithBodyOfSize(int length)
164167
internal void TestConcurrentChannelOpenAndPublishingWithBodyOfSize(int length, int iterations)
165168
{
166169
string s = "payload";
167-
if(length > s.Length)
170+
if (length > s.Length)
168171
{
169172
s.PadRight(length);
170173
}
@@ -174,7 +177,8 @@ internal void TestConcurrentChannelOpenAndPublishingWithBodyOfSize(int length, i
174177

175178
internal void TestConcurrentChannelOpenAndPublishingWithBody(byte[] body, int iterations)
176179
{
177-
TestConcurrentChannelOperations((conn) => {
180+
TestConcurrentChannelOperations((conn) =>
181+
{
178182
// publishing on a shared channel is not supported
179183
// and would missing the point of this test anyway
180184
var ch = Conn.CreateModel();
@@ -196,18 +200,20 @@ internal void TestConcurrentChannelOperations(Action<IConnection> actions,
196200
internal void TestConcurrentChannelOperations(Action<IConnection> actions,
197201
int iterations, TimeSpan timeout)
198202
{
199-
foreach (var i in Enumerable.Range(0, threads))
203+
var tasks = Enumerable.Range(0, threads).Select(x =>
200204
{
201-
var t = new Thread(() => {
205+
return Task.Run(() =>
206+
{
202207
foreach (var j in Enumerable.Range(0, iterations))
203208
{
204209
actions(Conn);
205210
}
211+
206212
latch.Signal();
207213
});
208-
t.Start();
209-
}
214+
}).ToArray();
210215

216+
//Task.WhenAll(tasks).Wait();
211217
Assert.IsTrue(latch.Wait(timeout));
212218
// incorrect frame interleaving in these tests will result
213219
// in an unrecoverable connection-level exception, thus

projects/client/Unit/src/unit/TestHeartbeats.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ public void TestHundredsOfConnectionsWithRandomHeartbeatInterval()
9898
{
9999
var rnd = new Random();
100100
List<IConnection> xs = new List<IConnection>();
101+
// Since we are using the ThreadPool, let's set MinThreads to a high-enough value.
102+
ThreadPool.SetMinThreads(200, 200);
101103
for (var i = 0; i < 200; i++)
102104
{
103105
var n = Convert.ToUInt16(rnd.Next(2, 6));

0 commit comments

Comments
 (0)