Multiple Producers and Consumers per connection #328

Merged 29 commits on Dec 11, 2023

Conversation

@Gsantomaggio (Member) commented Nov 20, 2023

Preface

The RabbitMQ stream protocol supports multiple producers and multiple consumers per TCP connection.
We originally designed the .NET stream client to use a single TCP connection per producer or consumer to keep it simple.

Pull request

This PR adds the feature without much impact on the existing code, although some changes are required.

We added a new class: ConnectionsPool.
The pool is a Dictionary that maps each client_id to its connection_info.

The client_id is an internal field to identify the connection uniquely.

The connection_info contains information such as the ActiveIds in use on that connection. The values of the ids don't matter; only the count does.

For example, if a connection has the producer ids 10, 11 and 45, its ActiveIds count is 3.

How does it work?

Suppose I want at most two ids per connection:

var pool = new ConnectionsPool(0, 2);
var metaDataInfo = await client.QueryMetadata(new[] { Stream1, Stream2 });
var p1 = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream1) { Pool = pool },
    metaDataInfo.StreamInfos[Stream1]);

var p2 = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream2) { Pool = pool },
    metaDataInfo.StreamInfos[Stream2]);

In this case, a single connection carries two ids (see image img_1).

Calling p1.Close() reduces the reference count on the pool for that connection from two to one.

Calling p2.Close() reduces the reference count from one to zero. At this point, the connection is closed.

Inside the ConnectionsPoolTests, you can find the tests with comments for each use case.
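
The reference counting described above can be sketched in a few lines. This is only an illustration under stated assumptions, not the code in ConnectionsPool: the names SketchPool, SketchConnectionInfo, Acquire and Release are hypothetical, and no real TCP connection is opened. It shows a dictionary keyed by client_id where a connection is reused until it holds the maximum number of ids and is dropped when its count reaches zero.

```csharp
using System;
using System.Collections.Generic;

// Usage: at most two ids per connection, as in the example above.
var pool = new SketchPool(idsPerConnection: 2);
var c1 = pool.Acquire();                 // opens a new connection, ActiveIds = 1
var c2 = pool.Acquire();                 // reuses it, ActiveIds = 2
var c3 = pool.Acquire();                 // first connection is full, a second one is opened
Console.WriteLine(c1 == c2);             // True
Console.WriteLine(pool.ConnectionCount); // 2
pool.Release(c1);                        // ActiveIds goes from two to one
pool.Release(c2);                        // ActiveIds reaches zero, the connection is dropped
Console.WriteLine(pool.ConnectionCount); // 1

// Hypothetical stand-in for the per-connection bookkeeping.
public sealed class SketchConnectionInfo
{
    public int ActiveIds { get; set; }
}

// Hypothetical pool: maps a client_id to its connection info.
public sealed class SketchPool
{
    private readonly int _idsPerConnection;
    private readonly Dictionary<string, SketchConnectionInfo> _connections = new();
    private readonly object _lock = new();

    public SketchPool(int idsPerConnection) => _idsPerConnection = idsPerConnection;

    public int ConnectionCount
    {
        get { lock (_lock) { return _connections.Count; } }
    }

    // Returns the client_id of a connection with spare capacity, creating one if needed.
    public string Acquire()
    {
        lock (_lock)
        {
            foreach (var (clientId, info) in _connections)
            {
                if (info.ActiveIds < _idsPerConnection)
                {
                    info.ActiveIds++;
                    return clientId;
                }
            }

            var newId = Guid.NewGuid().ToString();
            _connections[newId] = new SketchConnectionInfo { ActiveIds = 1 };
            return newId;
        }
    }

    // Decrements the count; removes the entry when nothing uses the connection.
    // Releasing an unknown client_id is a no-op.
    public void Release(string clientId)
    {
        lock (_lock)
        {
            if (!_connections.TryGetValue(clientId, out var info)) return;
            info.ActiveIds--;
            if (info.ActiveIds <= 0) _connections.Remove(clientId);
        }
    }
}
```

Only the count is tracked here, which mirrors the point that the values of the ids do not matter.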

How to test

I am using this script where I can tune the parameters:

using System.Net;
using System.Text;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Console;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.AMQP;
using RabbitMQ.Stream.Client.Reliable;

namespace example;

public class Pool
{
    public static async Task Start()
    {
        var serviceCollection = new ServiceCollection();
        serviceCollection.AddLogging(builder => builder
            .AddSimpleConsole(options =>
            {
                options.IncludeScopes = true;
                options.SingleLine = true;
                options.TimestampFormat = "[HH:mm:ss] ";
                options.ColorBehavior = LoggerColorBehavior.Default;
            })
            .AddFilter(level => level >= LogLevel.Information)
        );
        var loggerFactory = serviceCollection.BuildServiceProvider()
            .GetService<ILoggerFactory>();

        var lp = loggerFactory.CreateLogger<Producer>();
        var lc = loggerFactory.CreateLogger<Consumer>();
        const int streams = 10;
        const int producers = 20;
        const byte producersPerConnection = 12;
        const int messagesPerProducer = 5000;

        const int consumers = 20;
        const byte consumersPerConnection = 13;

        var system = await StreamSystem.Create(new StreamSystemConfig()
        {
            Endpoints = new List<EndPoint>()
            {
                new DnsEndPoint("node1", 5572),
                new DnsEndPoint("node0", 5562),
                new DnsEndPoint("node2", 5582),
            },

            ConnectionPoolConfig = new ConnectionPoolConfig()
            {
                ProducersPerConnection = producersPerConnection,
                ConsumersPerConnection = consumersPerConnection,
            }
        });

        var streamsList = new List<string>();
        for (var i = 0; i < streams; i++)
        {
            streamsList.Add($"BenchmarkDotNet{i}");
        }

        var totalConfirmed = 0;
        var totalError = 0;
        var totalConsumed = 0;
        var totalSent = 0;
        var isRunning = true;

        _ = Task.Run(() =>
        {
            while (isRunning)
            {
                Console.WriteLine(
                    $"Conf: {totalConfirmed}, " +
                    $"Error: {totalError}, " +
                    $"Total: {totalConfirmed + totalError}, " +
                    $"Consumed: {totalConsumed}, " +
                    $"Sent: {totalSent}, " +
                    $"Sent per stream: {totalSent / streamsList.Count}");
                Thread.Sleep(1000);
            }
        });
        List<Consumer> consumersList = new();
        List<Producer> producersList = new();
        var obj = new object();
        foreach (var stream in streamsList)
        {
            if (await system.StreamExists(stream))
            {
                await system.DeleteStream(stream);
            }


            await system.CreateStream(new StreamSpec(stream)
            {
                MaxLengthBytes = 20_000_000_000,
            });


            for (var z = 0; z < consumers; z++)
            {
                consumersList.Add(await Consumer.Create(new ConsumerConfig(system, stream)
                {
                    OffsetSpec = new OffsetTypeFirst(),
                    MessageHandler = (source, ctx, _, _) =>
                    {
                        Interlocked.Increment(ref totalConsumed);
                        return Task.CompletedTask;
                    },
                }));
            }


            for (var z = 0; z < producers; z++)
            {
                _ = Task.Run(async () =>
                {
                    var producer = await Producer.Create(new ProducerConfig(system, stream)
                    {
                        ConfirmationHandler = confirmation =>
                        {
                            if (confirmation.Status != ConfirmationStatus.Confirmed)
                            {
                                Interlocked.Increment(ref totalError);
                                return Task.CompletedTask;
                            }

                            Interlocked.Increment(ref totalConfirmed);
                            return Task.CompletedTask;
                        },
                    });
                    lock (obj)
                    {
                        producersList.Add(producer);
                    }

                    for (var i = 0; i < messagesPerProducer; i++)
                    {
                        var m = new Message(Encoding.UTF8.GetBytes($"Hello World! {i}")); // one section
                        await producer.Send(m);
                        await Task.Delay(10);
                        Interlocked.Increment(ref totalSent);
                    }
                });
            }
        }

        Console.WriteLine("Press any key to close all the consumers");
        Console.ReadKey();
        isRunning = false;
        Console.WriteLine("closing the producers ..... ");
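        // List.ForEach with an async lambda is fire-and-forget, so await each Close() instead.
        List<Producer> producersToClose;
        lock (obj)
        {
            producersToClose = new List<Producer>(producersList);
        }

        foreach (var p in producersToClose)
        {
            await p.Close();
        }

        Console.WriteLine("closing the consumers ..... ");
        foreach (var c in consumersList)
        {
            await c.Close();
        }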
        Console.WriteLine("Closed all the consumers and producers");
    }
}

I used make rabbitmq-ha-proxy from the golang repo to get a cluster up and running.

Note:

We turned off the test TheProducerPoolShouldBeConsistentWhenAStreamIsDeleted because it requires changing Action<MetaDataUpdate> MetadataHandler to an Event to support multiple handlers. I would prefer to have it in another PR.
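
To illustrate why a single handler is limiting once several producers and consumers share one connection, here is a minimal sketch. SketchSource, Handler, Updated and Raise are hypothetical names and are not the client's API; the sketch only contrasts a delegate property that is assigned with `=` (the last assignment wins) with a C# event (every subscriber is kept).

```csharp
using System;

var source = new SketchSource();

// With a plain Action property, the second assignment replaces the first.
source.Handler = s => Console.WriteLine($"producer saw {s}");
source.Handler = s => Console.WriteLine($"consumer saw {s}");

// With an event, both subscribers are kept.
source.Updated += s => Console.WriteLine($"producer notified about {s}");
source.Updated += s => Console.WriteLine($"consumer notified about {s}");

source.Raise("stream-1");
// prints:
// consumer saw stream-1
// producer notified about stream-1
// consumer notified about stream-1

// Hypothetical stand-in for a connection that reports metadata updates.
public sealed class SketchSource
{
    // Single-handler style: holds whatever was assigned last.
    public Action<string> Handler { get; set; } = _ => { };

    // Event style: any number of subscribers can be attached with +=.
    public event Action<string> Updated = delegate { };

    public void Raise(string stream)
    {
        Handler(stream);
        Updated(stream);
    }
}
```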

Signed-off-by: Gabriele Santomaggio <[email protected]>
multi consumers per connection
handle consumers list
consumer and producer

codecov bot commented Nov 21, 2023

Codecov Report

Attention: 34 lines in your changes are missing coverage. Please review.

Comparison is base (90a6752) 92.60% compared to head (c7c66a5) 92.94%.
Report is 2 commits behind head on main.

| Files                                     | Patch % | Lines                        |
|------------------------------------------ |--------:|----------------------------- |
| RabbitMQ.Stream.Client/AbstractEntity.cs  |  46.15% | 11 Missing and 3 partials ⚠️ |
| RabbitMQ.Stream.Client/ConnectionsPool.cs |  91.52% | 1 Missing and 9 partials ⚠️  |
| RabbitMQ.Stream.Client/RawConsumer.cs     |  88.88% | 0 Missing and 3 partials ⚠️  |
| RabbitMQ.Stream.Client/Client.cs          |  97.26% | 0 Missing and 2 partials ⚠️  |
| RabbitMQ.Stream.Client/StreamSystem.cs    |  90.47% | 0 Missing and 2 partials ⚠️  |
| RabbitMQ.Stream.Client/MetaData.cs        |  66.66% | 0 Missing and 1 partial ⚠️   |
| RabbitMQ.Stream.Client/RawProducer.cs     |  95.65% | 0 Missing and 1 partial ⚠️   |
| Tests/SuperStreamConsumerTests.cs         |  95.00% | 1 Missing ⚠️                 |

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #328      +/-   ##
==========================================
+ Coverage   92.60%   92.94%   +0.34%     
==========================================
  Files         113      115       +2     
  Lines        9964    10701     +737     
  Branches      825      863      +38     
==========================================
+ Hits         9227     9946     +719     
- Misses        560      567       +7     
- Partials      177      188      +11     


remove using during the super stream reset

connection pool. Add the clientID to lookup the connection

producers and consumers requests.
Remove the old way of deciding the next id for producers and consumers; it now uses the client lists.
Make the consumer and producer close idempotent.
Add EntityStatus to handle the connection status for producers and consumers

@Gsantomaggio (Member, Author) commented Nov 28, 2023

The issue was raised by @jonnepmyra with a specific use case

With the stable implementation, we have:

| Method                 | Mean      | Error     | StdDev    | Ratio | RatioSD |
|----------------------- |----------:|----------:|----------:|------:|--------:|
| AmqpStreamConsumer     |  10.39 ms |  1.454 ms |  4.287 ms |  1.00 |    0.00 |
| StreamProtocolConsumer | 102.34 ms | 18.856 ms | 55.597 ms | 10.41 |    5.89 |

102.34 ms for StreamProtocolConsumer vs 10.39 ms for AmqpStreamConsumer

With this PR:

| Method                 | Mean     | Error    | StdDev    | Median   | Ratio | RatioSD |
|----------------------- |---------:|---------:|----------:|---------:|------:|--------:|
| AmqpStreamConsumer     | 12.95 ms | 0.993 ms |  2.928 ms | 12.28 ms |  1.00 |    0.00 |
| StreamProtocolConsumer | 28.29 ms | 5.076 ms | 14.967 ms | 24.05 ms |  2.31 |    1.38 |

28.29 ms for StreamProtocolConsumer vs 12.95 ms for AmqpStreamConsumer

PoolConfiguration:

ConnectionPoolConfig = new ConnectionPoolConfig()
{
  ConsumersPerConnection = 20,
  ProducersPerConnection = 1,
}

@jonnepmyra FYI: By reducing InitialCredits to 1 you can improve the result further:

| Method                 | Mean     | Error    | StdDev   | Ratio | RatioSD |
|----------------------- |---------:|---------:|---------:|------:|--------:|
| AmqpStreamConsumer     | 10.70 ms | 1.334 ms | 3.932 ms |  1.00 |    0.00 |
| StreamProtocolConsumer | 19.60 ms | 3.127 ms | 9.219 ms |  1.95 |    1.07 |

PoolConfiguration:

ConnectionPoolConfig = new ConnectionPoolConfig()
{
  ConsumersPerConnection = 20,
  ProducersPerConnection = 1,
}

Consumer:

var config = new RawConsumerConfig(_streamName)
        {
            ClientProvidedName = "STREAM-CONSUMER",
            OffsetSpec = new OffsetTypeOffset(_offset),
            InitialCredits = 1,
            MessageHandler = async (consumer, context, arg3) => { tcs.TrySetResult(); }
        };
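
As a rough reading of the three tables, the StreamProtocolConsumer means can be turned into speedup factors. This is only back-of-the-envelope arithmetic on the reported means: the runs are separate and the standard deviations are large, so the factors are indicative at best. The variable names are illustrative.

```csharp
using System;
using System.Globalization;

// Mean StreamProtocolConsumer times in milliseconds, taken from the tables above.
double stable = 102.34;            // stable implementation
double withPool = 28.29;           // this PR, ConsumersPerConnection = 20
double withPoolOneCredit = 19.60;  // this PR, InitialCredits = 1

var inv = CultureInfo.InvariantCulture;
Console.WriteLine((stable / withPool).ToString("F2", inv));          // 3.62
Console.WriteLine((stable / withPoolOneCredit).ToString("F2", inv)); // 5.22
```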

@jonnepmyra (Contributor)

Great! I'll aim to test it tomorrow when I have some time. The improvements in my benchmarks seem even more significant in real-world scenarios, beyond the isolated benchmarks.

This modification should significantly enhance performance for cases involving time-traveling streams, especially when dealing with a small number of messages. In such scenarios, a substantial portion of time is typically consumed in establishing the connection.

Appreciate your contribution, @Gsantomaggio!

and the values are valid.
Add ProducerInfo to better understand the logs.
Move dispose function to the abstract entity to remove code duplication.
Better logs in case of errors.

The processCheck now waits until the subscription is completed before sending the credits

@lukebakken lukebakken self-requested a review December 1, 2023 16:29
during the consumers and producers connection

@jonnepmyra (Contributor)

I've tested this PR in both our integration test suites and real-workload scenarios, and it's performing really well, especially since 15fe9dc.

Notably, the creation of consumers has shown a significant improvement, approximately 4-5 times faster than before. This enhancement is observed as long as the connection pool is not drained of active consumers.

One note, although it falls outside the scope of this PR: currently, the underlying connection of the "pool" is closed when there are no active consumers, which slows down the creation of new consumers. It would be beneficial to keep the underlying connection open even when there are no consumers. This matches our experience with AMQP, where a connection can exist without any active channels or consumers, which makes creating consumers considerably faster.

Thanks for fixing this issue!

@Gsantomaggio Gsantomaggio changed the title Pool connections Multiple Producers and Consumers per connection Dec 6, 2023
@Gsantomaggio Gsantomaggio marked this pull request as ready for review December 6, 2023 17:33
producer and consumer. The configuration still exists but it is not exposed.

@Gsantomaggio (Member, Author)

I am going to merge the PR.
Handling the metadata update requires more work, but I prefer to open another PR for it.

@Gsantomaggio Gsantomaggio merged commit 4cefb84 into main Dec 11, 2023
@Gsantomaggio Gsantomaggio deleted the pool_connections branch December 11, 2023 13:39
Gsantomaggio added a commit that referenced this pull request Dec 18, 2023
With this PR #328 the client can handle multi-producers and consumers per connection.

This PR removes MetadataHandler and introduces OnMetadataUpdate event.

The event can handle multiple Metadata updates coming from the server. Metadata update is raised when a stream is deleted, or a replica is removed.

The server automatically removes the producers and consumers linked to the connection. Here we need to remove these entities from the internal pool to stay consistent.

- Refactor RawConsumer and RawProducer. Remove duplicated code. Move the common code to the AbstractEntity class
