Skip to content

Commit 5fc23a4

Browse files
committed
Refactor other use of ConcurrentDictionary GetOrAdd
1 parent 75ac0db commit 5fc23a4

File tree

2 files changed

+23
-11
lines changed

2 files changed

+23
-11
lines changed

projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,8 @@ namespace RabbitMQ.Client.Impl
88
{
99
internal sealed class AsyncConsumerWorkService : ConsumerWorkService
1010
{
11-
private readonly ConcurrentDictionary<IModel, WorkPool> _workPools;
12-
private readonly Func<IModel, WorkPool> _startNewWorkPoolFunc;
13-
14-
public AsyncConsumerWorkService()
15-
{
16-
_workPools = new ConcurrentDictionary<IModel, WorkPool>();
17-
_startNewWorkPoolFunc = model => StartNewWorkPool(model);
18-
}
11+
private readonly ConcurrentDictionary<IModel, WorkPool> _workPools = new ConcurrentDictionary<IModel, WorkPool>();
12+
private readonly Func<IModel, WorkPool> _startNewWorkPoolFunc = model => StartNewWorkPool(model);
1913

2014
public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
2115
{
@@ -25,10 +19,14 @@ public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
2519
*
2620
* The lock is necessary because calling the value delegate is not atomic.
2721
*/
22+
WorkPool workPool;
23+
2824
lock (_workPools)
2925
{
30-
_workPools.GetOrAdd(model, _startNewWorkPoolFunc).Enqueue(work);
26+
workPool = _workPools.GetOrAdd(model, _startNewWorkPoolFunc);
3127
}
28+
29+
workPool.Enqueue(work);
3230
}
3331

3432
private static WorkPool StartNewWorkPool(IModel model)

projects/RabbitMQ.Client/client/impl/ConsumerWorkService.cs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,27 @@ namespace RabbitMQ.Client.Impl
88
class ConsumerWorkService
99
{
1010
private readonly ConcurrentDictionary<IModel, WorkPool> _workPools = new ConcurrentDictionary<IModel, WorkPool>();
11+
private readonly Func<IModel, WorkPool> _startNewWorkPoolFunc = model => StartNewWorkPool(model);
1112

1213
public void AddWork(IModel model, Action fn)
1314
{
14-
_workPools.GetOrAdd(model, StartNewWorkPool).Enqueue(fn);
15+
/*
16+
* rabbitmq/rabbitmq-dotnet-client#841
17+
* https://docs.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentdictionary-2.getoradd
18+
*
19+
* The lock is necessary because calling the value delegate is not atomic.
20+
*/
21+
WorkPool workPool;
22+
23+
lock (_workPools)
24+
{
25+
workPool = _workPools.GetOrAdd(model, _startNewWorkPoolFunc);
26+
}
27+
28+
workPool.Enqueue(fn);
1529
}
1630

17-
private WorkPool StartNewWorkPool(IModel model)
31+
private static WorkPool StartNewWorkPool(IModel model)
1832
{
1933
var newWorkPool = new WorkPool();
2034
newWorkPool.Start();

0 commit comments

Comments
 (0)