Skip to content

Graceful stop of worker services #781

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Impl
{
Expand All @@ -19,9 +20,9 @@ public void Quiesce()
IsShutdown = true;
}

public void Shutdown(IModel model)
public Task Shutdown(IModel model)
{
_workService.Stop(model);
return _workService.Stop(model);
}

public bool IsShutdown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,14 @@ private WorkPool StartNewWorkPool(IModel model)
return newWorkPool;
}

public void Stop(IModel model)
public Task Stop(IModel model)
{
if (_workPools.TryRemove(model, out WorkPool workPool))
{
workPool.Stop();
return workPool.Stop();
}
}

public void Stop()
{
foreach (IModel model in _workPools.Keys)
{
Stop(model);
}
return Task.CompletedTask;
}

class WorkPool
Expand Down Expand Up @@ -78,17 +72,18 @@ async Task Loop()
// Swallowing the task cancellation in case we are stopping work.
}

while (_tokenSource.IsCancellationRequested == false && _workQueue.TryDequeue(out Work work))
while (_workQueue.TryDequeue(out Work work))
{
await work.Execute(_model).ConfigureAwait(false);
}
}
}

public void Stop()
public Task Stop()
{
_tokenSource.Cancel();
_tokenRegistration.Dispose();
return _worker;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public void Close(ShutdownEventArgs reason, bool abort)
{
try
{
_delegate.Close(reason, abort);
_delegate.Close(reason, abort).GetAwaiter().GetResult();;
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System;
using System.Collections.Generic;

using System.Threading.Tasks;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client.Impl
Expand All @@ -22,9 +22,9 @@ public void Quiesce()
IsShutdown = true;
}

public void Shutdown(IModel model)
public Task Shutdown(IModel model)
{
_workService.StopWork(model);
return _workService.StopWorkAsync(model);
}

public bool IsShutdown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,27 @@ private WorkPool StartNewWorkPool(IModel model)
return newWorkPool;
}

public void StopWork(IModel model)
public void StopWork()
{
if (_workPools.TryRemove(model, out WorkPool workPool))
foreach (IModel model in _workPools.Keys)
{
workPool.Stop();
StopWork(model);
}
}

public void StopWork()
public void StopWork(IModel model)
{
foreach (IModel model in _workPools.Keys)
StopWorkAsync(model).GetAwaiter().GetResult();
}

internal Task StopWorkAsync(IModel model)
{
if (_workPools.TryRemove(model, out WorkPool workPool))
{
StopWork(model);
return workPool.Stop();
}

return Task.CompletedTask;
}

class WorkPool
Expand Down Expand Up @@ -77,7 +84,7 @@ async Task Loop()
// Swallowing the task cancellation exception for the semaphore in case we are stopping.
}

while (_tokenSource.IsCancellationRequested == false && _actions.TryDequeue(out Action action))
while (_actions.TryDequeue(out Action action))
{
try
{
Expand All @@ -91,10 +98,11 @@ async Task Loop()
}
}

public void Stop()
public Task Stop()
{
_tokenSource.Cancel();
_tokenRegistration.Dispose();
return _worker;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
//---------------------------------------------------------------------------

using System;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Impl
{
Expand Down Expand Up @@ -69,6 +70,6 @@ void HandleModelShutdown(IBasicConsumer consumer,

void Quiesce();

void Shutdown(IModel model);
Task Shutdown(IModel model);
}
}
16 changes: 8 additions & 8 deletions projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;

using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing;
Expand Down Expand Up @@ -184,14 +184,14 @@ public bool IsOpen

public ISession Session { get; private set; }

public void Close(ushort replyCode, string replyText, bool abort)
public Task Close(ushort replyCode, string replyText, bool abort)
{
Close(new ShutdownEventArgs(ShutdownInitiator.Application,
return Close(new ShutdownEventArgs(ShutdownInitiator.Application,
replyCode, replyText),
abort);
}

public void Close(ShutdownEventArgs reason, bool abort)
public async Task Close(ShutdownEventArgs reason, bool abort)
{
var k = new ShutdownContinuation();
ModelShutdown += k.OnConnectionShutdown;
Expand All @@ -204,7 +204,7 @@ public void Close(ShutdownEventArgs reason, bool abort)
_Private_ChannelClose(reason.ReplyCode, reason.ReplyText, 0, 0);
}
k.Wait(TimeSpan.FromMilliseconds(10000));
ConsumerDispatcher.Shutdown(this);
await ConsumerDispatcher.Shutdown(this).ConfigureAwait(false);
}
catch (AlreadyClosedException)
{
Expand Down Expand Up @@ -508,7 +508,7 @@ public void OnSessionShutdown(object sender, ShutdownEventArgs reason)
SetCloseReason(reason);
OnModelShutdown(reason);
BroadcastShutdownToConsumers(m_consumers, reason);
ConsumerDispatcher.Shutdown(this);
ConsumerDispatcher.Shutdown(this).GetAwaiter().GetResult();;
}

protected void BroadcastShutdownToConsumers(IDictionary<string, IBasicConsumer> cs, ShutdownEventArgs reason)
Expand Down Expand Up @@ -1394,7 +1394,7 @@ public void WaitForConfirmsOrDie(TimeSpan timeout)
Close(new ShutdownEventArgs(ShutdownInitiator.Application,
Constants.ReplySuccess,
"Nacks Received", new IOException("nack received")),
false);
false).GetAwaiter().GetResult();
throw new IOException("Nacks Received");
}
if (timedOut)
Expand All @@ -1403,7 +1403,7 @@ public void WaitForConfirmsOrDie(TimeSpan timeout)
Constants.ReplySuccess,
"Timed out waiting for acks",
new IOException("timed out waiting for acks")),
false);
false).GetAwaiter().GetResult();
throw new IOException("Timed out waiting for acks");
}
}
Expand Down