-
Notifications
You must be signed in to change notification settings - Fork 606
Task optimizations and deadlock fix #687
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,69 +1,55 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
using System; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
using RabbitMQ.Client.Impl; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
using System.Collections.Concurrent; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
using System.Threading; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
using System.Threading.Tasks; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
using RabbitMQ.Client.Impl; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
namespace RabbitMQ.Client | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
internal sealed class AsyncConsumerWorkService : ConsumerWorkService | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public void Schedule<TWork>(ModelBase model, TWork work) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
where TWork : Work | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// two step approach is taken, as TryGetValue does not aquire locks | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// if this fails, GetOrAdd is called, which takes a lock | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
WorkPool workPool; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (workPools.TryGetValue(model, out workPool) == false) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
var newWorkPool = new WorkPool(model); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
workPool = workPools.GetOrAdd(model, newWorkPool); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// start if it's only the workpool that has been just created | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (newWorkPool == workPool) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
newWorkPool.Start(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
workPools.GetOrAdd(model, StartNewWorkPool).Enqueue(work); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
workPool.Enqueue(work); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private WorkPool StartNewWorkPool(IModel model) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
var newWorkPool = new WorkPool(model as ModelBase); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
newWorkPool.Start(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return newWorkPool; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public async Task Stop(IModel model) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public void Stop(IModel model) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
WorkPool workPool; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (workPools.TryRemove(model, out workPool)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (workPools.TryRemove(model, out WorkPool workPool)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await workPool.Stop().ConfigureAwait(false); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
workPool.Stop(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public async Task Stop() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public void Stop() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
foreach (var model in workPools.Keys) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
foreach (IModel model in workPools.Keys) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await Stop(model).ConfigureAwait(false); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Stop(model); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
class WorkPool | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
readonly ConcurrentQueue<Work> workQueue; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
readonly TimeSpan waitTime; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
readonly CancellationTokenSource tokenSource; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
readonly ModelBase model; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
TaskCompletionSource<bool> messageArrived; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
readonly SemaphoreSlim semaphore = new SemaphoreSlim(0); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private Task task; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public WorkPool(ModelBase model) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
this.model = model; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
workQueue = new ConcurrentQueue<Work>(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
messageArrived = new TaskCompletionSource<bool>(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
waitTime = TimeSpan.FromMilliseconds(100); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tokenSource = new CancellationTokenSource(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -75,30 +61,33 @@ public void Start() | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public void Enqueue(Work work) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
workQueue.Enqueue(work); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
messageArrived.TrySetResult(true); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
semaphore.Release(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
async Task Loop() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
while (tokenSource.IsCancellationRequested == false) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Work work; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
while (workQueue.TryDequeue(out work)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
try | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await work.Execute(model).ConfigureAwait(false); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await semaphore.WaitAsync(tokenSource.Token).ConfigureAwait(false); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
catch (TaskCanceledException) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Swallowing the task cancellation in case we are stopping work. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await Task.WhenAny(Task.Delay(waitTime, tokenSource.Token), messageArrived.Task).ConfigureAwait(false); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
messageArrived.TrySetResult(true); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
messageArrived = new TaskCompletionSource<bool>(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
-91
to
-93
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no guarantee that Enqueue and Loop are working on the same messageArrived instances here. This is a lot easier to solve with SemaphoreSlim (Enqueue calls Release and increments it, releasing the await in Loop if it's at 0), and gets rid of the 100ms polling Task.Delay. Much cleaner and more efficient. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason why I used a TCS here is because it is much faster than the semaphore version. The reason is that releasing the semaphore on the hot path vs TrySetReslut on the TCS is up to 7 to 8 times slower on NETFRAMEWORK and up to 3-4 times slower on NETCORE 3.1 Netframework without contention
Netframework with contention7
Netcore 3.1 with contention
Do you want me to use the TCS again and send a PR to fix this @michaelklishin @lukebakken ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please submit a PR. Thanks for the benchmarks. I assume that the original purpose of #687 (fixing a potential deadlock) will still apply. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My concern is @stebet 's comment:
I don't know the specifics of the deadlock so I'm relying on your expertise! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think he talks about the TCS not using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here we go #762 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (!tokenSource.IsCancellationRequested && workQueue.TryDequeue(out Work work)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
await work.Execute(model).ConfigureAwait(false); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public Task Stop() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public void Stop() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
tokenSource.Cancel(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return task; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need to return a task. Fixes the "Necessary Evil" above. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,11 +39,11 @@ | |
//--------------------------------------------------------------------------- | ||
|
||
using RabbitMQ.Client.Events; | ||
using RabbitMQ.Client.Exceptions; | ||
using RabbitMQ.Client.Impl; | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Collections.Concurrent; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
@@ -93,7 +93,7 @@ public AutorecoveringConnection(ConnectionFactory factory, string clientProvided | |
m_factory = factory; | ||
this.ClientProvidedName = clientProvidedName; | ||
} | ||
|
||
public event EventHandler<EventArgs> RecoverySucceeded | ||
{ | ||
add | ||
|
@@ -551,12 +551,8 @@ private void Init(IFrameHandler fh) | |
{ | ||
m_delegate = new Connection(m_factory, false, | ||
fh, this.ClientProvidedName); | ||
|
||
m_recoveryThread = new Thread(MainRecoveryLoop) | ||
{ | ||
IsBackground = true | ||
}; | ||
m_recoveryThread.Start(); | ||
|
||
m_recoveryTask = Task.Run(MainRecoveryLoop); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use Task instead of new Thread. Utilizes the ThreadPool and gets rid of the overhead to create a new thread, also enabled turning the MainRecoveryLoop method into an async method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The overhead for this method is unlikely to be significant enough in practice to matter but fair enough :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed, but using Task.Run also enables the MainRecoveryLoop to be defined as "async Task" instead of being forced to be a synchronous method :) Also delegates to using the ThreadPool for better core utilization, hopefully resulting in fewer thread blocks/stalls. |
||
|
||
EventHandler<ShutdownEventArgs> recoveryListener = (_, args) => | ||
{ | ||
|
@@ -587,7 +583,7 @@ public void UpdateSecret(string newSecret, string reason) | |
public void Abort() | ||
{ | ||
StopRecoveryLoop(); | ||
if(m_delegate.IsOpen) | ||
if (m_delegate.IsOpen) | ||
m_delegate.Abort(); | ||
} | ||
|
||
|
@@ -608,7 +604,7 @@ public void Abort(TimeSpan timeout) | |
m_delegate.Abort(timeout); | ||
} | ||
} | ||
|
||
///<summary>API-side invocation of connection abort with timeout.</summary> | ||
public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout) | ||
{ | ||
|
@@ -980,7 +976,7 @@ private bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args) | |
// connectivity loss or abrupt shutdown | ||
args.Initiator == ShutdownInitiator.Library); | ||
} | ||
|
||
private enum RecoveryCommand | ||
{ | ||
/// <summary> | ||
|
@@ -1007,7 +1003,7 @@ private enum RecoveryConnectionState | |
Recovering | ||
} | ||
|
||
private Thread m_recoveryThread; | ||
private Task m_recoveryTask; | ||
private RecoveryConnectionState m_recoveryLoopState = RecoveryConnectionState.Connected; | ||
|
||
private readonly BlockingCollection<RecoveryCommand> m_recoveryLoopCommandQueue = new BlockingCollection<RecoveryCommand>(); | ||
|
@@ -1017,7 +1013,7 @@ private enum RecoveryConnectionState | |
/// <summary> | ||
/// This is the main loop for the auto-recovery thread. | ||
/// </summary> | ||
private void MainRecoveryLoop() | ||
private async Task MainRecoveryLoop() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can now do async since we use Task.Run instead of new Thread. |
||
{ | ||
try | ||
{ | ||
|
@@ -1026,10 +1022,10 @@ private void MainRecoveryLoop() | |
switch (m_recoveryLoopState) | ||
{ | ||
case RecoveryConnectionState.Connected: | ||
RecoveryLoopConnectedHandler(command); | ||
await RecoveryLoopConnectedHandler(command).ConfigureAwait(false); | ||
break; | ||
case RecoveryConnectionState.Recovering: | ||
RecoveryLoopRecoveringHandler(command); | ||
await RecoveryLoopRecoveringHandler(command).ConfigureAwait(false); | ||
break; | ||
default: | ||
ESLog.Warn("RecoveryLoop state is out of range."); | ||
|
@@ -1045,6 +1041,7 @@ private void MainRecoveryLoop() | |
{ | ||
ESLog.Error("Main recovery loop threw unexpected exception.", e); | ||
} | ||
|
||
m_recoveryLoopComplete.SetResult(0); | ||
} | ||
|
||
|
@@ -1065,7 +1062,7 @@ private void StopRecoveryLoop() | |
/// Handles commands when in the Recovering state. | ||
/// </summary> | ||
/// <param name="command"></param> | ||
private void RecoveryLoopRecoveringHandler(RecoveryCommand command) | ||
private async Task RecoveryLoopRecoveringHandler(RecoveryCommand command) | ||
{ | ||
switch (command) | ||
{ | ||
|
@@ -1079,7 +1076,8 @@ private void RecoveryLoopRecoveringHandler(RecoveryCommand command) | |
} | ||
else | ||
{ | ||
Task.Delay(m_factory.NetworkRecoveryInterval).ContinueWith(t => { m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery); }); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This had no guarantee of working correctly, or ever even calling the continuation, since the result from Task.Delay was never awaited either synchronously or asynchronously. Fixed by using async. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch! |
||
await Task.Delay(m_factory.NetworkRecoveryInterval); | ||
m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery); | ||
} | ||
|
||
break; | ||
|
@@ -1093,7 +1091,7 @@ private void RecoveryLoopRecoveringHandler(RecoveryCommand command) | |
/// Handles commands when in the Connected state. | ||
/// </summary> | ||
/// <param name="command"></param> | ||
private void RecoveryLoopConnectedHandler(RecoveryCommand command) | ||
private async Task RecoveryLoopConnectedHandler(RecoveryCommand command) | ||
{ | ||
switch (command) | ||
{ | ||
|
@@ -1102,7 +1100,8 @@ private void RecoveryLoopConnectedHandler(RecoveryCommand command) | |
break; | ||
case RecoveryCommand.BeginAutomaticRecovery: | ||
m_recoveryLoopState = RecoveryConnectionState.Recovering; | ||
Task.Delay(m_factory.NetworkRecoveryInterval).ContinueWith(t => { m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery); }); | ||
await Task.Delay(m_factory.NetworkRecoveryInterval).ConfigureAwait(false); | ||
m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery); | ||
break; | ||
default: | ||
ESLog.Warn($"RecoveryLoop command {command} is out of range."); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not at all performance sensitive code, so it's much more readable like this, but will revert if needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems reasonable.