Skip to content

Commit 7770fd8

Browse files
committed
Follow-up to #1669 - per-channel dispatch concurrency
PR #1669 by @danielmarbach adds the ability to configure consumer dispatch on a per-channel basis. * Test that consumer dispatch concurrency is set on the dispatcher.
1 parent 624cf2e commit 7770fd8

16 files changed

+143
-44
lines changed

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -892,7 +892,6 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client
892892
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
893893
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
894894
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
895-
const RabbitMQ.Client.ConnectionFactory.DefaultConsumerDispatchConcurrency = 1 -> ushort
896-
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
897-
static RabbitMQ.Client.IConnectionExtensions.CreateChannelAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
895+
const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort
898896
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
897+
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency = 1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,6 @@ namespace RabbitMQ.Client
9292
///hosts with an empty name are not addressable. </para></remarks>
9393
public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
9494
{
95-
/// <summary>
96-
/// Default value for consumer dispatch concurrency.
97-
/// </summary>
98-
public const ushort DefaultConsumerDispatchConcurrency = 1;
99-
10095
/// <summary>
10196
/// Default value for the desired maximum channel number. Default: 2047.
10297
/// </summary>
@@ -180,7 +175,7 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
180175
/// </summary>
181176
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
182177
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
183-
public ushort ConsumerDispatchConcurrency { get; set; } = DefaultConsumerDispatchConcurrency;
178+
public ushort ConsumerDispatchConcurrency { get; set; } = Constants.DefaultConsumerDispatchConcurrency;
184179

185180
/// <summary>The host to connect to.</summary>
186181
public string HostName { get; set; } = "localhost";

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,12 +239,13 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
239239
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
240240
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
241241
///
242-
/// Defaults to <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>.
242+
/// Defaults to <see cref="Constants.DefaultConsumerDispatchConcurrency"/>.
243243
///
244244
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
245245
/// In addition to that consumers need to be thread/concurrency safe.
246246
/// </param>
247247
/// <param name="cancellationToken">Cancellation token</param>
248-
Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default);
248+
Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
249+
CancellationToken cancellationToken = default);
249250
}
250251
}

projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,6 @@ namespace RabbitMQ.Client
77
{
88
public static class IConnectionExtensions
99
{
10-
/// <summary>
11-
/// Asynchronously create and return a fresh channel, session, and channel.
12-
/// </summary>
13-
public static Task<IChannel> CreateChannelAsync(this IConnection connection, CancellationToken cancellationToken = default) =>
14-
connection.CreateChannelAsync(ConnectionFactory.DefaultConsumerDispatchConcurrency, cancellationToken);
15-
1610
/// <summary>
1711
/// Asynchronously close this connection and all its channels.
1812
/// </summary>

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ namespace RabbitMQ.Client.Framing.Impl
3838
{
3939
internal class Channel : ChannelBase
4040
{
41-
public Channel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
41+
public Channel(ConnectionConfig config, ISession session,
42+
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
4243
: base(config, session, consumerDispatchConcurrency)
4344
{
4445
}

projects/RabbitMQ.Client/client/framing/Constants.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,5 +83,13 @@ public static class Constants
8383
public const int NotImplemented = 540;
8484
///<summary>(= 541)</summary>
8585
public const int InternalError = 541;
86+
87+
/// <summary>
88+
/// The default consumer dispatch concurrency. See <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
89+
/// to set this value for every channel created on a connection,
90+
/// and <see cref="IConnection.CreateChannelAsync(ushort, System.Threading.CancellationToken)"/>
91+
/// for setting this value for a particular channel.
92+
/// </summary>
93+
public const ushort DefaultConsumerDispatchConcurrency = 1;
8694
}
8795
}

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
using RabbitMQ.Client.Events;
3838
using RabbitMQ.Client.Exceptions;
3939
using RabbitMQ.Client.Impl;
40+
using RabbitMQ.Util;
4041

4142
namespace RabbitMQ.Client.Framing.Impl
4243
{
@@ -240,12 +241,14 @@ await CloseInnerConnectionAsync()
240241
}
241242
}
242243

243-
public async Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
244+
public async Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
245+
CancellationToken cancellationToken = default)
244246
{
245247
EnsureIsOpen();
246-
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(consumerDispatchConcurrency, cancellationToken)
248+
ushort cdc = Misc.DetermineConsumerDispatchConcurrency(_config, consumerDispatchConcurrency);
249+
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(cdc, cancellationToken)
247250
.ConfigureAwait(false);
248-
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, consumerDispatchConcurrency);
251+
AutorecoveringChannel channel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc);
249252
await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
250253
.ConfigureAwait(false);
251254
return channel;

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
using RabbitMQ.Client.Events;
4646
using RabbitMQ.Client.Exceptions;
4747
using RabbitMQ.Client.Framing.Impl;
48+
using RabbitMQ.Util;
4849

4950
namespace RabbitMQ.Client.Impl
5051
{
@@ -73,10 +74,11 @@ internal abstract class ChannelBase : IChannel, IRecoverable
7374

7475
internal readonly IConsumerDispatcher ConsumerDispatcher;
7576

76-
protected ChannelBase(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
77+
protected ChannelBase(ConnectionConfig config, ISession session,
78+
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
7779
{
7880
ContinuationTimeout = config.ContinuationTimeout;
79-
ConsumerDispatcher = new AsyncConsumerDispatcher(this, consumerDispatchConcurrency);
81+
ConsumerDispatcher = BuildConsumerDispatcher(config, consumerDispatchConcurrency);
8082
Action<Exception, string> onException = (exception, context) =>
8183
OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
8284
_basicAcksWrapper = new EventingWrapper<BasicAckEventArgs>("OnBasicAck", onException);
@@ -92,6 +94,12 @@ protected ChannelBase(ConnectionConfig config, ISession session, ushort consumer
9294
Session = session;
9395
}
9496

97+
private IConsumerDispatcher BuildConsumerDispatcher(ConnectionConfig config, ushort perChannelConsumerDispatchConcurrency)
98+
{
99+
ushort cdc = Misc.DetermineConsumerDispatchConcurrency(config, perChannelConsumerDispatchConcurrency);
100+
return new AsyncConsumerDispatcher(this, cdc);
101+
}
102+
95103
internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10);
96104
public TimeSpan ContinuationTimeout { get; set; }
97105

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)
7272

7373
_sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize);
7474
_session0 = new MainSession(this, config.MaxInboundMessageBodySize);
75-
_channel0 = new Channel(_config, _session0, ConnectionFactory.DefaultConsumerDispatchConcurrency); ;
75+
_channel0 = new Channel(_config, _session0);
7676

7777
ClientProperties = new Dictionary<string, object?>(_config.ClientProperties)
7878
{
@@ -253,7 +253,8 @@ await CloseAsync(ea, true,
253253
}
254254
}
255255

256-
public Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
256+
public Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
257+
CancellationToken cancellationToken = default)
257258
{
258259
EnsureIsOpen();
259260
ISession session = CreateSession();

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,44 +14,42 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
1414
protected readonly ChannelReader<WorkStruct> _reader;
1515
private readonly ChannelWriter<WorkStruct> _writer;
1616
private readonly Task _worker;
17+
private readonly ushort _concurrency;
1718
private bool _quiesce = false;
1819
private bool _disposed;
1920

2021
internal ConsumerDispatcherChannelBase(ChannelBase channel, ushort concurrency)
2122
{
2223
_channel = channel;
24+
_concurrency = concurrency;
2325
var workChannel = Channel.CreateUnbounded<WorkStruct>(new UnboundedChannelOptions
2426
{
25-
SingleReader = concurrency == 1,
27+
SingleReader = _concurrency == 1,
2628
SingleWriter = false,
2729
AllowSynchronousContinuations = false
2830
});
2931
_reader = workChannel.Reader;
3032
_writer = workChannel.Writer;
3133

3234
Func<Task> loopStart = ProcessChannelAsync;
33-
if (concurrency == 1)
35+
if (_concurrency == 1)
3436
{
3537
_worker = Task.Run(loopStart);
3638
}
3739
else
3840
{
39-
var tasks = new Task[concurrency];
40-
for (int i = 0; i < concurrency; i++)
41+
var tasks = new Task[_concurrency];
42+
for (int i = 0; i < _concurrency; i++)
4143
{
4244
tasks[i] = Task.Run(loopStart);
4345
}
4446
_worker = Task.WhenAll(tasks);
4547
}
4648
}
4749

48-
public bool IsShutdown
49-
{
50-
get
51-
{
52-
return _quiesce;
53-
}
54-
}
50+
public bool IsShutdown => _quiesce;
51+
52+
public ushort Concurrency => _concurrency;
5553

5654
public ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
5755
{

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ internal interface IConsumerDispatcher : IDisposable
4141

4242
bool IsShutdown { get; }
4343

44+
ushort Concurrency { get; }
45+
4446
IAsyncBasicConsumer GetAndRemoveConsumer(string tag);
4547

4648
ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken);

projects/RabbitMQ.Client/util/Misc.cs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using RabbitMQ.Client;
33+
34+
namespace RabbitMQ.Util
35+
{
36+
internal static class Misc
37+
{
38+
internal static ushort DetermineConsumerDispatchConcurrency(ConnectionConfig config, ushort perChannelConsumerDispatchConcurrency)
39+
{
40+
ushort cdc = config.ConsumerDispatchConcurrency;
41+
if (perChannelConsumerDispatchConcurrency > Constants.DefaultConsumerDispatchConcurrency)
42+
{
43+
cdc = perChannelConsumerDispatchConcurrency;
44+
}
45+
return cdc;
46+
}
47+
}
48+
}

projects/Test/Common/IntegrationFixture.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public abstract class IntegrationFixture : IAsyncLifetime
7171
protected readonly ITestOutputHelper _output;
7272
protected readonly string _testDisplayName;
7373

74-
protected readonly ushort _consumerDispatchConcurrency = 1;
74+
protected readonly ushort _consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency;
7575
protected readonly bool _openChannel = true;
7676

7777
public static readonly TimeSpan ShortSpan;
@@ -109,7 +109,7 @@ static IntegrationFixture()
109109
}
110110

111111
public IntegrationFixture(ITestOutputHelper output,
112-
ushort consumerDispatchConcurrency = 1,
112+
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
113113
bool openChannel = true)
114114
{
115115
_consumerDispatchConcurrency = consumerDispatchConcurrency;
@@ -143,8 +143,7 @@ public virtual async Task InitializeAsync()
143143
*/
144144
if (_connFactory == null)
145145
{
146-
_connFactory = CreateConnectionFactory();
147-
_connFactory.ConsumerDispatchConcurrency = _consumerDispatchConcurrency;
146+
_connFactory = CreateConnectionFactory(_consumerDispatchConcurrency);
148147
}
149148

150149
if (_conn == null)
@@ -517,13 +516,15 @@ protected static async Task WaitAsync(TaskCompletionSource<bool> tcs, TimeSpan t
517516
}
518517
}
519518

520-
protected ConnectionFactory CreateConnectionFactory()
519+
protected ConnectionFactory CreateConnectionFactory(
520+
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
521521
{
522522
return new ConnectionFactory
523523
{
524524
ClientProvidedName = $"{_testDisplayName}:{Util.Now}:{GetConnectionIdx()}",
525525
ContinuationTimeout = WaitSpan,
526526
HandshakeContinuationTimeout = WaitSpan,
527+
ConsumerDispatchConcurrency = consumerDispatchConcurrency
527528
};
528529
}
529530

0 commit comments

Comments
 (0)