Multiple Producers and Consumers per connection #328
Conversation
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
multi consumers per connection Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
handle consumers list Signed-off-by: Gabriele Santomaggio <[email protected]>
consumer and producer Signed-off-by: Gabriele Santomaggio <[email protected]>
Codecov Report
Attention: additional details and impacted files.

@@            Coverage Diff             @@
##             main     #328      +/-   ##
==========================================
+ Coverage   92.60%   92.94%   +0.34%
==========================================
  Files         113      115       +2
  Lines        9964    10701     +737
  Branches      825      863      +38
==========================================
+ Hits         9227     9946     +719
- Misses        560      567       +7
- Partials      177      188      +11

☔ View full report in Codecov by Sentry.
remove using during the super stream reset Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
connection pool. Add the clientID to look up the connection Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
producers and consumers requests. Remove the old way of deciding the next id for producers and consumers; it now uses the client lists. Make the consumer and producer close idempotent. Add EntityStatus to handle the connection status for producers and consumers. Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
The issue was raised by @jonnepmyra with a specific use case. With the stable implementation, we have:

With this PR:

PoolConfiguration: ConnectionPoolConfig = new ConnectionPoolConfig()
{
    ConsumersPerConnection = 20,
    ProducersPerConnection = 1,
}

@jonnepmyra FYI: by reducing the pool settings to:
PoolConfiguration: ConnectionPoolConfig = new ConnectionPoolConfig()
{
    ConsumersPerConnection = 20,
    ProducersPerConnection = 1,
}

Consumer:

var config = new RawConsumerConfig(_streamName)
{
    ClientProvidedName = "STREAM-CONSUMER",
    OffsetSpec = new OffsetTypeOffset(_offset),
    InitialCredits = 1,
    MessageHandler = async (consumer, context, arg3) => { tcs.TrySetResult(); }
};
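A minimal sketch of how the two snippets above might be wired together when creating the stream system. This is illustrative only: the stream name, the property carrying the pool settings on StreamSystemConfig, and the OffsetTypeFirst spec are assumptions, not code from this PR.

using System.Threading.Tasks;
using RabbitMQ.Stream.Client;

// Create the system with the pool limits discussed above.
// Note: the property name for the pool settings is assumed here;
// the snippet above labels it "PoolConfiguration".
var system = await StreamSystem.Create(new StreamSystemConfig
{
    ConnectionPoolConfig = new ConnectionPoolConfig
    {
        ConsumersPerConnection = 20,
        ProducersPerConnection = 1,
    }
});

var tcs = new TaskCompletionSource();

// Consumers created from the same system should now share pooled connections
// until ConsumersPerConnection is reached.
var consumer = await system.CreateRawConsumer(new RawConsumerConfig("my-stream")
{
    ClientProvidedName = "STREAM-CONSUMER",
    OffsetSpec = new OffsetTypeFirst(),
    InitialCredits = 1,
    MessageHandler = async (rawConsumer, context, message) =>
    {
        tcs.TrySetResult();
        await Task.CompletedTask;
    }
});

await tcs.Task;           // wait for the first message
await consumer.Close();   // releases this consumer's id from the pooled connection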
Signed-off-by: Gabriele Santomaggio <[email protected]>
Great! I'll aim to test it tomorrow when I have some time. The improvements in my benchmarks seem even more significant in real-world scenarios, beyond the isolated benchmarks. This modification should significantly enhance performance for cases involving time-traveling streams, especially when dealing with a small number of messages. In such scenarios, a substantial portion of time is typically consumed in establishing the connection. Appreciate your contribution, @Gsantomaggio!
and the values are valid. Add ProducerInfo to better understand the logs. Move dispose function to the abstract entity to remove code duplication. Better logs in case of errors. Signed-off-by: Gabriele Santomaggio <[email protected]>
The processCheck now waits until the subscription is completed before sending the credits. Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
during the consumers and producers connection Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
I've tested this PR in both our integration test suites and real-workload scenarios, and it's performing really well, especially since 15fe9dc. Notably, the creation of consumers has shown a significant improvement, approximately 4-5 times faster than before. This enhancement holds as long as the connection pool is not drained of active consumers. A note, even though it falls outside the scope of this PR: currently, the underlying connection of the "pool" is closed when there are no active consumers, resulting in a slowdown when creating new consumers. It would be beneficial if we could keep the underlying connection open, even in the absence of consumers. This matches our experience with an AMQP connection, which can exist without any active channels or consumers, considerably improving the efficiency of consumer creation from a time perspective. Thanks for fixing this issue!
producer and consumer. The configuration still exists but it is not exposed. Signed-off-by: Gabriele Santomaggio <[email protected]>
I am going to merge the PR.
With this PR #328 the client can handle multiple producers and consumers per connection.
- This PR removes MetadataHandler and introduces the OnMetadataUpdate event. The event can handle multiple metadata updates coming from the server. A metadata update is raised when a stream is deleted or a replica is removed. The server automatically removes the producers and consumers linked to the connection, so we need to remove these entities from the internal pool to stay consistent.
- Refactor RawConsumer and RawProducer: remove duplicated code and move the common code to the AbstractEntity class.

Signed-off-by: Gabriele Santomaggio <[email protected]>
Preface
The RabbitMQ stream protocol supports multiple producers and consumers per TCP connection.
We designed the .NET stream client to use one TCP connection per producer/consumer to keep it simple.
Pull request
This PR adds the feature without impacting the code too much, even though some changes are required.
We added a new class: ConnectionsPool. The pool is a Dictionary with client_id and connection_info. The client_id is an internal field that uniquely identifies the connection. The connection_info contains info such as the ActiveIds used for that connection. The values of the IDs don't matter; only the count does. For example, if a connection has the producer ids 10, 11, and 45, the ActiveIds count is 3 (see the sketch after the close example below).

How does it work?
Suppose I want to have a maximum of two ids per connection:
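For illustration, a hypothetical pool configuration allowing at most two producer ids per connection; the names mirror the snippet shown earlier in this PR, and only the value 2 is specific to this example:

PoolConfiguration: ConnectionPoolConfig = new ConnectionPoolConfig()
{
    ProducersPerConnection = 2, // at most two producer ids share a single connection
}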
In this case, there is one connection with two ids; see the image:

Calling p1.close() reduces the reference on the pool for that connection from two to one.
Calling p2.close() reduces the reference on the pool for that connection from one to zero. At this point, the connection is closed.
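To make the bookkeeping concrete, here is a minimal sketch of the idea described above. It is not the actual ConnectionsPool code from this PR; apart from the client_id and ActiveIds names mentioned above, all class and member names are invented for illustration.

using System;
using System.Collections.Generic;
using System.Linq;

// Illustrative sketch only: tracks how many producer/consumer ids are active per connection.
class ConnectionInfoSketch
{
    public HashSet<byte> ActiveIds { get; } = new(); // the id values don't matter, only the count
    public bool IsClosed { get; set; }
}

class ConnectionsPoolSketch
{
    private readonly Dictionary<string, ConnectionInfoSketch> _pool = new(); // client_id -> info
    private readonly int _maxIdsPerConnection;

    public ConnectionsPoolSketch(int maxIdsPerConnection) => _maxIdsPerConnection = maxIdsPerConnection;

    // Reuse a connection that still has room; otherwise register a new one.
    public string GetOrCreateClientId()
    {
        var free = _pool.FirstOrDefault(kv => !kv.Value.IsClosed && kv.Value.ActiveIds.Count < _maxIdsPerConnection);
        if (free.Key is not null) return free.Key;

        var clientId = Guid.NewGuid().ToString();
        _pool[clientId] = new ConnectionInfoSketch();
        return clientId;
    }

    public void AddId(string clientId, byte id) => _pool[clientId].ActiveIds.Add(id);

    // Closing a producer/consumer releases its id; when the count reaches zero the connection is closed.
    public void ReleaseId(string clientId, byte id)
    {
        var info = _pool[clientId];
        info.ActiveIds.Remove(id);
        if (info.ActiveIds.Count == 0)
        {
            info.IsClosed = true; // in the real client the TCP connection would be closed here
            _pool.Remove(clientId);
        }
    }
}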
Inside the ConnectionsPoolTests, you can find the tests with comments for each use case.

How to test
I am using this script where I can tune the parameters:
I used make rabbitmq-ha-proxy from the golang repo to get a cluster up and running.

Note:
We turned off the test TheProducerPoolShouldBeConsistentWhenAStreamIsDeleted because it requires changing Action<MetaDataUpdate> MetadataHandler to an Event to support multiple handlers. I would prefer to have it in another PR.
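For context, a schematic sketch of the change the note refers to: an Action property can hold only one handler, while an event supports multiple subscribers. The types below are illustrative only (the real delegate takes a MetaDataUpdate, simplified to a string here), not the library's actual API.

using System;

// Illustrative only: a single-handler callback, as with the current MetadataHandler.
class WithActionHandler
{
    public Action<string> MetadataHandler { get; set; } // the last assignment wins
}

// Illustrative only: an event, as OnMetadataUpdate would be, supports multiple subscribers.
class WithEvent
{
    public event Action<string> OnMetadataUpdate;

    public void RaiseMetadataUpdate(string stream) => OnMetadataUpdate?.Invoke(stream);
}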