Skip to content

Commit f1af6ac

Browse files
committed
simplify work error handling
1 parent c3cd117 commit f1af6ac

File tree

8 files changed

+95
-155
lines changed

8 files changed

+95
-155
lines changed

projects/RabbitMQ.Client/client/events/AsyncEventingBasicConsumer.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,10 @@ public override async Task HandleBasicConsumeOk(string consumerTag)
4545
}
4646

4747
///<summary>Fires the Received event.</summary>
48-
public override async Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
48+
public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
4949
{
50-
await base.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body).ConfigureAwait(false);
51-
await Received.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body)).ConfigureAwait(false);
50+
// No need to call base, it's empty.
51+
return Received.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body));
5252
}
5353

5454
///<summary>Fires the Shutdown event.</summary>

projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
using System;
22
using System.Collections.Concurrent;
3+
using System.Collections.Generic;
34
using System.Threading;
45
using System.Threading.Channels;
56
using System.Threading.Tasks;
7+
using RabbitMQ.Client.Events;
68

79
namespace RabbitMQ.Client.Impl
810
{
@@ -79,7 +81,7 @@ public void Enqueue(Work work)
7981
_channel.Writer.TryWrite(work);
8082
}
8183

82-
async Task Loop()
84+
private async Task Loop()
8385
{
8486
while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false))
8587
{
@@ -93,15 +95,29 @@ async Task Loop()
9395
await task.ConfigureAwait(false);
9496
}
9597
}
96-
catch(Exception)
98+
catch (Exception e)
9799
{
100+
if (!(_model is ModelBase modelBase))
101+
{
102+
return;
103+
}
98104

105+
var details = new Dictionary<string, object>
106+
{
107+
{ "consumer", work.Consumer },
108+
{ "context", work.Consumer }
109+
};
110+
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
111+
}
112+
finally
113+
{
114+
work.PostExecute();
99115
}
100116
}
101117
}
102118
}
103119

104-
async Task LoopWithConcurrency(CancellationToken cancellationToken)
120+
private async Task LoopWithConcurrency(CancellationToken cancellationToken)
105121
{
106122
try
107123
{
@@ -125,7 +141,7 @@ async Task LoopWithConcurrency(CancellationToken cancellationToken)
125141
}
126142
}
127143

128-
static async Task HandleConcurrent(Work work, IModel model, SemaphoreSlim limiter)
144+
private static async Task HandleConcurrent(Work work, IModel model, SemaphoreSlim limiter)
129145
{
130146
try
131147
{
@@ -135,12 +151,23 @@ static async Task HandleConcurrent(Work work, IModel model, SemaphoreSlim limite
135151
await task.ConfigureAwait(false);
136152
}
137153
}
138-
catch (Exception)
154+
catch (Exception e)
139155
{
140-
// ignored
156+
if (!(model is ModelBase modelBase))
157+
{
158+
return;
159+
}
160+
161+
var details = new Dictionary<string, object>
162+
{
163+
{ "consumer", work.Consumer },
164+
{ "context", work.Consumer }
165+
};
166+
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
141167
}
142168
finally
143169
{
170+
work.PostExecute();
144171
limiter.Release();
145172
}
146173
}
Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,21 @@
1-
using System;
2-
using System.Collections.Generic;
3-
using System.Threading.Tasks;
4-
5-
using RabbitMQ.Client.Events;
1+
using System.Threading.Tasks;
62

73
namespace RabbitMQ.Client.Impl
84
{
9-
sealed class BasicCancel : Work
5+
internal sealed class BasicCancel : Work
106
{
11-
readonly string _consumerTag;
7+
private readonly string _consumerTag;
8+
9+
public override string Context => "HandleBasicCancel";
1210

1311
public BasicCancel(IBasicConsumer consumer, string consumerTag) : base(consumer)
1412
{
1513
_consumerTag = consumerTag;
1614
}
1715

18-
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
16+
protected override Task Execute(IModel model, IAsyncBasicConsumer consumer)
1917
{
20-
try
21-
{
22-
await consumer.HandleBasicCancel(_consumerTag).ConfigureAwait(false);
23-
}
24-
catch (Exception e)
25-
{
26-
if (!(model is ModelBase modelBase))
27-
{
28-
return;
29-
}
30-
31-
var details = new Dictionary<string, object>
32-
{
33-
{"consumer", consumer},
34-
{"context", "HandleBasicCancel"}
35-
};
36-
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
37-
}
18+
return consumer.HandleBasicCancel(_consumerTag);
3819
}
3920
}
4021
}
Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,21 @@
1-
using System;
2-
using System.Collections.Generic;
3-
using System.Threading.Tasks;
4-
5-
using RabbitMQ.Client.Events;
1+
using System.Threading.Tasks;
62

73
namespace RabbitMQ.Client.Impl
84
{
9-
sealed class BasicCancelOk : Work
5+
internal sealed class BasicCancelOk : Work
106
{
11-
readonly string _consumerTag;
7+
private readonly string _consumerTag;
8+
9+
public override string Context => "HandleBasicCancelOk";
1210

1311
public BasicCancelOk(IBasicConsumer consumer, string consumerTag) : base(consumer)
1412
{
1513
_consumerTag = consumerTag;
1614
}
1715

18-
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
16+
protected override Task Execute(IModel model, IAsyncBasicConsumer consumer)
1917
{
20-
try
21-
{
22-
await consumer.HandleBasicCancelOk(_consumerTag).ConfigureAwait(false);
23-
}
24-
catch (Exception e)
25-
{
26-
if (!(model is ModelBase modelBase))
27-
{
28-
return;
29-
}
30-
31-
var details = new Dictionary<string, object>()
32-
{
33-
{"consumer", consumer},
34-
{"context", "HandleBasicCancelOk"}
35-
};
36-
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
37-
}
18+
return consumer.HandleBasicCancelOk(_consumerTag);
3819
}
3920
}
4021
}
Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,21 @@
1-
using System;
2-
using System.Collections.Generic;
3-
using System.Threading.Tasks;
4-
5-
using RabbitMQ.Client.Events;
1+
using System.Threading.Tasks;
62

73
namespace RabbitMQ.Client.Impl
84
{
9-
sealed class BasicConsumeOk : Work
5+
internal sealed class BasicConsumeOk : Work
106
{
11-
readonly string _consumerTag;
7+
private readonly string _consumerTag;
8+
9+
public override string Context => "HandleBasicConsumeOk";
1210

1311
public BasicConsumeOk(IBasicConsumer consumer, string consumerTag) : base(consumer)
1412
{
1513
_consumerTag = consumerTag;
1614
}
1715

18-
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
16+
protected override Task Execute(IModel model, IAsyncBasicConsumer consumer)
1917
{
20-
try
21-
{
22-
await consumer.HandleBasicConsumeOk(_consumerTag).ConfigureAwait(false);
23-
}
24-
catch (Exception e)
25-
{
26-
if (!(model is ModelBase modelBase))
27-
{
28-
return;
29-
}
30-
31-
var details = new Dictionary<string, object>()
32-
{
33-
{"consumer", consumer},
34-
{"context", "HandleBasicConsumeOk"}
35-
};
36-
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
37-
}
18+
return consumer.HandleBasicConsumeOk(_consumerTag);
3819
}
3920
}
4021
}

projects/RabbitMQ.Client/client/impl/BasicDeliver.cs

Lines changed: 23 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,21 @@
11
using System;
22
using System.Buffers;
3-
using System.Collections.Generic;
43
using System.Runtime.InteropServices;
54
using System.Threading.Tasks;
65

7-
using RabbitMQ.Client.Events;
8-
96
namespace RabbitMQ.Client.Impl
107
{
11-
sealed class BasicDeliver : Work
8+
internal sealed class BasicDeliver : Work
129
{
13-
readonly string _consumerTag;
14-
readonly ulong _deliveryTag;
15-
readonly bool _redelivered;
16-
readonly string _exchange;
17-
readonly string _routingKey;
18-
readonly IBasicProperties _basicProperties;
19-
readonly ReadOnlyMemory<byte> _body;
10+
private readonly string _consumerTag;
11+
private readonly ulong _deliveryTag;
12+
private readonly bool _redelivered;
13+
private readonly string _exchange;
14+
private readonly string _routingKey;
15+
private readonly IBasicProperties _basicProperties;
16+
private readonly ReadOnlyMemory<byte> _body;
17+
18+
public override string Context => "HandleBasicDeliver";
2019

2120
public BasicDeliver(IBasicConsumer consumer,
2221
string consumerTag,
@@ -36,38 +35,22 @@ public BasicDeliver(IBasicConsumer consumer,
3635
_body = body;
3736
}
3837

39-
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
38+
protected override Task Execute(IModel model, IAsyncBasicConsumer consumer)
4039
{
41-
try
42-
{
43-
await consumer.HandleBasicDeliver(_consumerTag,
44-
_deliveryTag,
45-
_redelivered,
46-
_exchange,
47-
_routingKey,
48-
_basicProperties,
49-
_body).ConfigureAwait(false);
50-
}
51-
catch (Exception e)
52-
{
53-
if (!(model is ModelBase modelBase))
54-
{
55-
return;
56-
}
40+
return consumer.HandleBasicDeliver(_consumerTag,
41+
_deliveryTag,
42+
_redelivered,
43+
_exchange,
44+
_routingKey,
45+
_basicProperties,
46+
_body);
47+
}
5748

58-
var details = new Dictionary<string, object>()
59-
{
60-
{"consumer", consumer},
61-
{"context", "HandleBasicDeliver"}
62-
};
63-
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
64-
}
65-
finally
49+
public override void PostExecute()
50+
{
51+
if (MemoryMarshal.TryGetArray(_body, out ArraySegment<byte> segment))
6652
{
67-
if (MemoryMarshal.TryGetArray(_body, out ArraySegment<byte> segment))
68-
{
69-
ArrayPool<byte>.Shared.Return(segment.Array);
70-
}
53+
ArrayPool<byte>.Shared.Return(segment.Array);
7154
}
7255
}
7356
}
Lines changed: 6 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,21 @@
1-
using System;
2-
using System.Collections.Generic;
31
using System.Threading.Tasks;
42

5-
using RabbitMQ.Client.Events;
6-
73
namespace RabbitMQ.Client.Impl
84
{
9-
sealed class ModelShutdown : Work
5+
internal sealed class ModelShutdown : Work
106
{
11-
readonly ShutdownEventArgs _reason;
7+
private readonly ShutdownEventArgs _reason;
8+
9+
public override string Context => "HandleModelShutdown";
1210

1311
public ModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason) : base(consumer)
1412
{
1513
_reason = reason;
1614
}
1715

18-
protected override async Task Execute(IModel model, IAsyncBasicConsumer consumer)
16+
protected override Task Execute(IModel model, IAsyncBasicConsumer consumer)
1917
{
20-
try
21-
{
22-
await consumer.HandleModelShutdown(model, _reason).ConfigureAwait(false);
23-
}
24-
catch (Exception e)
25-
{
26-
if (!(model is ModelBase modelBase))
27-
{
28-
return;
29-
}
30-
31-
var details = new Dictionary<string, object>()
32-
{
33-
{ "consumer", consumer },
34-
{ "context", "HandleModelShutdown" }
35-
};
36-
modelBase.OnCallbackException(CallbackExceptionEventArgs.Build(e, details));
37-
}
18+
return consumer.HandleModelShutdown(model, _reason);
3819
}
3920
}
4021
}

0 commit comments

Comments
 (0)