Skip to content

Commit c393768

Browse files
committed
Introduced an async consumer dispatcher
1 parent e414b85 commit c393768

11 files changed

+729
-2
lines changed
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using RabbitMQ.Client.Events;
4+
using TaskExtensions = RabbitMQ.Client.Impl.TaskExtensions;
5+
6+
namespace RabbitMQ.Client
7+
{
8+
public class AsyncDefaultBasicConsumer : IAsyncBasicConsumer
9+
{
10+
public readonly object m_eventLock = new object();
11+
public AsyncEventHandler<ConsumerEventArgs> m_consumerCancelled;
12+
13+
/// <summary>
14+
/// Creates a new instance of an <see cref="DefaultBasicConsumer"/>.
15+
/// </summary>
16+
public AsyncDefaultBasicConsumer()
17+
{
18+
ShutdownReason = null;
19+
Model = null;
20+
IsRunning = false;
21+
ConsumerTag = null;
22+
}
23+
24+
/// <summary>
25+
/// Constructor which sets the Model property to the given value.
26+
/// </summary>
27+
/// <param name="model">Common AMQP model.</param>
28+
public AsyncDefaultBasicConsumer(IModel model)
29+
{
30+
ShutdownReason = null;
31+
IsRunning = false;
32+
ConsumerTag = null;
33+
Model = model;
34+
}
35+
36+
/// <summary>
37+
/// Retrieve the consumer tag this consumer is registered as; to be used when discussing this consumer
38+
/// with the server, for instance with <see cref="IModel.BasicCancel"/>.
39+
/// </summary>
40+
public string ConsumerTag { get; set; }
41+
42+
/// <summary>
43+
/// Returns true while the consumer is registered and expecting deliveries from the broker.
44+
/// </summary>
45+
public bool IsRunning { get; protected set; }
46+
47+
/// <summary>
48+
/// If our <see cref="IModel"/> shuts down, this property will contain a description of the reason for the
49+
/// shutdown. Otherwise it will contain null. See <see cref="ShutdownEventArgs"/>.
50+
/// </summary>
51+
public ShutdownEventArgs ShutdownReason { get; protected set; }
52+
53+
/// <summary>
54+
/// Signalled when the consumer gets cancelled.
55+
/// </summary>
56+
public event AsyncEventHandler<ConsumerEventArgs> ConsumerCancelled
57+
{
58+
add
59+
{
60+
lock (m_eventLock)
61+
{
62+
m_consumerCancelled += value;
63+
}
64+
}
65+
remove
66+
{
67+
lock (m_eventLock)
68+
{
69+
m_consumerCancelled -= value;
70+
}
71+
}
72+
}
73+
74+
/// <summary>
75+
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
76+
/// for use in acknowledging received messages, for instance.
77+
/// </summary>
78+
public IModel Model { get; set; }
79+
80+
/// <summary>
81+
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
82+
/// e.g. the queue has been deleted (either by this channel or by any other channel).
83+
/// See <see cref="HandleBasicCancelOk"/> for notification of consumer cancellation due to basicCancel
84+
/// </summary>
85+
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
86+
public virtual async Task HandleBasicCancel(string consumerTag)
87+
{
88+
await OnCancel().ConfigureAwait(false);
89+
}
90+
91+
/// <summary>
92+
/// Called upon successful deregistration of the consumer from the broker.
93+
/// </summary>
94+
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
95+
public virtual async Task HandleBasicCancelOk(string consumerTag)
96+
{
97+
await OnCancel().ConfigureAwait(false);
98+
}
99+
100+
/// <summary>
101+
/// Called upon successful registration of the consumer with the broker.
102+
/// </summary>
103+
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
104+
public virtual Task HandleBasicConsumeOk(string consumerTag)
105+
{
106+
ConsumerTag = consumerTag;
107+
IsRunning = true;
108+
return TaskExtensions.CompletedTask;
109+
}
110+
111+
/// <summary>
112+
/// Called each time a message arrives for this consumer.
113+
/// </summary>
114+
/// <remarks>
115+
/// Does nothing with the passed in information.
116+
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IModel.BasicAck"/>.
117+
/// The implementation of this method in this class does NOT acknowledge such messages.
118+
/// </remarks>
119+
public virtual Task HandleBasicDeliver(string consumerTag,
120+
ulong deliveryTag,
121+
bool redelivered,
122+
string exchange,
123+
string routingKey,
124+
IBasicProperties properties,
125+
byte[] body)
126+
{
127+
// Nothing to do here.
128+
return TaskExtensions.CompletedTask;
129+
}
130+
131+
/// <summary>
132+
/// Called when the model shuts down.
133+
/// </summary>
134+
/// <param name="model"> Common AMQP model.</param>
135+
/// <param name="reason"> Information about the reason why a particular model, session, or connection was destroyed.</param>
136+
public virtual async Task HandleModelShutdown(object model, ShutdownEventArgs reason)
137+
{
138+
ShutdownReason = reason;
139+
await OnCancel().ConfigureAwait(false);
140+
}
141+
142+
/// <summary>
143+
/// Default implementation - overridable in subclasses.</summary>
144+
/// <remarks>
145+
/// This default implementation simply sets the <see cref="IsRunning"/>
146+
/// property to false, and takes no further action.
147+
/// </remarks>
148+
public virtual async Task OnCancel()
149+
{
150+
IsRunning = false;
151+
AsyncEventHandler<ConsumerEventArgs> handler;
152+
lock (m_eventLock)
153+
{
154+
handler = m_consumerCancelled;
155+
}
156+
if (handler != null)
157+
{
158+
foreach (AsyncEventHandler<ConsumerEventArgs> h in handler.GetInvocationList())
159+
{
160+
await h(this, new ConsumerEventArgs(ConsumerTag)).ConfigureAwait(false);
161+
}
162+
}
163+
}
164+
}
165+
}

projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,12 @@ public class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
158158
/// </summary>
159159
public bool AutomaticRecoveryEnabled { get; set; } = true;
160160

161+
/// <summary>
162+
/// Set to true will enable a asynchronous consumer dispatcher which is compatible with <see cref="IAsyncBasicConsumer"/>.
163+
/// Defaults to false.
164+
/// </summary>
165+
public bool DispatchConsumersAsync { get; set; } = false;
166+
161167
/// <summary>The host to connect to.</summary>
162168
public string HostName { get; set; } = "localhost";
163169

@@ -475,7 +481,7 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, String c
475481
throw new BrokerUnreachableException(e);
476482
}
477483

478-
return conn;
484+
return DispatchConsumersAsync ? new AsyncConnectionDecorator(conn) : conn;
479485
}
480486

481487
public IFrameHandler CreateFrameHandler()
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
using System.Threading.Tasks;
2+
using RabbitMQ.Client.Events;
3+
4+
namespace RabbitMQ.Client
5+
{
6+
public interface IAsyncBasicConsumer
7+
{
8+
/// <summary>
9+
/// Retrieve the <see cref="IModel"/> this consumer is associated with,
10+
/// for use in acknowledging received messages, for instance.
11+
/// </summary>
12+
IModel Model { get; }
13+
14+
/// <summary>
15+
/// Signalled when the consumer gets cancelled.
16+
/// </summary>
17+
event AsyncEventHandler<ConsumerEventArgs> ConsumerCancelled;
18+
19+
/// <summary>
20+
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
21+
/// e.g. the queue has been deleted (either by this channel or by any other channel).
22+
/// See <see cref="HandleBasicCancelOk"/> for notification of consumer cancellation due to basicCancel
23+
/// </summary>
24+
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
25+
Task HandleBasicCancel(string consumerTag);
26+
27+
/// <summary>
28+
/// Called upon successful deregistration of the consumer from the broker.
29+
/// </summary>
30+
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
31+
Task HandleBasicCancelOk(string consumerTag);
32+
33+
/// <summary>
34+
/// Called upon successful registration of the consumer with the broker.
35+
/// </summary>
36+
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
37+
Task HandleBasicConsumeOk(string consumerTag);
38+
39+
/// <summary>
40+
/// Called each time a message arrives for this consumer.
41+
/// </summary>
42+
/// <remarks>
43+
/// Does nothing with the passed in information.
44+
/// Note that in particular, some delivered messages may require acknowledgement via <see cref="IModel.BasicAck"/>.
45+
/// The implementation of this method in this class does NOT acknowledge such messages.
46+
/// </remarks>
47+
Task HandleBasicDeliver(string consumerTag,
48+
ulong deliveryTag,
49+
bool redelivered,
50+
string exchange,
51+
string routingKey,
52+
IBasicProperties properties,
53+
byte[] body);
54+
55+
/// <summary>
56+
/// Called when the model shuts down.
57+
/// </summary>
58+
/// <param name="model"> Common AMQP model.</param>
59+
/// <param name="reason"> Information about the reason why a particular model, session, or connection was destroyed.</param>
60+
Task HandleModelShutdown(object model, ShutdownEventArgs reason);
61+
}
62+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace RabbitMQ.Client
2+
{
3+
interface IAsyncConnection : IConnection
4+
{
5+
AsyncConsumerWorkService AsyncConsumerWorkService { get; }
6+
}
7+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
namespace RabbitMQ.Client.Events
5+
{
6+
public delegate Task AsyncEventHandler<in TEvent>(object sender, TEvent @event) where TEvent : EventArgs;
7+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using TaskExtensions = RabbitMQ.Client.Impl.TaskExtensions;
4+
5+
namespace RabbitMQ.Client.Events
6+
{
7+
public class AsyncEventingBasicConsumer : AsyncDefaultBasicConsumer
8+
{
9+
///<summary>Constructor which sets the Model property to the
10+
///given value.</summary>
11+
public AsyncEventingBasicConsumer(IModel model) : base(model)
12+
{
13+
}
14+
15+
///<summary>Event fired on HandleBasicDeliver.</summary>
16+
public event AsyncEventHandler<BasicDeliverEventArgs> Received;
17+
18+
///<summary>Event fired on HandleBasicConsumeOk.</summary>
19+
public event AsyncEventHandler<ConsumerEventArgs> Registered;
20+
21+
///<summary>Event fired on HandleModelShutdown.</summary>
22+
public event AsyncEventHandler<ShutdownEventArgs> Shutdown;
23+
24+
///<summary>Event fired on HandleBasicCancelOk.</summary>
25+
public event AsyncEventHandler<ConsumerEventArgs> Unregistered;
26+
27+
///<summary>Fires the Unregistered event.</summary>
28+
public override async Task HandleBasicCancelOk(string consumerTag)
29+
{
30+
await base.HandleBasicCancelOk(consumerTag).ConfigureAwait(false);
31+
await Raise(Unregistered, new ConsumerEventArgs(consumerTag)).ConfigureAwait(false);
32+
}
33+
34+
///<summary>Fires the Registered event.</summary>
35+
public override async Task HandleBasicConsumeOk(string consumerTag)
36+
{
37+
await base.HandleBasicConsumeOk(consumerTag).ConfigureAwait(false);
38+
await Raise(Registered, new ConsumerEventArgs(consumerTag)).ConfigureAwait(false);
39+
}
40+
41+
///<summary>Fires the Received event.</summary>
42+
public override async Task HandleBasicDeliver(string consumerTag,
43+
ulong deliveryTag,
44+
bool redelivered,
45+
string exchange,
46+
string routingKey,
47+
IBasicProperties properties,
48+
byte[] body)
49+
{
50+
await base.HandleBasicDeliver(consumerTag,
51+
deliveryTag,
52+
redelivered,
53+
exchange,
54+
routingKey,
55+
properties,
56+
body).ConfigureAwait(false);
57+
await Raise(Received, new BasicDeliverEventArgs(consumerTag,
58+
deliveryTag,
59+
redelivered,
60+
exchange,
61+
routingKey,
62+
properties,
63+
body)).ConfigureAwait(false);
64+
}
65+
66+
///<summary>Fires the Shutdown event.</summary>
67+
public override async Task HandleModelShutdown(object model, ShutdownEventArgs reason)
68+
{
69+
await base.HandleModelShutdown(model, reason).ConfigureAwait(false);
70+
await Raise(Shutdown, reason).ConfigureAwait(false);
71+
}
72+
73+
private Task Raise<TEvent>(AsyncEventHandler<TEvent> eventHandler, TEvent evt)
74+
where TEvent : EventArgs
75+
{
76+
var handler = eventHandler;
77+
if (handler != null)
78+
{
79+
return handler(this, evt);
80+
}
81+
return TaskExtensions.CompletedTask;
82+
}
83+
}
84+
}

0 commit comments

Comments
 (0)