Skip to content

Commit df62f86

Browse files
committed
Allow the dispatcher concurrency to be overridden per channel
1 parent b2282c7 commit df62f86

File tree

9 files changed

+26
-17
lines changed

9 files changed

+26
-17
lines changed

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -826,7 +826,7 @@ virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
826826
~RabbitMQ.Client.IChannel.WaitForConfirmsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<bool>
827827
~RabbitMQ.Client.IChannel.WaitForConfirmsOrDieAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
828828
~RabbitMQ.Client.IConnection.CloseAsync(ushort reasonCode, string reasonText, System.TimeSpan timeout, bool abort, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
829-
~RabbitMQ.Client.IConnection.CreateChannelAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel>
829+
~RabbitMQ.Client.IConnection.CreateChannelAsync(int? dispatchConsumerConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel>
830830
~RabbitMQ.Client.IConnection.UpdateSecretAsync(string newSecret, string reason, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task
831831
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
832832
~RabbitMQ.Client.IConnectionFactory.CreateConnectionAsync(System.Collections.Generic.IEnumerable<RabbitMQ.Client.AmqpTcpEndpoint> endpoints, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,17 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
234234
/// <summary>
235235
/// Asynchronously create and return a fresh channel, session, and channel.
236236
/// </summary>
237+
/// <param name="dispatchConsumerConcurrency">
238+
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
239+
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
240+
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
241+
///
242+
/// Defaults to <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>.
243+
///
244+
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
245+
/// In addition to that consumers need to be thread/concurrency safe.
246+
/// </param>
237247
/// <param name="cancellationToken">Cancellation token</param>
238-
Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default);
239-
248+
Task<IChannel> CreateChannelAsync(int? dispatchConsumerConcurrency = null, CancellationToken cancellationToken = default);
240249
}
241250
}

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ namespace RabbitMQ.Client.Framing.Impl
3838
{
3939
internal class Channel : ChannelBase
4040
{
41-
public Channel(ConnectionConfig config, ISession session) : base(config, session)
41+
public Channel(ConnectionConfig config, ISession session, int? dispatchConsumerConcurrency = null) : base(config, session, dispatchConsumerConcurrency)
4242
{
4343
}
4444

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection con
160160

161161
_connection = conn;
162162

163-
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(cancellationToken)
163+
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(cancellationToken: cancellationToken)
164164
.ConfigureAwait(false);
165165
newChannel.TakeOver(_innerChannel);
166166

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ private async ValueTask RecoverExchangesAsync(IConnection connection,
296296
{
297297
try
298298
{
299-
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false))
299+
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
300300
{
301301
await recordedExchange.RecoverAsync(ch, cancellationToken)
302302
.ConfigureAwait(false);
@@ -348,7 +348,7 @@ private async Task RecoverQueuesAsync(IConnection connection,
348348
try
349349
{
350350
string newName = string.Empty;
351-
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false))
351+
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
352352
{
353353
newName = await recordedQueue.RecoverAsync(ch, cancellationToken)
354354
.ConfigureAwait(false);
@@ -459,7 +459,7 @@ private async ValueTask RecoverBindingsAsync(IConnection connection,
459459
{
460460
try
461461
{
462-
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken).ConfigureAwait(false))
462+
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
463463
{
464464
await binding.RecoverAsync(ch, cancellationToken)
465465
.ConfigureAwait(false);

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,10 @@ public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer
173173

174174
public IProtocol Protocol => Endpoint.Protocol;
175175

176-
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(CancellationToken cancellationToken)
176+
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(int? dispatchConsumerConcurrency = null, CancellationToken cancellationToken = default)
177177
{
178178
ISession session = InnerConnection.CreateSession();
179-
var result = new RecoveryAwareChannel(_config, session);
179+
var result = new RecoveryAwareChannel(_config, session, dispatchConsumerConcurrency);
180180
return (RecoveryAwareChannel)await result.OpenAsync(cancellationToken).ConfigureAwait(false);
181181
}
182182

@@ -239,10 +239,10 @@ await CloseInnerConnectionAsync()
239239
}
240240
}
241241

242-
public async Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
242+
public async Task<IChannel> CreateChannelAsync(int? dispatchConsumerConcurrency = null,CancellationToken cancellationToken = default)
243243
{
244244
EnsureIsOpen();
245-
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cancellationToken)
245+
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(dispatchConsumerConcurrency, cancellationToken)
246246
.ConfigureAwait(false);
247247
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel);
248248
await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,10 @@ internal abstract class ChannelBase : IChannel, IRecoverable
7373

7474
internal readonly IConsumerDispatcher ConsumerDispatcher;
7575

76-
protected ChannelBase(ConnectionConfig config, ISession session)
76+
protected ChannelBase(ConnectionConfig config, ISession session, int? dispatchConsumerConcurrency = null)
7777
{
7878
ContinuationTimeout = config.ContinuationTimeout;
79-
ConsumerDispatcher = new AsyncConsumerDispatcher(this, config.DispatchConsumerConcurrency);
79+
ConsumerDispatcher = new AsyncConsumerDispatcher(this, dispatchConsumerConcurrency.GetValueOrDefault(config.DispatchConsumerConcurrency));
8080
Action<Exception, string> onException = (exception, context) =>
8181
OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
8282
_basicAcksWrapper = new EventingWrapper<BasicAckEventArgs>("OnBasicAck", onException);

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,11 +253,11 @@ await CloseAsync(ea, true,
253253
}
254254
}
255255

256-
public Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
256+
public Task<IChannel> CreateChannelAsync(int? dispatchConsumerConcurrency = null, CancellationToken cancellationToken = default)
257257
{
258258
EnsureIsOpen();
259259
ISession session = CreateSession();
260-
var channel = new Channel(_config, session);
260+
var channel = new Channel(_config, session, dispatchConsumerConcurrency);
261261
return channel.OpenAsync(cancellationToken);
262262
}
263263

projects/RabbitMQ.Client/client/impl/RecoveryAwareChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ namespace RabbitMQ.Client.Impl
3737
{
3838
internal sealed class RecoveryAwareChannel : Channel
3939
{
40-
public RecoveryAwareChannel(ConnectionConfig config, ISession session) : base(config, session)
40+
public RecoveryAwareChannel(ConnectionConfig config, ISession session, int? dispatchConsumerConcurrency = null) : base(config, session, dispatchConsumerConcurrency)
4141
{
4242
ActiveDeliveryTagOffset = 0;
4343
MaxSeenDeliveryTag = 0;

0 commit comments

Comments
 (0)