Skip to content

Handle cancellation token during the consumer close #339

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 22, 2023

Conversation

Gsantomaggio
Copy link
Member

  • During the handle delivery, the consumer could receive a Token cancellation
    In this commit, the consumer handles it with a log and exit. It will avoid
    propagating the error and close the TCP connection

  • Add a lock around the IsOpen() function to make it thread-safe.
    In normal situations, it does not matter. It is useful when a consumer is
    created and destroyed in a short time

  • Handle the Subscribe error. In case there is an error during the init.
    The error will be raised to the caller, but the pool must be consistent.

Found the issue with a caos test like:

for (var z = 0; z < 800; z++)
        {
            Console.WriteLine($"Starting consumer test {z}");
            await Task.Delay(new Random().Next(100, 200));
            try
            {
                var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

                var consumer = await _streamSystem.CreateConsumer($"consumer-thread-force-test_{z}",
                    async (_, consumer, arg3, arg4) =>
                    {
                        tcs.TrySetResult();
                        await Task.CompletedTask;
                    }
                        
                );

                await tcs.Task.ConfigureAwait(false);
                _ = Task.Run(async () =>
                {
                    await Task.Delay(new Random().Next(500, 800));
                    await consumer.Close();
                });
            }
            catch (Exception e)
            {
                Console.WriteLine($"Error creating consumer {e.Message}");
            }
        }

Signed-off-by: Gabriele Santomaggio <[email protected]>
* during the handle deliver the consumer could receive a Token cancellation
  In this commit the consumer handle it with a log and exit. It will avoid
  to propagate the error and close the TCP connection

* Add a lock around the IsOpen() function to make it thread-safe.
  In normal situations in does not matter. It is useful when a consumer is
  created and destroyed in a short time

* Handle the Subscribe error. In case there is an error during the init.
  The error will be raised to the caller but the pool must be consistent

Signed-off-by: Gabriele Santomaggio <[email protected]>
Signed-off-by: Gabriele Santomaggio <[email protected]>
Copy link

codecov bot commented Dec 21, 2023

Codecov Report

Attention: 91 lines in your changes are missing coverage. Please review.

Comparison is base (4601429) 92.49% compared to head (ae0ea1c) 92.76%.
Report is 4 commits behind head on main.

❗ Current head ae0ea1c differs from pull request most recent head 9edd363. Consider uploading reports for the commit 9edd363 to get more accurate results

Files Patch % Lines
RabbitMQ.Stream.Client/RawConsumer.cs 81.94% 22 Missing and 4 partials ⚠️
RabbitMQ.Stream.Client/Client.cs 87.02% 7 Missing and 10 partials ⚠️
RabbitMQ.Stream.Client/Reliable/Producer.cs 56.66% 9 Missing and 4 partials ⚠️
RabbitMQ.Stream.Client/AbstractEntity.cs 78.18% 9 Missing and 3 partials ⚠️
RabbitMQ.Stream.Client/ConnectionsPool.cs 90.67% 1 Missing and 10 partials ⚠️
RabbitMQ.Stream.Client/ClientExceptions.cs 60.00% 5 Missing and 1 partial ⚠️
RabbitMQ.Stream.Client/RawProducer.cs 97.01% 2 Missing ⚠️
RabbitMQ.Stream.Client/StreamSystem.cs 90.47% 0 Missing and 2 partials ⚠️
RabbitMQ.Stream.Client/MetaData.cs 66.66% 0 Missing and 1 partial ⚠️
Tests/SuperStreamConsumerTests.cs 95.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #339      +/-   ##
==========================================
+ Coverage   92.49%   92.76%   +0.26%     
==========================================
  Files         113      116       +3     
  Lines        9946    10994    +1048     
  Branches      825      901      +76     
==========================================
+ Hits         9200    10199     +999     
- Misses        567      600      +33     
- Partials      179      195      +16     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@Gsantomaggio Gsantomaggio marked this pull request as ready for review December 21, 2023 14:39
on the close and dispose call. If a disposed is called just after the creation
it waits.

* RawConsumer: Add check if the consumer is open before dispach
* Client: Add nextEntityId to have always a sequential ids. It avoids to use the same
  ids during the recycle
* ConnectionPool.FindNextValidId/2:Change the way to give the ids. Given an `nextid` in input
  the function will give always the next value

Signed-off-by: Gabriele Santomaggio <[email protected]>
@Gsantomaggio Gsantomaggio merged commit 6ff17c8 into main Dec 22, 2023
@Gsantomaggio Gsantomaggio deleted the rare_condition_closing_consumer branch December 22, 2023 15:12
@Gsantomaggio Gsantomaggio mentioned this pull request Dec 22, 2023
6 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant