-
Notifications
You must be signed in to change notification settings - Fork 466
[Feature] Bulk Consumer #413
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
How I access the messages in |
RabbitMq/BatchConsumerInterface.php
Outdated
interface BatchConsumerInterface extends ConsumerInterface | ||
{ | ||
public function batchExecute(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new line :)
RabbitMq/BatchConsumer.php
Outdated
{ | ||
$this->getChannel()->wait(null, false, $timeout); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new line :)
{ | ||
return 'old_sound_rabbit_mq.%s_batch'; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new line :)
Well my premise is that you do work in the execute() part and then in the batchExecute() you will do a action that is very costly. Take by example a batch process for inserting in a database using Doctrine. You do the work in execute(), persisting objects, modifying objects and then in the batchExecute() you will do a flush. After you've asked how do you access the messages I thought you should have access and I can give access to the messages in the batchExecute() modifying its signature to: /**
* @param AMQPMessage[] $messages
*
* @return void
*/
public function batchExecute(array $messages); Q: @skafandri what do you think? PS: I will fix the CR 😄 as soon as possible. |
I ma not sure if I understand the bulk idea in this context well. What I understood is:
|
Yes you've got it. The problem with this is that if your message generates an exception it will be stuck on your queue forever if you don't have logic to get it off. I can implement a mechanism to know if your messages has been passed along for some time but I don't know if I should do this or the developer. Whats your opinion? |
I think about it to be similar to an SQL transaction
As long as
Where Now back to your implementation, if In other words, I expect I wouldn't mind creating an |
Hmm now that I think about my solution is a work around to this |
…ause it does the work in batches, could be added later on maybe, but with other meaning (how many batches not how many messages)
… messages and it can return for each message the response ot ack/nack/requeue/reject or 1 single response applied for all.
…consumer just has an array of messages instead of a single one
…nd then pass it to the callback
Fixed CR & implemented another approach to the bulk consumer. |
Hi guys did anybody got a chance to look over this? |
A consumer can look like this. It basically has the same structure as before but now i pass an array of messages to batchExecute (as @skafandri 👍 pointed out that it should work like In the first example you can just return one bool value that is the same as saying the same value for each message: namespace AppBundle\Service;
use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute(array $messages)
{
echo sprintf('Doing batch execution%s', PHP_EOL);
foreach ($messages as $message) {
$this->executeSomeLogicPerMessage($message);
}
return true;
}
} The second example offers you more control on what to do with each individual message but you have to keep track of the messages and assign for each delivery_tag a value and then return that result back to the batch consumer: namespace AppBundle\Service;
use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute(array $messages)
{
echo sprintf('Doing batch execution%s', PHP_EOL);
$result = [];
/** @var AMQPMessage $message */
foreach ($messages as $message) {
$result[(int)$message->delivery_info['delivery_tag']] = $this->executeSomeLogicPerMessage($message);
}
return $result;
}
} @skafandri doing another class |
Looks good for me. I think we should document some edge cases so users won't get confused. What happens of the batch size is smaller than the prefetch count? |
First of all this is the configuration for a batch consumer (It the same as above): # RabbitMQ Configuration
old_sound_rabbit_mq:
connections:
default:
host: '%rabbit_mq_conn_default_host%'
port: '%rabbit_mq_conn_default_port%'
user: '%rabbit_mq_conn_default_user%'
password: '%rabbit_mq_conn_default_password%'
vhost: '%rabbit_mq_conn_default_vhost%'
lazy: '%rabbit_mq_conn_default_lazy%'
batch_consumers:
devck_basic:
connection: default
exchange_options: {name: '%rabbit_mq_exchange_devck%', type: fanout}
queue_options: {name: '%rabbit_mq_queue_devck%'}
callback: devck.basic
qos_options: {prefetch_size: 0, prefetch_count: 2, global: false}
timeout_wait: 5
auto_setup_fabric: false
idle_timeout_exit_code: -2 Getting back to your questions: Q: What happens of the batch size is smaller than the prefetch count? Q: What happens if there are less messages available in the queue than the batch size? Q: We should also mention that a batch is not fixed. With a batch of 3, if I nack [m1, m2, m3] there is no guarantee (most likely not) that another consumer will receive the same batch, it may receive [m1, m2, m5] Additional situations: Prefetch with large messages:
|
good job, let's add this to README.md so we can review it better to make sure is accurate, helpful, and fits well with the current documentation. |
Any news? It is a great feature !! Awesome works! We really need it! |
@Nightbr will get back to it to complete the documentation and add revise the code. |
Thank you for this awesome work. This is exactly what we was looking for. |
@skafandri after a long time I've added the README part. |
README.md
Outdated
} | ||
} | ||
|
||
``` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be: "```php"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
// you ack all messages got in batch | ||
return true; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing close tag: "```"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
RabbitMq/BatchConsumer.php
Outdated
|
||
$this->maybeStopConsumer(); | ||
|
||
if (!is_null($this->getMemoryLimit()) && $this->isRamAlmostOverloaded()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null !==
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
RabbitMq/BatchConsumer.php
Outdated
$this->consumed++; | ||
$this->maybeStopConsumer(); | ||
|
||
if (!is_null($this->getMemoryLimit()) && $this->isRamAlmostOverloaded()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null !==
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
RabbitMq/BatchConsumer.php
Outdated
/** | ||
* @var int | ||
*/ | ||
protected $consumed = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is not expected to be extended please mark all variables as private
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
public function getConfigTreeBuilder() | ||
{ | ||
$tree = new TreeBuilder(); | ||
|
||
$rootNode = $tree->root('old_sound_rabbit_mq'); | ||
$rootNode = $tree->root($this->name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why? We don't plan to change the configuration name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make use of already defined alias.
Command/BatchConsumerCommand.php
Outdated
$this->consumer = $this->getContainer() | ||
->get(sprintf($this->getConsumerService(), $input->getArgument('name'))); | ||
|
||
if (!is_null($input->getOption('memory-limit')) && ctype_digit((string) $input->getOption('memory-limit')) && $input->getOption('memory-limit') > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null !==
Command/BatchConsumerCommand.php
Outdated
|
||
public function restartConsumer() | ||
{ | ||
// TODO: Implement restarting of consumer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be removed or added before merging.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
Command/BatchConsumerCommand.php
Outdated
protected $consumer; | ||
|
||
/** | ||
* @return void |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Such phpdoc is useless IMO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed.
Waiting review 😄 |
$consumer['qos_options']['global'] | ||
)) | ||
; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
method call for setIdleTimeoutExitCode (key: idle_timeout_exit_code) is missing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
RabbitMq/BatchConsumer.php
Outdated
$this->batchConsume(); | ||
} | ||
|
||
$timeoutWanted = ($isConsuming) ? $this->getTimeoutWait() : $this->getIdleTimeout(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(
& )
are useless here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
RabbitMq/BatchConsumer.php
Outdated
|
||
public function batchConsume() | ||
{ | ||
if ($this->batchCounter == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
0 === $this->batchCounter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implemented.
RabbitMq/BatchConsumer.php
Outdated
|
||
if ($this->forceStop) { | ||
$this->stopConsuming(); | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This whole } else {
& return;
is useless.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
Command/BatchConsumerCommand.php
Outdated
|
||
pcntl_signal(SIGTERM, array(&$this, 'stopConsumer')); | ||
pcntl_signal(SIGINT, array(&$this, 'stopConsumer')); | ||
// pcntl_signal(SIGHUP, array(&$this, 'restartConsumer')); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like unused.
RabbitMq/BatchConsumer.php
Outdated
) | ||
)); | ||
throw $e; | ||
} finally { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We cannot use finally
as we support PHP 5.3+
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, fixed.
@stloyd what about this PR? What changes do I have to make? |
Thank you @devrck ! |
#403 implementation
[ Obs ]: Message can get stuck on the queue forever because of the behavior of the @batchExecute() when it has exceptions. I am open for suggestions on how to fix this.
Configuration used in app/config/config.yml
Callback class