Skip to content

Commit 3ce5e10

Browse files
committed
Getting rid of closure allocation in AsyncConsumerDispatcher
1 parent c393768 commit 3ce5e10

File tree

8 files changed

+250
-112
lines changed

8 files changed

+250
-112
lines changed
Lines changed: 12 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
1-
using System;
2-
using System.Collections.Generic;
3-
using System.Threading.Tasks;
4-
using RabbitMQ.Client.Events;
5-
6-
namespace RabbitMQ.Client.Impl
1+
namespace RabbitMQ.Client.Impl
72
{
83
internal class AsyncConsumerDispatcher : IConsumerDispatcher
94
{
10-
private ModelBase model;
11-
private AsyncConsumerWorkService workService;
5+
private readonly ModelBase model;
6+
private readonly AsyncConsumerWorkService workService;
127

138
public AsyncConsumerDispatcher(ModelBase model, AsyncConsumerWorkService ws)
149
{
@@ -43,22 +38,7 @@ public bool IsShutdown
4338
public void HandleBasicConsumeOk(IBasicConsumer consumer,
4439
string consumerTag)
4540
{
46-
UnlessShuttingDown(async () =>
47-
{
48-
try
49-
{
50-
await ((IAsyncBasicConsumer)consumer).HandleBasicConsumeOk(consumerTag).ConfigureAwait(false);
51-
}
52-
catch (Exception e)
53-
{
54-
var details = new Dictionary<string, object>()
55-
{
56-
{"consumer", consumer},
57-
{"context", "HandleBasicConsumeOk"}
58-
};
59-
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
60-
}
61-
});
41+
ScheduleUnlessShuttingDown(new BasicConsumeOk(consumer, consumerTag));
6242
}
6343

6444
public void HandleBasicDeliver(IBasicConsumer consumer,
@@ -70,99 +50,36 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
7050
IBasicProperties basicProperties,
7151
byte[] body)
7252
{
73-
UnlessShuttingDown(async () =>
74-
{
75-
try
76-
{
77-
await ((IAsyncBasicConsumer)consumer).HandleBasicDeliver(consumerTag,
78-
deliveryTag,
79-
redelivered,
80-
exchange,
81-
routingKey,
82-
basicProperties,
83-
body).ConfigureAwait(false);
84-
}
85-
catch (Exception e)
86-
{
87-
var details = new Dictionary<string, object>()
88-
{
89-
{"consumer", consumer},
90-
{"context", "HandleBasicDeliver"}
91-
};
92-
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
93-
}
94-
});
53+
ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body));
9554
}
9655

9756
public void HandleBasicCancelOk(IBasicConsumer consumer, string consumerTag)
9857
{
99-
UnlessShuttingDown(async () =>
100-
{
101-
try
102-
{
103-
await ((IAsyncBasicConsumer)consumer).HandleBasicCancelOk(consumerTag);
104-
}
105-
catch (Exception e)
106-
{
107-
var details = new Dictionary<string, object>()
108-
{
109-
{"consumer", consumer},
110-
{"context", "HandleBasicCancelOk"}
111-
};
112-
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
113-
}
114-
});
58+
ScheduleUnlessShuttingDown(new BasicCancelOk(consumer, consumerTag));
11559
}
11660

11761
public void HandleBasicCancel(IBasicConsumer consumer, string consumerTag)
11862
{
119-
UnlessShuttingDown(async () =>
120-
{
121-
try
122-
{
123-
await ((IAsyncBasicConsumer)consumer).HandleBasicCancel(consumerTag).ConfigureAwait(false);
124-
}
125-
catch (Exception e)
126-
{
127-
var details = new Dictionary<string, object>()
128-
{
129-
{"consumer", consumer},
130-
{"context", "HandleBasicCancel"}
131-
};
132-
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
133-
}
134-
});
63+
ScheduleUnlessShuttingDown(new BasicCancel(consumer, consumerTag));
13564
}
13665

13766
public void HandleModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason)
13867
{
13968
// the only case where we ignore the shutdown flag.
140-
try
141-
{
142-
((IAsyncBasicConsumer)consumer).HandleModelShutdown(model, reason).GetAwaiter().GetResult();
143-
}
144-
catch (Exception e)
145-
{
146-
var details = new Dictionary<string, object>()
147-
{
148-
{"consumer", consumer},
149-
{"context", "HandleModelShutdown"}
150-
};
151-
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
152-
};
69+
new ModelShutdown(consumer,reason).Execute(model).GetAwaiter().GetResult();
15370
}
15471

155-
private void UnlessShuttingDown(Func<Task> fn)
72+
private void ScheduleUnlessShuttingDown(Work work)
15673
{
15774
if (!this.IsShutdown)
15875
{
159-
Execute(fn);
76+
Schedule(work);
16077
}
16178
}
16279

163-
private void Execute(Func<Task> fn)
80+
private void Schedule(Work work)
16481
{
165-
this.workService.AddWork(this.model, fn);
82+
this.workService.Schedule(this.model, work);
16683
}
16784
}
16885
}

projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerWorkService.cs

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,23 @@
22
using System.Collections.Concurrent;
33
using System.Threading;
44
using System.Threading.Tasks;
5+
using RabbitMQ.Client.Impl;
56

67
namespace RabbitMQ.Client
78
{
89
internal class AsyncConsumerWorkService
910
{
1011
readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>();
1112

12-
public void AddWork(IModel model, Func<Task> fn)
13+
public void Schedule(ModelBase model, Work work)
1314
{
1415
// two step approach is taken, as TryGetValue does not aquire locks
1516
// if this fails, GetOrAdd is called, which takes a lock
1617

1718
WorkPool workPool;
1819
if (workPools.TryGetValue(model, out workPool) == false)
1920
{
20-
var newWorkPool = new WorkPool();
21+
var newWorkPool = new WorkPool(model);
2122
workPool = workPools.GetOrAdd(model, newWorkPool);
2223

2324
// start if it's only the workpool that has been just created
@@ -27,7 +28,7 @@ public void AddWork(IModel model, Func<Task> fn)
2728
}
2829
}
2930

30-
workPool.Enqueue(fn);
31+
workPool.Enqueue(work);
3132
}
3233

3334
public async Task StopWork(IModel model)
@@ -49,15 +50,17 @@ public async Task StopWork()
4950

5051
class WorkPool
5152
{
52-
readonly ConcurrentQueue<Func<Task>> actions;
53+
readonly ConcurrentQueue<Work> workQueue;
5354
readonly TimeSpan waitTime;
5455
readonly CancellationTokenSource tokenSource;
56+
readonly ModelBase model;
5557
TaskCompletionSource<bool> messageArrived;
5658
private Task task;
5759

58-
public WorkPool()
60+
public WorkPool(ModelBase model)
5961
{
60-
actions = new ConcurrentQueue<Func<Task>>();
62+
this.model = model;
63+
workQueue = new ConcurrentQueue<Work>();
6164
messageArrived = new TaskCompletionSource<bool>();
6265
waitTime = TimeSpan.FromMilliseconds(100);
6366
tokenSource = new CancellationTokenSource();
@@ -68,26 +71,20 @@ public void Start()
6871
task = Task.Run(Loop, CancellationToken.None);
6972
}
7073

71-
public void Enqueue(Func<Task> action)
74+
public void Enqueue(Work work)
7275
{
73-
actions.Enqueue(action);
76+
workQueue.Enqueue(work);
7477
messageArrived.TrySetResult(true);
7578
}
7679

7780
async Task Loop()
7881
{
7982
while (tokenSource.IsCancellationRequested == false)
8083
{
81-
Func<Task> action;
82-
while (actions.TryDequeue(out action))
84+
Work work;
85+
while (workQueue.TryDequeue(out work))
8386
{
84-
try
85-
{
86-
await action().ConfigureAwait(false);
87-
}
88-
catch (Exception)
89-
{
90-
}
87+
await work.Execute(model).ConfigureAwait(false);
9188
}
9289

9390
await Task.WhenAny(Task.Delay(waitTime, tokenSource.Token), messageArrived.Task).ConfigureAwait(false);
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
using RabbitMQ.Client.Events;
5+
6+
namespace RabbitMQ.Client.Impl
7+
{
8+
sealed class BasicCancel : Work
9+
{
10+
readonly string consumerTag;
11+
12+
public BasicCancel(IBasicConsumer consumer, string consumerTag) : base(consumer)
13+
{
14+
this.consumerTag = consumerTag;
15+
}
16+
17+
protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
18+
{
19+
try
20+
{
21+
await consumer.HandleBasicCancel(consumerTag).ConfigureAwait(false);
22+
}
23+
catch (Exception e)
24+
{
25+
var details = new Dictionary<string, object>
26+
{
27+
{"consumer", consumer},
28+
{"context", "HandleBasicCancel"}
29+
};
30+
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
31+
}
32+
}
33+
}
34+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
using RabbitMQ.Client.Events;
5+
6+
namespace RabbitMQ.Client.Impl
7+
{
8+
sealed class BasicCancelOk : Work
9+
{
10+
readonly string consumerTag;
11+
12+
public BasicCancelOk(IBasicConsumer consumer, string consumerTag) : base(consumer)
13+
{
14+
this.consumerTag = consumerTag;
15+
}
16+
17+
protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
18+
{
19+
try
20+
{
21+
await consumer.HandleBasicCancelOk(consumerTag).ConfigureAwait(false);
22+
}
23+
catch (Exception e)
24+
{
25+
var details = new Dictionary<string, object>()
26+
{
27+
{"consumer", consumer},
28+
{"context", "HandleBasicCancelOk"}
29+
};
30+
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
31+
}
32+
}
33+
}
34+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
using RabbitMQ.Client.Events;
5+
6+
namespace RabbitMQ.Client.Impl
7+
{
8+
sealed class BasicConsumeOk : Work
9+
{
10+
readonly string consumerTag;
11+
12+
public BasicConsumeOk(IBasicConsumer consumer, string consumerTag) : base(consumer)
13+
{
14+
this.consumerTag = consumerTag;
15+
}
16+
17+
protected override async Task Execute(ModelBase model, IAsyncBasicConsumer consumer)
18+
{
19+
try
20+
{
21+
await consumer.HandleBasicConsumeOk(consumerTag).ConfigureAwait(false);
22+
}
23+
catch (Exception e)
24+
{
25+
var details = new Dictionary<string, object>()
26+
{
27+
{"consumer", consumer},
28+
{"context", "HandleBasicConsumeOk"}
29+
};
30+
model.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
31+
}
32+
}
33+
}
34+
}

0 commit comments

Comments
 (0)