Skip to content

Commit 57835a6

Browse files
committed
Follow-up to #1669 - per-channel dispatch concurrency
As @danielmarbach points out, it's kind of silly to have the dispatch concurrency set in the connection factory as PR #1669 makes this value per-channel. So, we'll remove it from `IConnectionFactory` and use a default parameter value of `1` in `CreateChannelAsync`
1 parent 624cf2e commit 57835a6

16 files changed

+31
-68
lines changed

projects/Benchmarks/Networking/Networking_BasicDeliver_Commons.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ namespace Benchmarks.Networking
99
[MemoryDiagnoser]
1010
public class Networking_BasicDeliver_Commons
1111
{
12-
public static async Task Publish_Hello_World(IConnection connection, uint messageCount, byte[] body)
12+
public static async Task Publish_Hello_World(IConnection connection,
13+
uint messageCount, byte[] body, ushort consumerDispatchConcurrency = 1)
1314
{
14-
using (IChannel channel = await connection.CreateChannelAsync())
15+
using (IChannel channel = await connection.CreateChannelAsync(consumerDispatchConcurrency))
1516
{
1617
QueueDeclareOk queue = await channel.QueueDeclareAsync();
1718
var consumer = new CountingConsumer(channel, messageCount);

projects/Benchmarks/Networking/Networking_BasicDeliver_ConnectionChurn.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,17 @@ public void GlobalCleanup()
2929
[Benchmark(Baseline = true)]
3030
public async Task Publish_Hello_World()
3131
{
32-
var cf = new ConnectionFactory { ConsumerDispatchConcurrency = 2 };
32+
var cf = new ConnectionFactory();
3333
using (IConnection connection = await cf.CreateConnectionAsync())
3434
{
35-
await Publish_Hello_World(connection);
35+
await Publish_Hello_World(connection, 2);
3636
}
3737
}
3838

39-
public static async Task Publish_Hello_World(IConnection connection)
39+
public static async Task Publish_Hello_World(IConnection connection, ushort consumerDispatchConcurrency)
4040
{
41-
await Networking_BasicDeliver_Commons.Publish_Hello_World(connection, messageCount, _body);
41+
await Networking_BasicDeliver_Commons.Publish_Hello_World(connection, messageCount, _body,
42+
consumerDispatchConcurrency);
4243
}
4344
}
4445
}

projects/Benchmarks/Networking/Networking_BasicDeliver_LongLivedConnection.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ public class Networking_BasicDeliver_LongLivedConnection
1919
public void GlobalSetup()
2020
{
2121
_container = RabbitMQBroker.Start();
22-
23-
var cf = new ConnectionFactory { ConsumerDispatchConcurrency = 2 };
22+
var cf = new ConnectionFactory();
2423
// NOTE: https://github.com/dotnet/BenchmarkDotNet/issues/1738
2524
_connection = EnsureCompleted(cf.CreateConnectionAsync());
2625
}
@@ -35,7 +34,8 @@ public void GlobalCleanup()
3534
[Benchmark(Baseline = true)]
3635
public Task Publish_Hello_World()
3736
{
38-
return Networking_BasicDeliver_Commons.Publish_Hello_World(_connection, messageCount, _body);
37+
return Networking_BasicDeliver_Commons.Publish_Hello_World(_connection, messageCount, _body,
38+
consumerDispatchConcurrency: 2);
3939
}
4040

4141
private static T EnsureCompleted<T>(Task<T> task) => task.GetAwaiter().GetResult();

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,6 @@ RabbitMQ.Client.ConnectionFactory.ClientProperties.set -> void
189189
RabbitMQ.Client.ConnectionFactory.ClientProvidedName.get -> string
190190
RabbitMQ.Client.ConnectionFactory.ClientProvidedName.set -> void
191191
RabbitMQ.Client.ConnectionFactory.ConnectionFactory() -> void
192-
RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.get -> ushort
193-
RabbitMQ.Client.ConnectionFactory.ConsumerDispatchConcurrency.set -> void
194192
RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.get -> System.TimeSpan
195193
RabbitMQ.Client.ConnectionFactory.ContinuationTimeout.set -> void
196194
RabbitMQ.Client.ConnectionFactory.Endpoint.get -> RabbitMQ.Client.AmqpTcpEndpoint
@@ -472,8 +470,6 @@ RabbitMQ.Client.IConnectionFactory.ClientProperties.get -> System.Collections.Ge
472470
RabbitMQ.Client.IConnectionFactory.ClientProperties.set -> void
473471
RabbitMQ.Client.IConnectionFactory.ClientProvidedName.get -> string
474472
RabbitMQ.Client.IConnectionFactory.ClientProvidedName.set -> void
475-
RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.get -> ushort
476-
RabbitMQ.Client.IConnectionFactory.ConsumerDispatchConcurrency.set -> void
477473
RabbitMQ.Client.IConnectionFactory.ContinuationTimeout.get -> System.TimeSpan
478474
RabbitMQ.Client.IConnectionFactory.ContinuationTimeout.set -> void
479475
RabbitMQ.Client.IConnectionFactory.HandshakeContinuationTimeout.get -> System.TimeSpan
@@ -892,7 +888,4 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client
892888
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
893889
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, string! exchange, string! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
894890
RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
895-
const RabbitMQ.Client.ConnectionFactory.DefaultConsumerDispatchConcurrency = 1 -> ushort
896-
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
897-
static RabbitMQ.Client.IConnectionExtensions.CreateChannelAsync(this RabbitMQ.Client.IConnection! connection, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
898-
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
891+
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort consumerDispatchConcurrency = 1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!

projects/RabbitMQ.Client/client/api/ConnectionConfig.cs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -134,13 +134,6 @@ public sealed class ConnectionConfig
134134
/// </summary>
135135
public readonly TimeSpan RequestedConnectionTimeout;
136136

137-
/// <summary>
138-
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
139-
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
140-
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
141-
/// </summary>
142-
public readonly ushort ConsumerDispatchConcurrency;
143-
144137
internal readonly Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> FrameHandlerFactoryAsync;
145138

146139
internal ConnectionConfig(string virtualHost, string userName, string password,
@@ -150,7 +143,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
150143
ushort maxChannelCount, uint maxFrameSize, uint maxInboundMessageBodySize, bool topologyRecoveryEnabled,
151144
TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler,
152145
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
153-
ushort consumerDispatchConcurrency, Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
146+
Func<AmqpTcpEndpoint, CancellationToken, Task<IFrameHandler>> frameHandlerFactoryAsync)
154147
{
155148
VirtualHost = virtualHost;
156149
UserName = userName;
@@ -170,7 +163,6 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
170163
ContinuationTimeout = continuationTimeout;
171164
HandshakeContinuationTimeout = handshakeContinuationTimeout;
172165
RequestedConnectionTimeout = requestedConnectionTimeout;
173-
ConsumerDispatchConcurrency = consumerDispatchConcurrency;
174166
FrameHandlerFactoryAsync = frameHandlerFactoryAsync;
175167
}
176168
}

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,6 @@ namespace RabbitMQ.Client
9292
///hosts with an empty name are not addressable. </para></remarks>
9393
public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
9494
{
95-
/// <summary>
96-
/// Default value for consumer dispatch concurrency.
97-
/// </summary>
98-
public const ushort DefaultConsumerDispatchConcurrency = 1;
9995

10096
/// <summary>
10197
/// Default value for the desired maximum channel number. Default: 2047.
@@ -172,16 +168,6 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IConnectionFactor
172168
/// </summary>
173169
public bool AutomaticRecoveryEnabled { get; set; } = true;
174170

175-
/// <summary>
176-
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
177-
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
178-
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
179-
/// Defaults to 1.
180-
/// </summary>
181-
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
182-
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
183-
public ushort ConsumerDispatchConcurrency { get; set; } = DefaultConsumerDispatchConcurrency;
184-
185171
/// <summary>The host to connect to.</summary>
186172
public string HostName { get; set; } = "localhost";
187173

@@ -603,7 +589,6 @@ private ConnectionConfig CreateConfig(string? clientProvidedName)
603589
ContinuationTimeout,
604590
HandshakeContinuationTimeout,
605591
RequestedConnectionTimeout,
606-
ConsumerDispatchConcurrency,
607592
CreateFrameHandlerAsync);
608593
}
609594

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,12 +239,13 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
239239
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
240240
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
241241
///
242-
/// Defaults to <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>.
242+
/// The default value is 1.
243243
///
244244
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
245245
/// In addition to that consumers need to be thread/concurrency safe.
246246
/// </param>
247247
/// <param name="cancellationToken">Cancellation token</param>
248-
Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default);
248+
Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
249+
CancellationToken cancellationToken = default);
249250
}
250251
}

projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,6 @@ namespace RabbitMQ.Client
77
{
88
public static class IConnectionExtensions
99
{
10-
/// <summary>
11-
/// Asynchronously create and return a fresh channel, session, and channel.
12-
/// </summary>
13-
public static Task<IChannel> CreateChannelAsync(this IConnection connection, CancellationToken cancellationToken = default) =>
14-
connection.CreateChannelAsync(ConnectionFactory.DefaultConsumerDispatchConcurrency, cancellationToken);
15-
1610
/// <summary>
1711
/// Asynchronously close this connection and all its channels.
1812
/// </summary>

projects/RabbitMQ.Client/client/api/IConnectionFactory.cs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -185,15 +185,5 @@ Task<IConnection> CreateConnectionAsync(IEnumerable<AmqpTcpEndpoint> endpoints,
185185
/// timing out.
186186
/// </summary>
187187
TimeSpan ContinuationTimeout { get; set; }
188-
189-
/// <summary>
190-
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
191-
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
192-
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
193-
/// Defaults to 1.
194-
/// </summary>
195-
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
196-
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
197-
ushort ConsumerDispatchConcurrency { get; set; }
198188
}
199189
}

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ namespace RabbitMQ.Client.Framing.Impl
3838
{
3939
internal class Channel : ChannelBase
4040
{
41-
public Channel(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
41+
public Channel(ConnectionConfig config, ISession session,
42+
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
4243
: base(config, session, consumerDispatchConcurrency)
4344
{
4445
}

projects/RabbitMQ.Client/client/framing/Constants.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,5 +83,7 @@ public static class Constants
8383
public const int NotImplemented = 540;
8484
///<summary>(= 541)</summary>
8585
public const int InternalError = 541;
86+
87+
internal const ushort DefaultConsumerDispatchConcurrency = 1;
8688
}
8789
}

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,8 @@ await CloseInnerConnectionAsync()
240240
}
241241
}
242242

243-
public async Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
243+
public async Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
244+
CancellationToken cancellationToken = default)
244245
{
245246
EnsureIsOpen();
246247
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(consumerDispatchConcurrency, cancellationToken)

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ internal abstract class ChannelBase : IChannel, IRecoverable
7373

7474
internal readonly IConsumerDispatcher ConsumerDispatcher;
7575

76-
protected ChannelBase(ConnectionConfig config, ISession session, ushort consumerDispatchConcurrency)
76+
protected ChannelBase(ConnectionConfig config, ISession session,
77+
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency)
7778
{
7879
ContinuationTimeout = config.ContinuationTimeout;
7980
ConsumerDispatcher = new AsyncConsumerDispatcher(this, consumerDispatchConcurrency);

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)
7272

7373
_sessionManager = new SessionManager(this, 0, config.MaxInboundMessageBodySize);
7474
_session0 = new MainSession(this, config.MaxInboundMessageBodySize);
75-
_channel0 = new Channel(_config, _session0, ConnectionFactory.DefaultConsumerDispatchConcurrency); ;
75+
_channel0 = new Channel(_config, _session0);
7676

7777
ClientProperties = new Dictionary<string, object?>(_config.ClientProperties)
7878
{
@@ -253,7 +253,8 @@ await CloseAsync(ea, true,
253253
}
254254
}
255255

256-
public Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency, CancellationToken cancellationToken = default)
256+
public Task<IChannel> CreateChannelAsync(ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
257+
CancellationToken cancellationToken = default)
257258
{
258259
EnsureIsOpen();
259260
ISession session = CreateSession();

projects/Test/Common/IntegrationFixture.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ static IntegrationFixture()
109109
}
110110

111111
public IntegrationFixture(ITestOutputHelper output,
112-
ushort consumerDispatchConcurrency = 1,
112+
ushort consumerDispatchConcurrency = Constants.DefaultConsumerDispatchConcurrency,
113113
bool openChannel = true)
114114
{
115115
_consumerDispatchConcurrency = consumerDispatchConcurrency;
@@ -144,7 +144,6 @@ public virtual async Task InitializeAsync()
144144
if (_connFactory == null)
145145
{
146146
_connFactory = CreateConnectionFactory();
147-
_connFactory.ConsumerDispatchConcurrency = _consumerDispatchConcurrency;
148147
}
149148

150149
if (_conn == null)
@@ -153,7 +152,8 @@ public virtual async Task InitializeAsync()
153152

154153
if (_openChannel)
155154
{
156-
_channel = await _conn.CreateChannelAsync();
155+
_channel = await _conn.CreateChannelAsync(
156+
consumerDispatchConcurrency: _consumerDispatchConcurrency);
157157
}
158158

159159
if (IsVerbose)

projects/Test/Integration/TestConcurrentAccessBase.cs.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class TestConcurrentAccessBase : IntegrationFixture
4242
protected const ushort _messageCount = 200;
4343

4444
public TestConcurrentAccessBase(ITestOutputHelper output,
45-
ushort consumerDispatchConcurrency = 1,
45+
ushort consumerDispatchConcurrency = RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency,
4646
bool openChannel = true) : base(output, consumerDispatchConcurrency, openChannel)
4747
{
4848
}

0 commit comments

Comments
 (0)