Skip to content

Commit 66d5966

Browse files
Stefán J. Sigurðarsonlukebakken
authored andcommitted
Using the TPL instead of dedicated threads. Results in a much more stable connection and consumers so far.
Code cleanups. Removing unneeded dependency.
1 parent 6b14316 commit 66d5966

11 files changed

+168
-235
lines changed

RabbitMQDotNetClient.sln

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Microsoft Visual Studio Solution File, Format Version 12.00
2-
# Visual Studio 15
3-
VisualStudioVersion = 15.0.26114.2
2+
# Visual Studio Version 16
3+
VisualStudioVersion = 16.0.29709.97
44
MinimumVisualStudioVersion = 10.0.40219.1
55
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "projects", "projects", "{068D7DC3-8E6E-4951-B9E3-272C641BF0DE}"
66
EndProject

projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,4 @@
6464
</AssemblyAttribute>
6565
</ItemGroup>
6666

67-
</Project>
67+
</Project>

projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerDispatcher.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,12 @@ public void Quiesce()
1919

2020
public void Shutdown()
2121
{
22-
// necessary evil
23-
this.workService.Stop().GetAwaiter().GetResult();
22+
workService.Stop();
2423
}
2524

2625
public void Shutdown(IModel model)
2726
{
28-
// necessary evil
29-
this.workService.Stop(model).GetAwaiter().GetResult();
27+
workService.Stop(model);
3028
}
3129

3230
public bool IsShutdown
@@ -66,10 +64,10 @@ public void HandleBasicCancel(IBasicConsumer consumer, string consumerTag)
6664
public void HandleModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason)
6765
{
6866
// the only case where we ignore the shutdown flag.
69-
new ModelShutdown(consumer,reason).Execute(model).GetAwaiter().GetResult();
67+
new ModelShutdown(consumer, reason).Execute(model).GetAwaiter().GetResult();
7068
}
7169

72-
private void ScheduleUnlessShuttingDown<TWork>(TWork work)
70+
private void ScheduleUnlessShuttingDown<TWork>(TWork work)
7371
where TWork : Work
7472
{
7573
if (!this.IsShutdown)
Lines changed: 31 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,55 @@
1-
using System;
1+
using RabbitMQ.Client.Impl;
2+
23
using System.Collections.Concurrent;
34
using System.Threading;
45
using System.Threading.Tasks;
5-
using RabbitMQ.Client.Impl;
66

77
namespace RabbitMQ.Client
88
{
99
internal sealed class AsyncConsumerWorkService : ConsumerWorkService
1010
{
11-
readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>();
11+
private readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>();
1212

13-
public void Schedule<TWork>(ModelBase model, TWork work)
14-
where TWork : Work
13+
public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
1514
{
16-
// two step approach is taken, as TryGetValue does not aquire locks
17-
// if this fails, GetOrAdd is called, which takes a lock
18-
19-
WorkPool workPool;
20-
if (workPools.TryGetValue(model, out workPool) == false)
21-
{
22-
var newWorkPool = new WorkPool(model);
23-
workPool = workPools.GetOrAdd(model, newWorkPool);
24-
25-
// start if it's only the workpool that has been just created
26-
if (newWorkPool == workPool)
27-
{
28-
newWorkPool.Start();
29-
}
30-
}
15+
workPools.GetOrAdd(model, StartNewWorkPool).Enqueue(work);
16+
}
3117

32-
workPool.Enqueue(work);
18+
private WorkPool StartNewWorkPool(IModel model)
19+
{
20+
var newWorkPool = new WorkPool(model as ModelBase);
21+
newWorkPool.Start();
22+
return newWorkPool;
3323
}
3424

35-
public async Task Stop(IModel model)
25+
public void Stop(IModel model)
3626
{
37-
WorkPool workPool;
38-
if (workPools.TryRemove(model, out workPool))
27+
if (workPools.TryRemove(model, out WorkPool workPool))
3928
{
40-
await workPool.Stop().ConfigureAwait(false);
29+
workPool.Stop();
4130
}
4231
}
4332

44-
public async Task Stop()
33+
public void Stop()
4534
{
46-
foreach (var model in workPools.Keys)
35+
foreach (IModel model in workPools.Keys)
4736
{
48-
await Stop(model).ConfigureAwait(false);
37+
Stop(model);
4938
}
5039
}
5140

5241
class WorkPool
5342
{
5443
readonly ConcurrentQueue<Work> workQueue;
55-
readonly TimeSpan waitTime;
5644
readonly CancellationTokenSource tokenSource;
5745
readonly ModelBase model;
58-
TaskCompletionSource<bool> messageArrived;
46+
readonly SemaphoreSlim semaphore = new SemaphoreSlim(0);
5947
private Task task;
6048

6149
public WorkPool(ModelBase model)
6250
{
6351
this.model = model;
6452
workQueue = new ConcurrentQueue<Work>();
65-
messageArrived = new TaskCompletionSource<bool>();
66-
waitTime = TimeSpan.FromMilliseconds(100);
6753
tokenSource = new CancellationTokenSource();
6854
}
6955

@@ -75,30 +61,33 @@ public void Start()
7561
public void Enqueue(Work work)
7662
{
7763
workQueue.Enqueue(work);
78-
messageArrived.TrySetResult(true);
64+
semaphore.Release();
7965
}
8066

8167
async Task Loop()
8268
{
8369
while (tokenSource.IsCancellationRequested == false)
8470
{
85-
Work work;
86-
while (workQueue.TryDequeue(out work))
71+
try
8772
{
88-
await work.Execute(model).ConfigureAwait(false);
73+
await semaphore.WaitAsync(tokenSource.Token).ConfigureAwait(false);
74+
}
75+
catch (TaskCanceledException)
76+
{
77+
// Swallowing the task cancellation in case we are stopping work.
8978
}
9079

91-
await Task.WhenAny(Task.Delay(waitTime, tokenSource.Token), messageArrived.Task).ConfigureAwait(false);
92-
messageArrived.TrySetResult(true);
93-
messageArrived = new TaskCompletionSource<bool>();
80+
if (!tokenSource.IsCancellationRequested && workQueue.TryDequeue(out Work work))
81+
{
82+
await work.Execute(model).ConfigureAwait(false);
83+
}
9484
}
9585
}
9686

97-
public Task Stop()
87+
public void Stop()
9888
{
9989
tokenSource.Cancel();
100-
return task;
10190
}
10291
}
10392
}
104-
}
93+
}

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@
3939
//---------------------------------------------------------------------------
4040

4141
using RabbitMQ.Client.Events;
42-
using RabbitMQ.Client.Exceptions;
4342
using RabbitMQ.Client.Impl;
43+
4444
using System;
45-
using System.Collections.Generic;
4645
using System.Collections.Concurrent;
46+
using System.Collections.Generic;
4747
using System.Linq;
4848
using System.Threading;
4949
using System.Threading.Tasks;
@@ -93,7 +93,7 @@ public AutorecoveringConnection(ConnectionFactory factory, string clientProvided
9393
m_factory = factory;
9494
this.ClientProvidedName = clientProvidedName;
9595
}
96-
96+
9797
public event EventHandler<EventArgs> RecoverySucceeded
9898
{
9999
add
@@ -551,12 +551,8 @@ private void Init(IFrameHandler fh)
551551
{
552552
m_delegate = new Connection(m_factory, false,
553553
fh, this.ClientProvidedName);
554-
555-
m_recoveryThread = new Thread(MainRecoveryLoop)
556-
{
557-
IsBackground = true
558-
};
559-
m_recoveryThread.Start();
554+
555+
m_recoveryTask = Task.Run(MainRecoveryLoop);
560556

561557
EventHandler<ShutdownEventArgs> recoveryListener = (_, args) =>
562558
{
@@ -587,7 +583,7 @@ public void UpdateSecret(string newSecret, string reason)
587583
public void Abort()
588584
{
589585
StopRecoveryLoop();
590-
if(m_delegate.IsOpen)
586+
if (m_delegate.IsOpen)
591587
m_delegate.Abort();
592588
}
593589

@@ -606,7 +602,7 @@ public void Abort(int timeout)
606602
if (m_delegate.IsOpen)
607603
m_delegate.Abort(timeout);
608604
}
609-
605+
610606
///<summary>API-side invocation of connection abort with timeout.</summary>
611607
public void Abort(TimeSpan timeout)
612608
{
@@ -622,7 +618,7 @@ public void Abort(ushort reasonCode, string reasonText, int timeout)
622618
if (m_delegate.IsOpen)
623619
m_delegate.Abort(reasonCode, reasonText, timeout);
624620
}
625-
621+
626622
///<summary>API-side invocation of connection abort with timeout.</summary>
627623
public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout)
628624
{
@@ -990,7 +986,7 @@ private bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args)
990986
// connectivity loss or abrupt shutdown
991987
args.Initiator == ShutdownInitiator.Library);
992988
}
993-
989+
994990
private enum RecoveryCommand
995991
{
996992
/// <summary>
@@ -1017,7 +1013,7 @@ private enum RecoveryConnectionState
10171013
Recovering
10181014
}
10191015

1020-
private Thread m_recoveryThread;
1016+
private Task m_recoveryTask;
10211017
private RecoveryConnectionState m_recoveryLoopState = RecoveryConnectionState.Connected;
10221018

10231019
private readonly BlockingCollection<RecoveryCommand> m_recoveryLoopCommandQueue = new BlockingCollection<RecoveryCommand>();
@@ -1027,7 +1023,7 @@ private enum RecoveryConnectionState
10271023
/// <summary>
10281024
/// This is the main loop for the auto-recovery thread.
10291025
/// </summary>
1030-
private void MainRecoveryLoop()
1026+
private async Task MainRecoveryLoop()
10311027
{
10321028
try
10331029
{
@@ -1036,10 +1032,10 @@ private void MainRecoveryLoop()
10361032
switch (m_recoveryLoopState)
10371033
{
10381034
case RecoveryConnectionState.Connected:
1039-
RecoveryLoopConnectedHandler(command);
1035+
await RecoveryLoopConnectedHandler(command).ConfigureAwait(false);
10401036
break;
10411037
case RecoveryConnectionState.Recovering:
1042-
RecoveryLoopRecoveringHandler(command);
1038+
await RecoveryLoopRecoveringHandler(command).ConfigureAwait(false);
10431039
break;
10441040
default:
10451041
ESLog.Warn("RecoveryLoop state is out of range.");
@@ -1055,6 +1051,7 @@ private void MainRecoveryLoop()
10551051
{
10561052
ESLog.Error("Main recovery loop threw unexpected exception.", e);
10571053
}
1054+
10581055
m_recoveryLoopComplete.SetResult(0);
10591056
}
10601057

@@ -1075,7 +1072,7 @@ private void StopRecoveryLoop()
10751072
/// Handles commands when in the Recovering state.
10761073
/// </summary>
10771074
/// <param name="command"></param>
1078-
private void RecoveryLoopRecoveringHandler(RecoveryCommand command)
1075+
private async Task RecoveryLoopRecoveringHandler(RecoveryCommand command)
10791076
{
10801077
switch (command)
10811078
{
@@ -1089,7 +1086,8 @@ private void RecoveryLoopRecoveringHandler(RecoveryCommand command)
10891086
}
10901087
else
10911088
{
1092-
Task.Delay(m_factory.NetworkRecoveryInterval).ContinueWith(t => { m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery); });
1089+
await Task.Delay(m_factory.NetworkRecoveryInterval);
1090+
m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery);
10931091
}
10941092

10951093
break;
@@ -1103,7 +1101,7 @@ private void RecoveryLoopRecoveringHandler(RecoveryCommand command)
11031101
/// Handles commands when in the Connected state.
11041102
/// </summary>
11051103
/// <param name="command"></param>
1106-
private void RecoveryLoopConnectedHandler(RecoveryCommand command)
1104+
private async Task RecoveryLoopConnectedHandler(RecoveryCommand command)
11071105
{
11081106
switch (command)
11091107
{
@@ -1112,7 +1110,8 @@ private void RecoveryLoopConnectedHandler(RecoveryCommand command)
11121110
break;
11131111
case RecoveryCommand.BeginAutomaticRecovery:
11141112
m_recoveryLoopState = RecoveryConnectionState.Recovering;
1115-
Task.Delay(m_factory.NetworkRecoveryInterval).ContinueWith(t => { m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery); });
1113+
await Task.Delay(m_factory.NetworkRecoveryInterval).ConfigureAwait(false);
1114+
m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery);
11161115
break;
11171116
default:
11181117
ESLog.Warn($"RecoveryLoop command {command} is out of range.");

0 commit comments

Comments
 (0)