-
Notifications
You must be signed in to change notification settings - Fork 605
Fix consumer loop #772
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix consumer loop #772
Conversation
@@ -45,7 +45,7 @@ class WorkPool | |||
readonly ModelBase _model; | |||
CancellationTokenRegistration _tokenRegistration; | |||
volatile TaskCompletionSource<bool> _syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); | |||
private Task _task; | |||
private Task _worker; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be kept in case we want to gracefully shutdown
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No matter what I try, I get test failures on this branch which are not present in master.
With a running local RabbitMQ node under the same umbrella, I run
dotnet test projects/client/Unit --filter "FullyQualifiedName~RabbitMQ.Client.Unit.TestConsumer"
and the failures are:
X TestConsumerCancelEvent [5s 126ms]
Error Message:
Expected: True
But was: False
Stack Trace:
at RabbitMQ.Client.Unit.TestConsumerCancelNotify.TestConsumerCancel(String queue, Boolean EventMode, Boolean& notified) in /Users/antares/Development/RabbitMQ/umbrella.git/deps/rabbitmq_dotnet_client/projects/client/Unit/src/unit/TestConsumerCancelNotify.cs:line 108
at RabbitMQ.Client.Unit.TestConsumerCancelNotify.TestConsumerCancelEvent() in /Users/antares/Development/RabbitMQ/umbrella.git/deps/rabbitmq_dotnet_client/projects/client/Unit/src/unit/TestConsumerCancelNotify.cs:line 67
X TestConsumerCancelNotification [5s 28ms]
Error Message:
Expected: True
But was: False
Stack Trace:
at RabbitMQ.Client.Unit.TestConsumerCancelNotify.TestConsumerCancel(String queue, Boolean EventMode, Boolean& notified) in /Users/antares/Development/RabbitMQ/umbrella.git/deps/rabbitmq_dotnet_client/projects/client/Unit/src/unit/TestConsumerCancelNotify.cs:line 108
at RabbitMQ.Client.Unit.TestConsumerCancelNotify.TestConsumerCancelNotification() in /Users/antares/Development/RabbitMQ/umbrella.git/deps/rabbitmq_dotnet_client/projects/client/Unit/src/unit/TestConsumerCancelNotify.cs:line 61
X TestCorrectConsumerTag [5s 33ms]
Error Message:
Expected: "amq.ctag-kf7-j7-mTrVQ6jfi4ocoGw"
But was: null
Stack Trace:
at RabbitMQ.Client.Unit.TestConsumerCancelNotify.TestCorrectConsumerTag() in /Users/antares/Development/RabbitMQ/umbrella.git/deps/rabbitmq_dotnet_client/projects/client/Unit/src/unit/TestConsumerCancelNotify.cs:line 95
X TestCancelNotificationExceptionHandling [5s 20ms]
Error Message:
Expected: True
But was: False
Stack Trace:
at RabbitMQ.Client.Unit.TestConsumerExceptions.TestExceptionHandlingWith(IBasicConsumer consumer, Action`4 action) in /Users/antares/Development/RabbitMQ/umbrella.git/deps/rabbitmq_dotnet_client/projects/client/Unit/src/unit/TestConsumerExceptions.cs:line 135
at RabbitMQ.Client.Unit.TestConsumerExceptions.TestCancelNotificationExceptionHandling() in /Users/antares/Development/RabbitMQ/umbrella.git/deps/rabbitmq_dotnet_client/projects/client/Unit/src/unit/TestConsumerExceptions.cs:line 142
X TestConsumerCancelOkExceptionHandling [5s 23ms]
Error Message:
Expected: True
But was: False
Stack Trace:
at RabbitMQ.Client.Unit.TestConsumerExceptions.TestExceptionHandlingWith(IBasicConsumer consumer, Action`4 action) in /Users/antares/Development/RabbitMQ/umbrella.git/deps/rabbitmq_dotnet_client/projects/client/Unit/src/unit/TestConsumerExceptions.cs:line 135
at RabbitMQ.Client.Unit.TestConsumerExceptions.TestConsumerCancelOkExceptionHandling() in /Users/antares/Development/RabbitMQ/umbrella.git/deps/rabbitmq_dotnet_client/projects/client/Unit/src/unit/TestConsumerExceptions.cs:line 149
X TestConsumerConsumeOkExceptionHandling [5s 16ms]
Error Message:
Expected: True
But was: False
Stack Trace:
at RabbitMQ.Client.Unit.TestConsumerExceptions.TestExceptionHandlingWith(IBasicConsumer consumer, Action`4 action) in /Users/antares/Development/RabbitMQ/umbrella.git/deps/rabbitmq_dotnet_client/projects/client/Unit/src/unit/TestConsumerExceptions.cs:line 135
at RabbitMQ.Client.Unit.TestConsumerExceptions.TestConsumerConsumeOkExceptionHandling() in /Users/antares/Development/RabbitMQ/umbrella.git/deps/rabbitmq_dotnet_client/projects/client/Unit/src/unit/TestConsumerExceptions.cs:line 156
I really need to crash. I split out the async changes into #775 Let's see what gives. I don't have more time for today given that it is really bed time here |
@danielmarbach sure, thank you for your help as always ❤️ |
1383415
to
9e1ce33
Compare
@michaelklishin I know what's causing that. Testing a fix now then I'll push to this branch. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I rebased this on master
and addressed one small bug. I think it's good to go ... @michaelklishin ?
@@ -77,7 +77,7 @@ async Task Loop() | |||
// Swallowing the task cancellation exception for the semaphore in case we are stopping. | |||
} | |||
|
|||
while (_tokenSource.IsCancellationRequested && _actions.TryDequeue(out Action action)) | |||
while (_tokenSource.IsCancellationRequested == false && _actions.TryDequeue(out Action action)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch. That's what happens when I code late hours...
Awesome to get these reviews/fixes from you @danielmarbach. So many nuances that can be forgotten in a big change like this. Thanks :) |
Fix consumer loop (cherry picked from commit 0edb5ed)
Replaces #771 by restoring a safer consumer work pool shutdown behavior where it would dequeue and process all pending operations before shutting down.