File tree Expand file tree Collapse file tree 1 file changed +13
-4
lines changed
projects/RabbitMQ.Client/client/impl Expand file tree Collapse file tree 1 file changed +13
-4
lines changed Original file line number Diff line number Diff line change @@ -9,20 +9,29 @@ namespace RabbitMQ.Client.Impl
9
9
internal sealed class AsyncConsumerWorkService : ConsumerWorkService
10
10
{
11
11
private readonly ConcurrentDictionary < IModel , WorkPool > _workPools ;
12
- private readonly Func < IModel , WorkPool > _startNewWorkPoolAction ;
12
+ private readonly Func < IModel , WorkPool > _startNewWorkPoolFunc ;
13
13
14
14
public AsyncConsumerWorkService ( )
15
15
{
16
16
_workPools = new ConcurrentDictionary < IModel , WorkPool > ( ) ;
17
- _startNewWorkPoolAction = model => StartNewWorkPool ( model ) ;
17
+ _startNewWorkPoolFunc = model => StartNewWorkPool ( model ) ;
18
18
}
19
19
20
20
public void Schedule < TWork > ( ModelBase model , TWork work ) where TWork : Work
21
21
{
22
- _workPools . GetOrAdd ( model , _startNewWorkPoolAction ) . Enqueue ( work ) ;
22
+ /*
23
+ * rabbitmq/rabbitmq-dotnet-client#841
24
+ * https://docs.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentdictionary-2.getoradd
25
+ *
26
+ * The lock is necessary because calling the value delegate is not atomic.
27
+ */
28
+ lock ( _workPools )
29
+ {
30
+ _workPools . GetOrAdd ( model , _startNewWorkPoolFunc ) . Enqueue ( work ) ;
31
+ }
23
32
}
24
33
25
- private WorkPool StartNewWorkPool ( IModel model )
34
+ private static WorkPool StartNewWorkPool ( IModel model )
26
35
{
27
36
var newWorkPool = new WorkPool ( model as ModelBase ) ;
28
37
newWorkPool . Start ( ) ;
You can’t perform that action at this time.
0 commit comments