Skip to content

Commit 6d7e576

Browse files
committed
Introduce option
1 parent 748ec8c commit 6d7e576

22 files changed

+90
-69
lines changed

projects/RabbitMQ.Client/Constants.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public static class Constants
8787
/// <summary>
8888
/// The default consumer dispatch concurrency. See <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
8989
/// to set this value for every channel created on a connection,
90-
/// and <see cref="IConnection.CreateChannelAsync(bool, bool, ushort?, System.Threading.CancellationToken)" />
90+
/// and <see cref="IConnection.CreateChannelAsync(CreateChannelOptions?, System.Threading.CancellationToken)" />
9191
/// for setting this value for a particular channel.
9292
/// </summary>
9393
public const ushort DefaultConsumerDispatchConcurrency = 1;
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
namespace RabbitMQ.Client
2+
{
3+
/// <summary>
4+
/// Channel creation options.
5+
/// </summary>
6+
public sealed class CreateChannelOptions
7+
{
8+
/// <summary>
9+
/// Enable or disable publisher confirmations on this channel. Defaults to <c>false</c>
10+
/// </summary>
11+
public bool PublisherConfirmationsEnabled { get; set; } = false;
12+
13+
/// <summary>
14+
/// Should this library track publisher confirmations for you? Defaults to <c>false</c>
15+
/// </summary>
16+
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;
17+
18+
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
19+
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
20+
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
21+
///
22+
/// Defaults to <c>null</c>, which will use the value from <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
23+
///
24+
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
25+
/// In addition to that consumers need to be thread/concurrency safe.
26+
public ushort? ConsumerDispatchConcurrency { get; set; } = null;
27+
28+
/// <summary>
29+
/// The default channel options.
30+
/// </summary>
31+
public static CreateChannelOptions Default { get; } = new CreateChannelOptions();
32+
}
33+
}

projects/RabbitMQ.Client/IConnection.cs

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -240,26 +240,10 @@ Task CloseAsync(ushort reasonCode, string reasonText, TimeSpan timeout, bool abo
240240
/// <summary>
241241
/// Asynchronously create and return a fresh channel, session, and channel.
242242
/// </summary>
243-
/// <param name="publisherConfirmationsEnabled">
244-
/// Enable or disable publisher confirmations on this channel. Defaults to <c>false</c>
245-
/// </param>
246-
/// <param name="publisherConfirmationTrackingEnabled">
247-
/// Should this library track publisher confirmations for you? Defaults to <c>false</c>
248-
/// </param>
249-
/// <param name="consumerDispatchConcurrency">
250-
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
251-
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
252-
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
253-
///
254-
/// Defaults to <c>null</c>, which will use the value from <see cref="IConnectionFactory.ConsumerDispatchConcurrency"/>
255-
///
256-
/// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
257-
/// In addition to that consumers need to be thread/concurrency safe.
243+
/// <param name="options">
244+
/// The channel creation options.
258245
/// </param>
259246
/// <param name="cancellationToken">Cancellation token</param>
260-
Task<IChannel> CreateChannelAsync(bool publisherConfirmationsEnabled = false,
261-
bool publisherConfirmationTrackingEnabled = false,
262-
ushort? consumerDispatchConcurrency = null,
263-
CancellationToken cancellationToken = default);
247+
Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = default, CancellationToken cancellationToken = default);
264248
}
265249
}

projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -256,21 +256,21 @@ await CloseInnerConnectionAsync()
256256
}
257257
}
258258

259-
public async Task<IChannel> CreateChannelAsync(bool publisherConfirmationsEnabled = false,
260-
bool publisherConfirmationTrackingEnabled = false,
261-
ushort? consumerDispatchConcurrency = null,
259+
public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = default,
262260
CancellationToken cancellationToken = default)
263261
{
264262
EnsureIsOpen();
265263

266-
ushort cdc = consumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);
264+
options ??= CreateChannelOptions.Default;
265+
266+
ushort cdc = options.ConsumerDispatchConcurrency.GetValueOrDefault(_config.ConsumerDispatchConcurrency);
267267

268268
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(
269-
publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled, cdc, cancellationToken)
269+
options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cdc, cancellationToken)
270270
.ConfigureAwait(false);
271271

272272
var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc,
273-
publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled);
273+
options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled);
274274
await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
275275
.ConfigureAwait(false);
276276
return autorecoveringChannel;

projects/RabbitMQ.Client/Impl/Connection.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -264,17 +264,17 @@ await CloseAsync(ea, true,
264264
}
265265
}
266266

267-
public async Task<IChannel> CreateChannelAsync(bool publisherConfirmationsEnabled = false,
268-
bool publisherConfirmationTrackingEnabled = false,
269-
ushort? consumerDispatchConcurrency = null,
267+
public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = default,
270268
CancellationToken cancellationToken = default)
271269
{
272270
EnsureIsOpen();
271+
272+
options ??= CreateChannelOptions.Default;
273273
ISession session = CreateSession();
274274

275275
// TODO channel CreateChannelAsync() to combine ctor and OpenAsync
276-
var channel = new Channel(_config, session, consumerDispatchConcurrency);
277-
IChannel ch = await channel.OpenAsync(publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled, cancellationToken)
276+
var channel = new Channel(_config, session, options.ConsumerDispatchConcurrency);
277+
IChannel ch = await channel.OpenAsync(options.PublisherConfirmationsEnabled, options.PublisherConfirmationTrackingEnabled, cancellationToken)
278278
.ConfigureAwait(false);
279279
return ch;
280280
}
Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
1+
RabbitMQ.Client.CreateChannelOptions
2+
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort?
3+
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void
4+
RabbitMQ.Client.CreateChannelOptions.CreateChannelOptions() -> void
5+
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.get -> bool
6+
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationsEnabled.set -> void
7+
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.get -> bool
8+
RabbitMQ.Client.CreateChannelOptions.PublisherConfirmationTrackingEnabled.set -> void
19
RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason) -> void
210
RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason, string! prefix) -> void
311
RabbitMQ.Client.Exceptions.ProtocolViolationException.ProtocolViolationException() -> void
412
RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException() -> void
5-
RabbitMQ.Client.IConnection.CreateChannelAsync(bool publisherConfirmationsEnabled = false, bool publisherConfirmationTrackingEnabled = false, ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
13+
RabbitMQ.Client.IConnection.CreateChannelAsync(RabbitMQ.Client.CreateChannelOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
14+
static RabbitMQ.Client.CreateChannelOptions.Default.get -> RabbitMQ.Client.CreateChannelOptions!

projects/Test/Applications/MassPublish/Program.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,7 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer
137137

138138
publishTasks.Add(Task.Run(async () =>
139139
{
140-
using IChannel publishChannel = await publishConnection.CreateChannelAsync(publisherConfirmationsEnabled: true,
141-
publisherConfirmationTrackingEnabled: true);
140+
using IChannel publishChannel = await publishConnection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
142141
publishChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;
143142

144143

projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,7 @@ static async Task PublishMessagesIndividuallyAsync()
5757
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms per-message");
5858

5959
await using IConnection connection = await CreateConnectionAsync();
60-
await using IChannel channel = await connection.CreateChannelAsync(publisherConfirmationsEnabled: true,
61-
publisherConfirmationTrackingEnabled: true);
60+
await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
6261

6362
// declare a server-named queue
6463
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
@@ -83,8 +82,7 @@ static async Task PublishMessagesInBatchAsync()
8382
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches");
8483

8584
await using IConnection connection = await CreateConnectionAsync();
86-
await using IChannel channel = await connection.CreateChannelAsync(publisherConfirmationsEnabled: true,
87-
publisherConfirmationTrackingEnabled: true);
85+
await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
8886

8987
// declare a server-named queue
9088
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
@@ -136,8 +134,7 @@ async Task HandlePublishConfirmsAsynchronously()
136134

137135
// NOTE: setting trackConfirmations to false because this program
138136
// is tracking them itself.
139-
await using IChannel channel = await connection.CreateChannelAsync(publisherConfirmationsEnabled: true,
140-
publisherConfirmationTrackingEnabled: false);
137+
await using IChannel channel = await connection.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = false });
141138

142139
// declare a server-named queue
143140
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();

projects/Test/Common/IntegrationFixture.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,7 @@ public virtual async Task InitializeAsync()
153153

154154
if (_openChannel)
155155
{
156-
_channel = await _conn.CreateChannelAsync(publisherConfirmationsEnabled: true,
157-
publisherConfirmationTrackingEnabled: true);
156+
_channel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
158157
}
159158

160159
if (IsVerbose)

projects/Test/Common/TestConnectionRecoveryBase.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName)
199199
{
200200
using (AutorecoveringConnection publishingConn = await CreateAutorecoveringConnectionAsync())
201201
{
202-
using (IChannel publishingChannel = await publishingConn.CreateChannelAsync(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true))
202+
using (IChannel publishingChannel = await publishingConn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
203203
{
204204
for (ushort i = 0; i < TotalMessageCount; i++)
205205
{
@@ -342,7 +342,7 @@ public virtual Task PostHandleDeliveryAsync(ulong deliveryTag,
342342

343343
protected static async Task<bool> SendAndConsumeMessageAsync(IConnection conn, string queue, string exchange, string routingKey)
344344
{
345-
using (IChannel ch = await conn.CreateChannelAsync(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true))
345+
using (IChannel ch = await conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
346346
{
347347
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
348348

projects/Test/Integration/ConnectionRecovery/TestConnectionRecovery.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName,
9696
consumer.ReceivedAsync += MessageReceived;
9797
await _channel.BasicConsumeAsync(queueName, true, consumer);
9898

99-
await using (IChannel pubCh = await _conn.CreateChannelAsync(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true))
99+
await using (IChannel pubCh = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
100100
{
101101
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: routingKey, body: body);
102102
await pubCh.CloseAsync();
@@ -106,7 +106,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName,
106106

107107
await CloseAndWaitForRecoveryAsync();
108108

109-
await using (IChannel pubCh = await _conn.CreateChannelAsync(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true))
109+
await using (IChannel pubCh = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
110110
{
111111
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: "unused", body: body);
112112
await pubCh.CloseAsync();

projects/Test/Integration/TestAsyncConsumer.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
213213
});
214214
return Task.CompletedTask;
215215
};
216-
await using (IChannel publishChannel = await publishConn.CreateChannelAsync(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true))
216+
await using (IChannel publishChannel = await publishConn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true }))
217217
{
218218
AddCallbackExceptionHandlers(publishConn, publishChannel);
219219
publishChannel.DefaultConsumer = new DefaultAsyncConsumer(publishChannel,
@@ -646,7 +646,7 @@ public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
646646
var consumer1 = new AsyncEventingBasicConsumer(_channel);
647647
consumer1.ReceivedAsync += async (sender, args) =>
648648
{
649-
await using IChannel innerChannel = await _conn.CreateChannelAsync(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true);
649+
await using IChannel innerChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
650650
await innerChannel.BasicPublishAsync(exchangeName, queue2Name,
651651
mandatory: true,
652652
body: Encoding.ASCII.GetBytes(nameof(TestCreateChannelWithinAsyncConsumerCallback_GH650)));
@@ -708,7 +708,7 @@ private async Task ValidateConsumerDispatchConcurrency()
708708
Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
709709
Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
710710
await using IChannel ch = await _conn.CreateChannelAsync(
711-
consumerDispatchConcurrency: expectedConsumerDispatchConcurrency);
711+
new CreateChannelOptions { ConsumerDispatchConcurrency = expectedConsumerDispatchConcurrency});
712712
AutorecoveringChannel ach = (AutorecoveringChannel)ch;
713713
Assert.Equal(expectedConsumerDispatchConcurrency, ach.ConsumerDispatcher.Concurrency);
714714
}

projects/Test/Integration/TestAsyncEventingBasicConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public async Task TestAsyncEventingBasicConsumer_GH1038()
105105
await _channel.BasicConsumeAsync(queueName, false, consumer);
106106

107107
//publisher
108-
await using IChannel publisherChannel = await _conn.CreateChannelAsync(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true);
108+
await using IChannel publisherChannel = await _conn.CreateChannelAsync(new CreateChannelOptions { PublisherConfirmationsEnabled = true, PublisherConfirmationTrackingEnabled = true });
109109
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
110110
var props = new BasicProperties();
111111
await publisherChannel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty,

0 commit comments

Comments
 (0)