Skip to content

Pull ProcessingConcurrency into connection factory interface #899

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions projects/RabbitMQ.Client/client/api/IConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -180,5 +180,15 @@ public interface IConnectionFactory
/// timing out.
/// </summary>
TimeSpan ContinuationTimeout { get; set; }

/// <summary>
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IBasicConsumer"/>
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
/// Defaults to 1.
/// </summary>
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
int ProcessingConcurrency { get; set; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we call this ConsumerDispatchConcurrency or similar? "Processing" is not particularly specific.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name already passed all the reviews, is on master and also brought into 6.x. What is causing the sudden name change request? Especially because I asked for this feedback as part of #866 (comment)

Btw. I'm not opposed to what you are proposing. Just wondering why

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment uses "consumer concurrency" which is roughly what I'm proposing. Things can easily be changed in master.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't participated in #866 and not every member of our team or community cares equally about naming. So as soon as I've noticed it I tried to suggest something clearer.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. I guess 6.2 isn't out yet so we could still fix it right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we merge this PR and then do the rename PR later so that we can also backport the rename?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can still change the name for 6.2.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renaming can be done in another PR, no problem at all.

}
}
5 changes: 2 additions & 3 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,13 @@ public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHa
_factory = factory;
_frameHandler = frameHandler;

int processingConcurrency = (factory as ConnectionFactory)?.ProcessingConcurrency ?? 1;
if (factory is IAsyncConnectionFactory asyncConnectionFactory && asyncConnectionFactory.DispatchConsumersAsync)
{
ConsumerWorkService = new AsyncConsumerWorkService(processingConcurrency);
ConsumerWorkService = new AsyncConsumerWorkService(factory.ProcessingConcurrency);
}
else
{
ConsumerWorkService = new ConsumerWorkService(processingConcurrency);
ConsumerWorkService = new ConsumerWorkService(factory.ProcessingConcurrency);
}

_sessionManager = new SessionManager(this, 0);
Expand Down
1 change: 1 addition & 0 deletions projects/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ namespace RabbitMQ.Client
System.TimeSpan ContinuationTimeout { get; set; }
System.TimeSpan HandshakeContinuationTimeout { get; set; }
string Password { get; set; }
int ProcessingConcurrency { get; set; }
ushort RequestedChannelMax { get; set; }
uint RequestedFrameMax { get; set; }
System.TimeSpan RequestedHeartbeat { get; set; }
Expand Down