Skip to content

Commit d0f0e6c

Browse files
authored
Merge pull request #781 from danielmarbach/graceful
Graceful stop of worker services
2 parents ca93809 + b550728 commit d0f0e6c

File tree

7 files changed

+39
-34
lines changed

7 files changed

+39
-34
lines changed

projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerDispatcher.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Threading.Tasks;
23

34
namespace RabbitMQ.Client.Impl
45
{
@@ -19,9 +20,9 @@ public void Quiesce()
1920
IsShutdown = true;
2021
}
2122

22-
public void Shutdown(IModel model)
23+
public Task Shutdown(IModel model)
2324
{
24-
_workService.Stop(model);
25+
return _workService.Stop(model);
2526
}
2627

2728
public bool IsShutdown

projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerWorkService.cs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,14 @@ private WorkPool StartNewWorkPool(IModel model)
2020
return newWorkPool;
2121
}
2222

23-
public void Stop(IModel model)
23+
public Task Stop(IModel model)
2424
{
2525
if (_workPools.TryRemove(model, out WorkPool workPool))
2626
{
27-
workPool.Stop();
27+
return workPool.Stop();
2828
}
29-
}
3029

31-
public void Stop()
32-
{
33-
foreach (IModel model in _workPools.Keys)
34-
{
35-
Stop(model);
36-
}
30+
return Task.CompletedTask;
3731
}
3832

3933
class WorkPool
@@ -78,17 +72,18 @@ async Task Loop()
7872
// Swallowing the task cancellation in case we are stopping work.
7973
}
8074

81-
while (_tokenSource.IsCancellationRequested == false && _workQueue.TryDequeue(out Work work))
75+
while (_workQueue.TryDequeue(out Work work))
8276
{
8377
await work.Execute(_model).ConfigureAwait(false);
8478
}
8579
}
8680
}
8781

88-
public void Stop()
82+
public Task Stop()
8983
{
9084
_tokenSource.Cancel();
9185
_tokenRegistration.Dispose();
86+
return _worker;
9287
}
9388
}
9489
}

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringModel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ public void Close(ShutdownEventArgs reason, bool abort)
279279
{
280280
try
281281
{
282-
_delegate.Close(reason, abort);
282+
_delegate.Close(reason, abort).GetAwaiter().GetResult();;
283283
}
284284
finally
285285
{

projects/client/RabbitMQ.Client/src/client/impl/ConcurrentConsumerDispatcher.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3-
3+
using System.Threading.Tasks;
44
using RabbitMQ.Client.Events;
55

66
namespace RabbitMQ.Client.Impl
@@ -22,9 +22,9 @@ public void Quiesce()
2222
IsShutdown = true;
2323
}
2424

25-
public void Shutdown(IModel model)
25+
public Task Shutdown(IModel model)
2626
{
27-
_workService.StopWork(model);
27+
return _workService.StopWorkAsync(model);
2828
}
2929

3030
public bool IsShutdown

projects/client/RabbitMQ.Client/src/client/impl/ConsumerWorkService.cs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,27 @@ private WorkPool StartNewWorkPool(IModel model)
2121
return newWorkPool;
2222
}
2323

24-
public void StopWork(IModel model)
24+
public void StopWork()
2525
{
26-
if (_workPools.TryRemove(model, out WorkPool workPool))
26+
foreach (IModel model in _workPools.Keys)
2727
{
28-
workPool.Stop();
28+
StopWork(model);
2929
}
3030
}
3131

32-
public void StopWork()
32+
public void StopWork(IModel model)
3333
{
34-
foreach (IModel model in _workPools.Keys)
34+
StopWorkAsync(model).GetAwaiter().GetResult();
35+
}
36+
37+
internal Task StopWorkAsync(IModel model)
38+
{
39+
if (_workPools.TryRemove(model, out WorkPool workPool))
3540
{
36-
StopWork(model);
41+
return workPool.Stop();
3742
}
43+
44+
return Task.CompletedTask;
3845
}
3946

4047
class WorkPool
@@ -77,7 +84,7 @@ async Task Loop()
7784
// Swallowing the task cancellation exception for the semaphore in case we are stopping.
7885
}
7986

80-
while (_tokenSource.IsCancellationRequested == false && _actions.TryDequeue(out Action action))
87+
while (_actions.TryDequeue(out Action action))
8188
{
8289
try
8390
{
@@ -91,10 +98,11 @@ async Task Loop()
9198
}
9299
}
93100

94-
public void Stop()
101+
public Task Stop()
95102
{
96103
_tokenSource.Cancel();
97104
_tokenRegistration.Dispose();
105+
return _worker;
98106
}
99107
}
100108
}

projects/client/RabbitMQ.Client/src/client/impl/IConsumerDispatcher.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
//---------------------------------------------------------------------------
4040

4141
using System;
42+
using System.Threading.Tasks;
4243

4344
namespace RabbitMQ.Client.Impl
4445
{
@@ -69,6 +70,6 @@ void HandleModelShutdown(IBasicConsumer consumer,
6970

7071
void Quiesce();
7172

72-
void Shutdown(IModel model);
73+
Task Shutdown(IModel model);
7374
}
7475
}

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
using System.Runtime.CompilerServices;
4646
using System.Text;
4747
using System.Threading;
48-
48+
using System.Threading.Tasks;
4949
using RabbitMQ.Client.Events;
5050
using RabbitMQ.Client.Exceptions;
5151
using RabbitMQ.Client.Framing;
@@ -184,14 +184,14 @@ public bool IsOpen
184184

185185
public ISession Session { get; private set; }
186186

187-
public void Close(ushort replyCode, string replyText, bool abort)
187+
public Task Close(ushort replyCode, string replyText, bool abort)
188188
{
189-
Close(new ShutdownEventArgs(ShutdownInitiator.Application,
189+
return Close(new ShutdownEventArgs(ShutdownInitiator.Application,
190190
replyCode, replyText),
191191
abort);
192192
}
193193

194-
public void Close(ShutdownEventArgs reason, bool abort)
194+
public async Task Close(ShutdownEventArgs reason, bool abort)
195195
{
196196
var k = new ShutdownContinuation();
197197
ModelShutdown += k.OnConnectionShutdown;
@@ -204,7 +204,7 @@ public void Close(ShutdownEventArgs reason, bool abort)
204204
_Private_ChannelClose(reason.ReplyCode, reason.ReplyText, 0, 0);
205205
}
206206
k.Wait(TimeSpan.FromMilliseconds(10000));
207-
ConsumerDispatcher.Shutdown(this);
207+
await ConsumerDispatcher.Shutdown(this).ConfigureAwait(false);
208208
}
209209
catch (AlreadyClosedException)
210210
{
@@ -508,7 +508,7 @@ public void OnSessionShutdown(object sender, ShutdownEventArgs reason)
508508
SetCloseReason(reason);
509509
OnModelShutdown(reason);
510510
BroadcastShutdownToConsumers(m_consumers, reason);
511-
ConsumerDispatcher.Shutdown(this);
511+
ConsumerDispatcher.Shutdown(this).GetAwaiter().GetResult();;
512512
}
513513

514514
protected void BroadcastShutdownToConsumers(IDictionary<string, IBasicConsumer> cs, ShutdownEventArgs reason)
@@ -1394,7 +1394,7 @@ public void WaitForConfirmsOrDie(TimeSpan timeout)
13941394
Close(new ShutdownEventArgs(ShutdownInitiator.Application,
13951395
Constants.ReplySuccess,
13961396
"Nacks Received", new IOException("nack received")),
1397-
false);
1397+
false).GetAwaiter().GetResult();
13981398
throw new IOException("Nacks Received");
13991399
}
14001400
if (timedOut)
@@ -1403,7 +1403,7 @@ public void WaitForConfirmsOrDie(TimeSpan timeout)
14031403
Constants.ReplySuccess,
14041404
"Timed out waiting for acks",
14051405
new IOException("timed out waiting for acks")),
1406-
false);
1406+
false).GetAwaiter().GetResult();
14071407
throw new IOException("Timed out waiting for acks");
14081408
}
14091409
}

0 commit comments

Comments
 (0)