Skip to content

Commit 2f97275

Browse files
author
Stefán J. Sigurðarson
committed
Fixing tests.
1 parent b83abda commit 2f97275

File tree

5 files changed

+91
-119
lines changed

5 files changed

+91
-119
lines changed

projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerWorkService.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,16 @@ async Task Loop()
6868
{
6969
while (tokenSource.IsCancellationRequested == false)
7070
{
71-
await semaphore.WaitAsync().ConfigureAwait(false);
72-
if (workQueue.TryDequeue(out Work work))
71+
try
72+
{
73+
await semaphore.WaitAsync(tokenSource.Token).ConfigureAwait(false);
74+
}
75+
catch (TaskCanceledException)
76+
{
77+
// Swallowing the task cancellation in case we are stopping work.
78+
}
79+
80+
if (!tokenSource.IsCancellationRequested && workQueue.TryDequeue(out Work work))
7381
{
7482
await work.Execute(model).ConfigureAwait(false);
7583
}
@@ -79,7 +87,6 @@ async Task Loop()
7987
public void Stop()
8088
{
8189
tokenSource.Cancel();
82-
task.Wait();
8390
}
8491
}
8592
}

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 51 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
using RabbitMQ.Client.Exceptions;
4343
using RabbitMQ.Client.Impl;
4444
using RabbitMQ.Util;
45+
4546
using System;
4647
using System.Collections.Generic;
4748
using System.IO;
@@ -108,10 +109,6 @@ public class Connection : IConnection
108109
private Timer _heartbeatReadTimer;
109110
private AutoResetEvent m_heartbeatRead = new AutoResetEvent(false);
110111

111-
private readonly object _heartBeatReadLock = new object();
112-
private readonly object _heartBeatWriteLock = new object();
113-
private bool m_hasDisposedHeartBeatReadTimer;
114-
private bool m_hasDisposedHeartBeatWriteTimer;
115112
private Task _mainLoopTask;
116113

117114
#if CORECLR
@@ -543,8 +540,6 @@ public void EnsureIsOpen()
543540
// Only call at the end of the Mainloop or HeartbeatLoop
544541
public void FinishClose()
545542
{
546-
// Notify hearbeat loops that they can leave
547-
m_heartbeatRead.Set();
548543
m_closed = true;
549544
MaybeStopHeartbeatTimers();
550545

@@ -1014,8 +1009,6 @@ public void MaybeStartHeartbeatTimers()
10141009
{
10151010
if (Heartbeat != 0)
10161011
{
1017-
m_hasDisposedHeartBeatReadTimer = false;
1018-
m_hasDisposedHeartBeatWriteTimer = false;
10191012
#if NETFX_CORE
10201013
lock(_heartBeatWriteLock)
10211014
{
@@ -1030,17 +1023,15 @@ public void MaybeStartHeartbeatTimers()
10301023
_heartbeatReadTimer.Change(300, Timeout.Infinite);
10311024
}
10321025
#else
1033-
lock (_heartBeatWriteLock)
1026+
if (_heartbeatWriteTimer == null)
10341027
{
10351028
_heartbeatWriteTimer = new Timer(HeartbeatWriteTimerCallback, null, Timeout.Infinite, Timeout.Infinite);
1036-
10371029
_heartbeatWriteTimer.Change(200, Timeout.Infinite);
10381030
}
10391031

1040-
lock (_heartBeatReadLock)
1032+
if (_heartbeatReadTimer == null)
10411033
{
10421034
_heartbeatReadTimer = new Timer(HeartbeatReadTimerCallback, null, Timeout.Infinite, Timeout.Infinite);
1043-
10441035
_heartbeatReadTimer.Change(300, Timeout.Infinite);
10451036
}
10461037
#endif
@@ -1054,75 +1045,69 @@ public void StartMainLoop(bool useBackgroundThread)
10541045

10551046
public void HeartbeatReadTimerCallback(object state)
10561047
{
1057-
lock (_heartBeatReadLock)
1048+
if (_heartbeatReadTimer == null)
10581049
{
1059-
if (m_hasDisposedHeartBeatReadTimer)
1060-
{
1061-
return;
1062-
}
1050+
return;
1051+
}
10631052

1064-
bool shouldTerminate = false;
1053+
bool shouldTerminate = false;
10651054

1066-
try
1055+
try
1056+
{
1057+
if (!m_closed)
10671058
{
1068-
if (!m_closed)
1059+
if (!m_heartbeatRead.WaitOne(0))
10691060
{
1070-
if (!m_heartbeatRead.WaitOne(0))
1071-
{
1072-
m_missedHeartbeats++;
1073-
}
1074-
else
1075-
{
1076-
m_missedHeartbeats = 0;
1077-
}
1078-
1079-
// We check against 8 = 2 * 4 because we need to wait for at
1080-
// least two complete heartbeat setting intervals before
1081-
// complaining, and we've set the socket timeout to a quarter
1082-
// of the heartbeat setting in setHeartbeat above.
1083-
if (m_missedHeartbeats > 2 * 4)
1084-
{
1085-
String description = String.Format("Heartbeat missing with heartbeat == {0} seconds", m_heartbeat);
1086-
var eose = new EndOfStreamException(description);
1087-
ESLog.Error(description, eose);
1088-
m_shutdownReport.Add(new ShutdownReportEntry(description, eose));
1089-
HandleMainLoopException(
1090-
new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose));
1091-
shouldTerminate = true;
1092-
}
1061+
m_missedHeartbeats++;
10931062
}
1094-
1095-
if (shouldTerminate)
1063+
else
10961064
{
1097-
TerminateMainloop();
1098-
FinishClose();
1065+
m_missedHeartbeats = 0;
10991066
}
1100-
else if (_heartbeatReadTimer != null)
1067+
1068+
// We check against 8 = 2 * 4 because we need to wait for at
1069+
// least two complete heartbeat setting intervals before
1070+
// complaining, and we've set the socket timeout to a quarter
1071+
// of the heartbeat setting in setHeartbeat above.
1072+
if (m_missedHeartbeats > 2 * 4)
11011073
{
1102-
_heartbeatReadTimer.Change(Heartbeat * 1000, Timeout.Infinite);
1074+
String description = String.Format("Heartbeat missing with heartbeat == {0} seconds", m_heartbeat);
1075+
var eose = new EndOfStreamException(description);
1076+
ESLog.Error(description, eose);
1077+
m_shutdownReport.Add(new ShutdownReportEntry(description, eose));
1078+
HandleMainLoopException(
1079+
new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose));
1080+
shouldTerminate = true;
11031081
}
11041082
}
1105-
catch (ObjectDisposedException)
1083+
1084+
if (shouldTerminate)
11061085
{
1107-
// timer is already disposed,
1108-
// e.g. due to shutdown
1086+
TerminateMainloop();
1087+
FinishClose();
11091088
}
1110-
catch (NullReferenceException)
1089+
else if (_heartbeatReadTimer != null)
11111090
{
1112-
// timer has already been disposed from a different thread after null check
1113-
// this event should be rare
1091+
_heartbeatReadTimer.Change(Heartbeat * 1000, Timeout.Infinite);
11141092
}
11151093
}
1094+
catch (ObjectDisposedException)
1095+
{
1096+
// timer is already disposed,
1097+
// e.g. due to shutdown
1098+
}
1099+
catch (NullReferenceException)
1100+
{
1101+
// timer has already been disposed from a different thread after null check
1102+
// this event should be rare
1103+
}
11161104
}
11171105

11181106
public void HeartbeatWriteTimerCallback(object state)
11191107
{
1120-
lock (_heartBeatWriteLock)
1108+
if (_heartbeatWriteTimer == null)
11211109
{
1122-
if (m_hasDisposedHeartBeatWriteTimer)
1123-
{
1124-
return;
1125-
}
1110+
return;
11261111
}
11271112

11281113
try
@@ -1143,51 +1128,17 @@ public void HeartbeatWriteTimerCallback(object state)
11431128
// peer unavailability. See rabbitmq/rabbitmq-dotnet-client#638 for details.
11441129
}
11451130

1146-
lock(_heartBeatWriteLock)
1131+
if (m_closed == false)
11471132
{
1148-
if(m_closed == false && _heartbeatWriteTimer != null)
1149-
{
1150-
_heartbeatWriteTimer.Change((int)m_heartbeatTimeSpan.TotalMilliseconds, Timeout.Infinite);
1151-
}
1133+
_heartbeatWriteTimer?.Change((int)m_heartbeatTimeSpan.TotalMilliseconds, Timeout.Infinite);
11521134
}
11531135
}
11541136

11551137
void MaybeStopHeartbeatTimers()
11561138
{
1157-
lock (_heartBeatReadLock)
1158-
{
1159-
MaybeDisposeTimer(ref _heartbeatReadTimer);
1160-
m_hasDisposedHeartBeatReadTimer = true;
1161-
}
1162-
1163-
lock (_heartBeatWriteLock)
1164-
{
1165-
MaybeDisposeTimer(ref _heartbeatWriteTimer);
1166-
m_hasDisposedHeartBeatWriteTimer = true;
1167-
}
1168-
}
1169-
1170-
private void MaybeDisposeTimer(ref Timer timer)
1171-
{
1172-
// capture the timer to reduce chance of a null ref exception
1173-
var captured = timer;
1174-
if (captured != null)
1175-
{
1176-
try
1177-
{
1178-
captured.Change(Timeout.Infinite, Timeout.Infinite);
1179-
captured.Dispose();
1180-
timer = null;
1181-
}
1182-
catch (ObjectDisposedException)
1183-
{
1184-
// we are shutting down, ignore
1185-
}
1186-
catch (NullReferenceException)
1187-
{
1188-
// this should be very rare but could occur from a race condition
1189-
}
1190-
}
1139+
NotifyHeartbeatListener();
1140+
_heartbeatReadTimer?.Dispose();
1141+
_heartbeatWriteTimer?.Dispose();
11911142
}
11921143

11931144
///<remarks>
@@ -1312,9 +1263,8 @@ private void Dispose(bool disposing)
13121263
// dispose managed resources
13131264
try
13141265
{
1315-
_mainLoopTask.Wait();
1316-
MaybeStopHeartbeatTimers();
13171266
Abort();
1267+
_mainLoopTask.Wait();
13181268
}
13191269
catch (OperationInterruptedException)
13201270
{

projects/client/RabbitMQ.Client/src/client/impl/ConsumerWorkService.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,16 @@ async Task Loop()
6565
{
6666
while (tokenSource.IsCancellationRequested == false)
6767
{
68-
await semaphore.WaitAsync().ConfigureAwait(false);
68+
try
69+
{
70+
await semaphore.WaitAsync(tokenSource.Token).ConfigureAwait(false);
71+
}
72+
catch (TaskCanceledException)
73+
{
74+
// Swallowing the task cancellation exception for the semaphore in case we are stopping.
75+
}
6976

70-
if (actions.TryDequeue(out Action action))
77+
if (!tokenSource.IsCancellationRequested && actions.TryDequeue(out Action action))
7178
{
7279
try
7380
{
@@ -77,13 +84,13 @@ async Task Loop()
7784
{
7885
}
7986
}
87+
8088
}
8189
}
8290

8391
public void Stop()
8492
{
8593
tokenSource.Cancel();
86-
worker.Wait();
8794
}
8895
}
8996
}

projects/client/Unit/src/unit/TestConcurrentAccessWithSharedConnection.cs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,16 @@
4141
using NUnit.Framework;
4242

4343
using System;
44-
using System.Text;
4544
using System.Linq;
45+
using System.Text;
4646
using System.Threading;
47+
using System.Threading.Tasks;
4748

48-
using RabbitMQ.Client;
49-
50-
namespace RabbitMQ.Client.Unit {
49+
namespace RabbitMQ.Client.Unit
50+
{
5151
[TestFixture]
52-
public class TestConcurrentAccessWithSharedConnection : IntegrationFixture {
52+
public class TestConcurrentAccessWithSharedConnection : IntegrationFixture
53+
{
5354

5455
internal const int threads = 32;
5556
internal CountdownEvent latch;
@@ -59,6 +60,7 @@ public class TestConcurrentAccessWithSharedConnection : IntegrationFixture {
5960
public override void Init()
6061
{
6162
base.Init();
63+
ThreadPool.SetMinThreads(threads, threads);
6264
latch = new CountdownEvent(threads);
6365
}
6466

@@ -150,7 +152,8 @@ public void TestConcurrentChannelOpenAndPublishingCase12()
150152
[Test]
151153
public void TestConcurrentChannelOpenCloseLoop()
152154
{
153-
TestConcurrentChannelOperations((conn) => {
155+
TestConcurrentChannelOperations((conn) =>
156+
{
154157
var ch = conn.CreateModel();
155158
ch.Close();
156159
}, 50);
@@ -164,7 +167,7 @@ internal void TestConcurrentChannelOpenAndPublishingWithBodyOfSize(int length)
164167
internal void TestConcurrentChannelOpenAndPublishingWithBodyOfSize(int length, int iterations)
165168
{
166169
string s = "payload";
167-
if(length > s.Length)
170+
if (length > s.Length)
168171
{
169172
s.PadRight(length);
170173
}
@@ -174,7 +177,8 @@ internal void TestConcurrentChannelOpenAndPublishingWithBodyOfSize(int length, i
174177

175178
internal void TestConcurrentChannelOpenAndPublishingWithBody(byte[] body, int iterations)
176179
{
177-
TestConcurrentChannelOperations((conn) => {
180+
TestConcurrentChannelOperations((conn) =>
181+
{
178182
// publishing on a shared channel is not supported
179183
// and would missing the point of this test anyway
180184
var ch = Conn.CreateModel();
@@ -196,18 +200,20 @@ internal void TestConcurrentChannelOperations(Action<IConnection> actions,
196200
internal void TestConcurrentChannelOperations(Action<IConnection> actions,
197201
int iterations, TimeSpan timeout)
198202
{
199-
foreach (var i in Enumerable.Range(0, threads))
203+
var tasks = Enumerable.Range(0, threads).Select(x =>
200204
{
201-
var t = new Thread(() => {
205+
return Task.Run(() =>
206+
{
202207
foreach (var j in Enumerable.Range(0, iterations))
203208
{
204209
actions(Conn);
205210
}
211+
206212
latch.Signal();
207213
});
208-
t.Start();
209-
}
214+
}).ToArray();
210215

216+
//Task.WhenAll(tasks).Wait();
211217
Assert.IsTrue(latch.Wait(timeout));
212218
// incorrect frame interleaving in these tests will result
213219
// in an unrecoverable connection-level exception, thus

projects/client/Unit/src/unit/TestHeartbeats.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ public void TestHundredsOfConnectionsWithRandomHeartbeatInterval()
9999
{
100100
var rnd = new Random();
101101
List<IConnection> xs = new List<IConnection>();
102+
// Since we are using the ThreadPool, let's set MinThreads to a high-enough value.
103+
ThreadPool.SetMinThreads(200, 200);
102104
for (var i = 0; i < 200; i++)
103105
{
104106
var n = Convert.ToUInt16(rnd.Next(2, 6));

0 commit comments

Comments
 (0)