Skip to content

Commit 7b4a308

Browse files
Merge pull request #899 from danielmarbach/processing-concurrency
Pull ProcessingConcurrency into connection factory interface
2 parents 849b9b8 + 970fdbe commit 7b4a308

File tree

3 files changed

+13
-3
lines changed

3 files changed

+13
-3
lines changed

projects/RabbitMQ.Client/client/api/IConnectionFactory.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,5 +180,15 @@ public interface IConnectionFactory
180180
/// timing out.
181181
/// </summary>
182182
TimeSpan ContinuationTimeout { get; set; }
183+
184+
/// <summary>
185+
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IBasicConsumer"/>
186+
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
187+
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
188+
/// Defaults to 1.
189+
/// </summary>
190+
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
191+
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
192+
int ProcessingConcurrency { get; set; }
183193
}
184194
}

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,14 +110,13 @@ public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHa
110110
_factory = factory;
111111
_frameHandler = frameHandler;
112112

113-
int processingConcurrency = (factory as ConnectionFactory)?.ProcessingConcurrency ?? 1;
114113
if (factory is IAsyncConnectionFactory asyncConnectionFactory && asyncConnectionFactory.DispatchConsumersAsync)
115114
{
116-
ConsumerWorkService = new AsyncConsumerWorkService(processingConcurrency);
115+
ConsumerWorkService = new AsyncConsumerWorkService(factory.ProcessingConcurrency);
117116
}
118117
else
119118
{
120-
ConsumerWorkService = new ConsumerWorkService(processingConcurrency);
119+
ConsumerWorkService = new ConsumerWorkService(factory.ProcessingConcurrency);
121120
}
122121

123122
_sessionManager = new SessionManager(this, 0);

projects/Unit/APIApproval.Approve.verified.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ namespace RabbitMQ.Client
337337
System.TimeSpan ContinuationTimeout { get; set; }
338338
System.TimeSpan HandshakeContinuationTimeout { get; set; }
339339
string Password { get; set; }
340+
int ProcessingConcurrency { get; set; }
340341
ushort RequestedChannelMax { get; set; }
341342
uint RequestedFrameMax { get; set; }
342343
System.TimeSpan RequestedHeartbeat { get; set; }

0 commit comments

Comments
 (0)