Skip to content

Commit ef8eaf4

Browse files
Merge pull request #946 from BarShavit/Bug/CallbackException
Make sure OnCallbackException is executed for AsyncConsumers
2 parents b3ae582 + f4e9b4f commit ef8eaf4

File tree

2 files changed

+191
-1
lines changed

2 files changed

+191
-1
lines changed

projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ private async Task Loop()
9494
{
9595
await task.ConfigureAwait(false);
9696
}
97+
else
98+
{
99+
// to materialize exceptions if any
100+
task.GetAwaiter().GetResult();
101+
}
97102
}
98103
catch (Exception e)
99104
{
@@ -126,7 +131,7 @@ private async Task LoopWithConcurrency(CancellationToken cancellationToken)
126131
while (_channel.Reader.TryRead(out Work work))
127132
{
128133
// Do a quick synchronous check before we resort to async/await with the state-machine overhead.
129-
if(!_limiter.Wait(0))
134+
if (!_limiter.Wait(0))
130135
{
131136
await _limiter.WaitAsync(cancellationToken).ConfigureAwait(false);
132137
}
@@ -150,6 +155,11 @@ private static async Task HandleConcurrent(Work work, IModel model, SemaphoreSli
150155
{
151156
await task.ConfigureAwait(false);
152157
}
158+
else
159+
{
160+
// to materialize exceptions if any
161+
task.GetAwaiter().GetResult();
162+
}
153163
}
154164
catch (Exception e)
155165
{
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2020 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System;
33+
using System.Threading;
34+
using System.Threading.Tasks;
35+
using NUnit.Framework;
36+
using RabbitMQ.Client.Events;
37+
38+
namespace RabbitMQ.Client.Unit
39+
{
40+
[TestFixture]
41+
public class TestAsyncConsumerExceptions : IntegrationFixture
42+
{
43+
private static Exception TestException = new Exception("oops");
44+
45+
protected void TestExceptionHandlingWith(IBasicConsumer consumer,
46+
Action<IModel, string, IBasicConsumer, string> action)
47+
{
48+
var resetEvent = new AutoResetEvent(false);
49+
bool notified = false;
50+
string q = _model.QueueDeclare();
51+
52+
_model.CallbackException += (m, evt) =>
53+
{
54+
if (evt.Exception != TestException) return;
55+
56+
notified = true;
57+
resetEvent.Set();
58+
};
59+
60+
string tag = _model.BasicConsume(q, true, consumer);
61+
action(_model, q, consumer, tag);
62+
resetEvent.WaitOne();
63+
64+
Assert.IsTrue(notified);
65+
}
66+
67+
[SetUp]
68+
public override void Init()
69+
{
70+
_connFactory = new ConnectionFactory
71+
{
72+
DispatchConsumersAsync = true
73+
};
74+
75+
_conn = _connFactory.CreateConnection();
76+
_model = _conn.CreateModel();
77+
}
78+
79+
[Test]
80+
public void TestCancelNotificationExceptionHandling()
81+
{
82+
IBasicConsumer consumer = new ConsumerFailingOnCancel(_model);
83+
TestExceptionHandlingWith(consumer, (m, q, c, ct) => m.QueueDelete(q));
84+
}
85+
86+
[Test]
87+
public void TestConsumerCancelOkExceptionHandling()
88+
{
89+
IBasicConsumer consumer = new ConsumerFailingOnCancelOk(_model);
90+
TestExceptionHandlingWith(consumer, (m, q, c, ct) => m.BasicCancel(ct));
91+
}
92+
93+
[Test]
94+
public void TestConsumerConsumeOkExceptionHandling()
95+
{
96+
IBasicConsumer consumer = new ConsumerFailingOnConsumeOk(_model);
97+
TestExceptionHandlingWith(consumer, (m, q, c, ct) => { });
98+
}
99+
100+
[Test]
101+
public void TestConsumerShutdownExceptionHandling()
102+
{
103+
IBasicConsumer consumer = new ConsumerFailingOnShutdown(_model);
104+
TestExceptionHandlingWith(consumer, (m, q, c, ct) => m.Close());
105+
}
106+
107+
[Test]
108+
public void TestDeliveryExceptionHandling()
109+
{
110+
IBasicConsumer consumer = new ConsumerFailingOnDelivery(_model);
111+
TestExceptionHandlingWith(consumer, (m, q, c, ct) => m.BasicPublish("", q, null, _encoding.GetBytes("msg")));
112+
}
113+
114+
private class ConsumerFailingOnDelivery : AsyncEventingBasicConsumer
115+
{
116+
public ConsumerFailingOnDelivery(IModel model) : base(model)
117+
{
118+
}
119+
120+
public override Task HandleBasicDeliver(string consumerTag,
121+
ulong deliveryTag,
122+
bool redelivered,
123+
string exchange,
124+
string routingKey,
125+
IBasicProperties properties,
126+
ReadOnlyMemory<byte> body)
127+
{
128+
return Task.FromException(TestException);
129+
}
130+
}
131+
132+
private class ConsumerFailingOnCancel : AsyncEventingBasicConsumer
133+
{
134+
public ConsumerFailingOnCancel(IModel model) : base(model)
135+
{
136+
}
137+
138+
public override Task HandleBasicCancel(string consumerTag)
139+
{
140+
return Task.FromException(TestException);
141+
}
142+
}
143+
144+
private class ConsumerFailingOnShutdown : AsyncEventingBasicConsumer
145+
{
146+
public ConsumerFailingOnShutdown(IModel model) : base(model)
147+
{
148+
}
149+
150+
public override Task HandleModelShutdown(object model, ShutdownEventArgs reason)
151+
{
152+
return Task.FromException(TestException);
153+
}
154+
}
155+
156+
private class ConsumerFailingOnConsumeOk : AsyncEventingBasicConsumer
157+
{
158+
public ConsumerFailingOnConsumeOk(IModel model) : base(model)
159+
{
160+
}
161+
162+
public override Task HandleBasicConsumeOk(string consumerTag)
163+
{
164+
return Task.FromException(TestException);
165+
}
166+
}
167+
168+
private class ConsumerFailingOnCancelOk : AsyncEventingBasicConsumer
169+
{
170+
public ConsumerFailingOnCancelOk(IModel model) : base(model)
171+
{
172+
}
173+
174+
public override Task HandleBasicCancelOk(string consumerTag)
175+
{
176+
return Task.FromException(TestException);
177+
}
178+
}
179+
}
180+
}

0 commit comments

Comments
 (0)