Skip to content

Commit ddc6b17

Browse files
Stefán J. Sigurðarsonlukebakken
authored andcommitted
Using the TPL instead of dedicated threads. Results in a much more stable connection and consumers so far.
1 parent 6b14316 commit ddc6b17

File tree

9 files changed

+167
-92
lines changed

9 files changed

+167
-92
lines changed

RabbitMQDotNetClient.sln

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Microsoft Visual Studio Solution File, Format Version 12.00
2-
# Visual Studio 15
3-
VisualStudioVersion = 15.0.26114.2
2+
# Visual Studio Version 16
3+
VisualStudioVersion = 16.0.29709.97
44
MinimumVisualStudioVersion = 10.0.40219.1
55
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "projects", "projects", "{068D7DC3-8E6E-4951-B9E3-272C641BF0DE}"
66
EndProject

projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@
5050
</PropertyGroup>
5151

5252
<ItemGroup Condition="$('TargetFramework') == 'net462'">
53+
<PackageReference Include="System.Threading.Channels" Version="4.7.0" />
54+
</ItemGroup>
55+
56+
<ItemGroup Condition=" '$(TargetFramework)' == 'net452' ">
5357
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0" PrivateAssets="All" />
5458
</ItemGroup>
5559

projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerDispatcher.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@ public void Quiesce()
1919

2020
public void Shutdown()
2121
{
22-
// necessary evil
23-
this.workService.Stop().GetAwaiter().GetResult();
22+
workService.Stop();
2423
}
2524

2625
public void Shutdown(IModel model)
2726
{
28-
// necessary evil
29-
this.workService.Stop(model).GetAwaiter().GetResult();
27+
workService.Stop(model);
3028
}
3129

3230
public bool IsShutdown
@@ -66,10 +64,10 @@ public void HandleBasicCancel(IBasicConsumer consumer, string consumerTag)
6664
public void HandleModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason)
6765
{
6866
// the only case where we ignore the shutdown flag.
69-
new ModelShutdown(consumer,reason).Execute(model).GetAwaiter().GetResult();
67+
new ModelShutdown(consumer, reason).Execute(model).GetAwaiter().GetResult();
7068
}
7169

72-
private void ScheduleUnlessShuttingDown<TWork>(TWork work)
70+
private void ScheduleUnlessShuttingDown<TWork>(TWork work)
7371
where TWork : Work
7472
{
7573
if (!this.IsShutdown)

projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerWorkService.cs

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1-
using System;
1+
using RabbitMQ.Client.Impl;
2+
3+
using System;
24
using System.Collections.Concurrent;
35
using System.Threading;
6+
#if CORECLR
7+
using System.Threading.Channels;
8+
#endif
49
using System.Threading.Tasks;
5-
using RabbitMQ.Client.Impl;
610

711
namespace RabbitMQ.Client
812
{
@@ -13,42 +17,75 @@ internal sealed class AsyncConsumerWorkService : ConsumerWorkService
1317
public void Schedule<TWork>(ModelBase model, TWork work)
1418
where TWork : Work
1519
{
16-
// two step approach is taken, as TryGetValue does not aquire locks
17-
// if this fails, GetOrAdd is called, which takes a lock
18-
19-
WorkPool workPool;
20-
if (workPools.TryGetValue(model, out workPool) == false)
21-
{
22-
var newWorkPool = new WorkPool(model);
23-
workPool = workPools.GetOrAdd(model, newWorkPool);
24-
25-
// start if it's only the workpool that has been just created
26-
if (newWorkPool == workPool)
27-
{
28-
newWorkPool.Start();
29-
}
30-
}
20+
workPools.GetOrAdd(model, StartNewWorkPool).Enqueue(work);
21+
}
3122

32-
workPool.Enqueue(work);
23+
private WorkPool StartNewWorkPool(IModel model)
24+
{
25+
var newWorkPool = new WorkPool(model as ModelBase);
26+
newWorkPool.Start();
27+
return newWorkPool;
3328
}
3429

35-
public async Task Stop(IModel model)
30+
public void Stop(IModel model)
3631
{
3732
WorkPool workPool;
3833
if (workPools.TryRemove(model, out workPool))
3934
{
40-
await workPool.Stop().ConfigureAwait(false);
35+
workPool.Stop();
4136
}
4237
}
4338

44-
public async Task Stop()
39+
public void Stop()
4540
{
4641
foreach (var model in workPools.Keys)
4742
{
48-
await Stop(model).ConfigureAwait(false);
43+
Stop(model);
4944
}
5045
}
5146

47+
#if CORECLR
48+
class WorkPool
49+
{
50+
readonly Channel<Work> actionChannel = Channel.CreateUnbounded<Work>(new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });
51+
readonly ModelBase model;
52+
Task worker;
53+
54+
public WorkPool(ModelBase model)
55+
{
56+
this.model = model;
57+
}
58+
59+
public void Start()
60+
{
61+
worker = Task.Run(Loop);
62+
}
63+
64+
public void Enqueue(Work action)
65+
{
66+
actionChannel.Writer.TryWrite(action);
67+
}
68+
69+
async Task Loop()
70+
{
71+
while (await actionChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
72+
{
73+
while (actionChannel.Reader.TryRead(out Work action))
74+
{
75+
await action.Execute(model).ConfigureAwait(false);
76+
}
77+
}
78+
}
79+
80+
public void Stop()
81+
{
82+
actionChannel.Writer.Complete();
83+
worker.Wait();
84+
}
85+
}
86+
}
87+
}
88+
#else
5289
class WorkPool
5390
{
5491
readonly ConcurrentQueue<Work> workQueue;
@@ -82,8 +119,7 @@ async Task Loop()
82119
{
83120
while (tokenSource.IsCancellationRequested == false)
84121
{
85-
Work work;
86-
while (workQueue.TryDequeue(out work))
122+
while (workQueue.TryDequeue(out Work work))
87123
{
88124
await work.Execute(model).ConfigureAwait(false);
89125
}
@@ -101,4 +137,5 @@ public Task Stop()
101137
}
102138
}
103139
}
104-
}
140+
}
141+
#endif

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@
3939
//---------------------------------------------------------------------------
4040

4141
using RabbitMQ.Client.Events;
42-
using RabbitMQ.Client.Exceptions;
4342
using RabbitMQ.Client.Impl;
43+
4444
using System;
45-
using System.Collections.Generic;
4645
using System.Collections.Concurrent;
46+
using System.Collections.Generic;
4747
using System.Linq;
4848
using System.Threading;
4949
using System.Threading.Tasks;
@@ -93,7 +93,7 @@ public AutorecoveringConnection(ConnectionFactory factory, string clientProvided
9393
m_factory = factory;
9494
this.ClientProvidedName = clientProvidedName;
9595
}
96-
96+
9797
public event EventHandler<EventArgs> RecoverySucceeded
9898
{
9999
add
@@ -551,12 +551,8 @@ private void Init(IFrameHandler fh)
551551
{
552552
m_delegate = new Connection(m_factory, false,
553553
fh, this.ClientProvidedName);
554-
555-
m_recoveryThread = new Thread(MainRecoveryLoop)
556-
{
557-
IsBackground = true
558-
};
559-
m_recoveryThread.Start();
554+
555+
m_recoveryTask = Task.Run(MainRecoveryLoop);
560556

561557
EventHandler<ShutdownEventArgs> recoveryListener = (_, args) =>
562558
{
@@ -587,7 +583,7 @@ public void UpdateSecret(string newSecret, string reason)
587583
public void Abort()
588584
{
589585
StopRecoveryLoop();
590-
if(m_delegate.IsOpen)
586+
if (m_delegate.IsOpen)
591587
m_delegate.Abort();
592588
}
593589

@@ -606,7 +602,7 @@ public void Abort(int timeout)
606602
if (m_delegate.IsOpen)
607603
m_delegate.Abort(timeout);
608604
}
609-
605+
610606
///<summary>API-side invocation of connection abort with timeout.</summary>
611607
public void Abort(TimeSpan timeout)
612608
{
@@ -622,7 +618,7 @@ public void Abort(ushort reasonCode, string reasonText, int timeout)
622618
if (m_delegate.IsOpen)
623619
m_delegate.Abort(reasonCode, reasonText, timeout);
624620
}
625-
621+
626622
///<summary>API-side invocation of connection abort with timeout.</summary>
627623
public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout)
628624
{
@@ -990,7 +986,7 @@ private bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args)
990986
// connectivity loss or abrupt shutdown
991987
args.Initiator == ShutdownInitiator.Library);
992988
}
993-
989+
994990
private enum RecoveryCommand
995991
{
996992
/// <summary>
@@ -1017,7 +1013,7 @@ private enum RecoveryConnectionState
10171013
Recovering
10181014
}
10191015

1020-
private Thread m_recoveryThread;
1016+
private Task m_recoveryTask;
10211017
private RecoveryConnectionState m_recoveryLoopState = RecoveryConnectionState.Connected;
10221018

10231019
private readonly BlockingCollection<RecoveryCommand> m_recoveryLoopCommandQueue = new BlockingCollection<RecoveryCommand>();
@@ -1027,7 +1023,7 @@ private enum RecoveryConnectionState
10271023
/// <summary>
10281024
/// This is the main loop for the auto-recovery thread.
10291025
/// </summary>
1030-
private void MainRecoveryLoop()
1026+
private async Task MainRecoveryLoop()
10311027
{
10321028
try
10331029
{
@@ -1036,10 +1032,10 @@ private void MainRecoveryLoop()
10361032
switch (m_recoveryLoopState)
10371033
{
10381034
case RecoveryConnectionState.Connected:
1039-
RecoveryLoopConnectedHandler(command);
1035+
await RecoveryLoopConnectedHandler(command).ConfigureAwait(false);
10401036
break;
10411037
case RecoveryConnectionState.Recovering:
1042-
RecoveryLoopRecoveringHandler(command);
1038+
await RecoveryLoopRecoveringHandler(command).ConfigureAwait(false);
10431039
break;
10441040
default:
10451041
ESLog.Warn("RecoveryLoop state is out of range.");
@@ -1055,6 +1051,7 @@ private void MainRecoveryLoop()
10551051
{
10561052
ESLog.Error("Main recovery loop threw unexpected exception.", e);
10571053
}
1054+
10581055
m_recoveryLoopComplete.SetResult(0);
10591056
}
10601057

@@ -1075,7 +1072,7 @@ private void StopRecoveryLoop()
10751072
/// Handles commands when in the Recovering state.
10761073
/// </summary>
10771074
/// <param name="command"></param>
1078-
private void RecoveryLoopRecoveringHandler(RecoveryCommand command)
1075+
private async Task RecoveryLoopRecoveringHandler(RecoveryCommand command)
10791076
{
10801077
switch (command)
10811078
{
@@ -1089,7 +1086,8 @@ private void RecoveryLoopRecoveringHandler(RecoveryCommand command)
10891086
}
10901087
else
10911088
{
1092-
Task.Delay(m_factory.NetworkRecoveryInterval).ContinueWith(t => { m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery); });
1089+
await Task.Delay(m_factory.NetworkRecoveryInterval);
1090+
m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery);
10931091
}
10941092

10951093
break;
@@ -1103,7 +1101,7 @@ private void RecoveryLoopRecoveringHandler(RecoveryCommand command)
11031101
/// Handles commands when in the Connected state.
11041102
/// </summary>
11051103
/// <param name="command"></param>
1106-
private void RecoveryLoopConnectedHandler(RecoveryCommand command)
1104+
private async Task RecoveryLoopConnectedHandler(RecoveryCommand command)
11071105
{
11081106
switch (command)
11091107
{
@@ -1112,7 +1110,8 @@ private void RecoveryLoopConnectedHandler(RecoveryCommand command)
11121110
break;
11131111
case RecoveryCommand.BeginAutomaticRecovery:
11141112
m_recoveryLoopState = RecoveryConnectionState.Recovering;
1115-
Task.Delay(m_factory.NetworkRecoveryInterval).ContinueWith(t => { m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery); });
1113+
await Task.Delay(m_factory.NetworkRecoveryInterval).ConfigureAwait(false);
1114+
m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery);
11161115
break;
11171116
default:
11181117
ESLog.Warn($"RecoveryLoop command {command} is out of range.");

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
using System.Text;
5151
using System.Threading;
5252
using System.Reflection;
53+
using System.Threading.Tasks;
5354

5455
namespace RabbitMQ.Client.Framing.Impl
5556
{
@@ -101,6 +102,7 @@ public class Connection : IConnection
101102
private readonly object _heartBeatWriteLock = new object();
102103
private bool m_hasDisposedHeartBeatReadTimer;
103104
private bool m_hasDisposedHeartBeatWriteTimer;
105+
private Task _mainLoopTask;
104106

105107
private static string version = typeof(Connection).Assembly
106108
.GetCustomAttribute<AssemblyInformationalVersionAttribute>()
@@ -966,11 +968,7 @@ public void MaybeStartHeartbeatTimers()
966968

967969
public void StartMainLoop(bool useBackgroundThread)
968970
{
969-
var taskName = "AMQP Connection " + Endpoint;
970-
var mainLoopThread = new Thread(MainLoop);
971-
mainLoopThread.Name = taskName;
972-
mainLoopThread.IsBackground = useBackgroundThread;
973-
mainLoopThread.Start();
971+
_mainLoopTask = Task.Run(MainLoop);
974972
}
975973

976974
public void HeartbeatReadTimerCallback(object state)
@@ -1233,6 +1231,7 @@ private void Dispose(bool disposing)
12331231
// dispose managed resources
12341232
try
12351233
{
1234+
_mainLoopTask.Wait();
12361235
MaybeStopHeartbeatTimers();
12371236
Abort();
12381237
}

0 commit comments

Comments
 (0)