-
Notifications
You must be signed in to change notification settings - Fork 466
[Feature] Bulk Consumer #413
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
04404f9
27b01a4
39951b1
c5b332e
2c173a7
1d59ce9
80392e1
e256558
16d2ec8
1cf94dc
1381d17
d00bd48
8ee3028
5521707
bbbac11
c103721
db49a24
58617ac
599b675
938d944
3e0faa3
6046a41
a39a2a0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
<?php | ||
|
||
namespace OldSound\RabbitMqBundle\Command; | ||
|
||
use OldSound\RabbitMqBundle\RabbitMq\BatchConsumer; | ||
use PhpAmqpLib\Exception\AMQPTimeoutException; | ||
use Symfony\Component\Console\Input\InputArgument; | ||
use Symfony\Component\Console\Input\InputInterface; | ||
use Symfony\Component\Console\Input\InputOption; | ||
use Symfony\Component\Console\Output\OutputInterface; | ||
|
||
final class BatchConsumerCommand extends BaseRabbitMqCommand | ||
{ | ||
/** | ||
* @var BatchConsumer | ||
*/ | ||
protected $consumer; | ||
|
||
public function stopConsumer() | ||
{ | ||
if ($this->consumer instanceof BatchConsumer) { | ||
// Process current message, then halt consumer | ||
$this->consumer->forceStopConsumer(); | ||
|
||
// Halt consumer if waiting for a new message from the queue | ||
try { | ||
$this->consumer->stopConsuming(); | ||
} catch (AMQPTimeoutException $e) {} | ||
} | ||
} | ||
|
||
protected function configure() | ||
{ | ||
parent::configure(); | ||
|
||
$this | ||
->setName('rabbitmq:batch:consumer') | ||
->addArgument('name', InputArgument::REQUIRED, 'Consumer Name') | ||
->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '') | ||
->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null) | ||
->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging') | ||
->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals') | ||
->setDescription('Executes a Batch Consumer'); | ||
; | ||
} | ||
|
||
/** | ||
* Executes the current command. | ||
* | ||
* @param InputInterface $input An InputInterface instance | ||
* @param OutputInterface $output An OutputInterface instance | ||
* | ||
* @return integer 0 if everything went fine, or an error code | ||
* | ||
* @throws \InvalidArgumentException When the number of messages to consume is less than 0 | ||
* @throws \BadFunctionCallException When the pcntl is not installed and option -s is true | ||
*/ | ||
protected function execute(InputInterface $input, OutputInterface $output) | ||
{ | ||
if (defined('AMQP_WITHOUT_SIGNALS') === false) { | ||
define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals')); | ||
} | ||
|
||
if (!AMQP_WITHOUT_SIGNALS && extension_loaded('pcntl')) { | ||
if (!function_exists('pcntl_signal')) { | ||
throw new \BadFunctionCallException("Function 'pcntl_signal' is referenced in the php.ini 'disable_functions' and can't be called."); | ||
} | ||
|
||
pcntl_signal(SIGTERM, array(&$this, 'stopConsumer')); | ||
pcntl_signal(SIGINT, array(&$this, 'stopConsumer')); | ||
} | ||
|
||
if (defined('AMQP_DEBUG') === false) { | ||
define('AMQP_DEBUG', (bool) $input->getOption('debug')); | ||
} | ||
|
||
$this->initConsumer($input); | ||
|
||
return $this->consumer->consume(); | ||
} | ||
|
||
/** | ||
* @param InputInterface $input | ||
*/ | ||
protected function initConsumer(InputInterface $input) | ||
{ | ||
$this->consumer = $this->getContainer() | ||
->get(sprintf($this->getConsumerService(), $input->getArgument('name'))); | ||
|
||
if (null !== $input->getOption('memory-limit') && | ||
ctype_digit((string) $input->getOption('memory-limit')) && | ||
$input->getOption('memory-limit') > 0 | ||
) { | ||
$this->consumer->setMemoryLimit($input->getOption('memory-limit')); | ||
} | ||
$this->consumer->setRoutingKey($input->getOption('route')); | ||
} | ||
|
||
/** | ||
* @return string | ||
*/ | ||
protected function getConsumerService() | ||
{ | ||
return 'old_sound_rabbit_mq.%s_batch'; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,7 +41,7 @@ public function load(array $configs, ContainerBuilder $container) | |
$loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config'))); | ||
$loader->load('rabbitmq.xml'); | ||
|
||
$configuration = new Configuration(); | ||
$configuration = new Configuration($this->getAlias()); | ||
$this->config = $this->processConfiguration($configuration, $configs); | ||
|
||
$this->collectorEnabled = $this->config['enable_collector']; | ||
|
@@ -52,6 +52,7 @@ public function load(array $configs, ContainerBuilder $container) | |
$this->loadConsumers(); | ||
$this->loadMultipleConsumers(); | ||
$this->loadDynamicConsumers(); | ||
$this->loadBatchConsumers(); | ||
$this->loadAnonConsumers(); | ||
$this->loadRpcClients(); | ||
$this->loadRpcServers(); | ||
|
@@ -358,6 +359,55 @@ protected function loadDynamicConsumers() | |
} | ||
} | ||
|
||
protected function loadBatchConsumers() | ||
{ | ||
foreach ($this->config['batch_consumers'] as $key => $consumer) { | ||
$definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%'); | ||
|
||
if (!isset($consumer['exchange_options'])) { | ||
$consumer['exchange_options'] = $this->getDefaultExchangeOptions(); | ||
} | ||
|
||
$definition | ||
->addTag('old_sound_rabbit_mq.base_amqp') | ||
->addTag('old_sound_rabbit_mq.batch_consumer') | ||
->addMethodCall('setTimeoutWait', array($consumer['timeout_wait'])) | ||
->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count'])) | ||
->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute'))) | ||
->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options']))) | ||
->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options']))) | ||
->addMethodCall('setQosOptions', array( | ||
$consumer['qos_options']['prefetch_size'], | ||
$consumer['qos_options']['prefetch_count'], | ||
$consumer['qos_options']['global'] | ||
)) | ||
; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. method call for setIdleTimeoutExitCode (key: idle_timeout_exit_code) is missing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
if (isset($consumer['idle_timeout_exit_code'])) { | ||
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code'])); | ||
} | ||
|
||
if (isset($consumer['idle_timeout'])) { | ||
$definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout'])); | ||
} | ||
|
||
if (!$consumer['auto_setup_fabric']) { | ||
$definition->addMethodCall('disableAutoSetupFabric'); | ||
} | ||
|
||
$this->injectConnection($definition, $consumer['connection']); | ||
if ($this->collectorEnabled) { | ||
$this->injectLoggedChannel($definition, $key, $consumer['connection']); | ||
} | ||
|
||
if ($consumer['enable_logger']) { | ||
$this->injectLogger($definition); | ||
} | ||
|
||
$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition); | ||
} | ||
} | ||
|
||
protected function loadAnonConsumers() | ||
{ | ||
foreach ($this->config['anon_consumers'] as $key => $anon) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -755,6 +755,101 @@ $ ./app/console_dev rabbitmq:anon-consumer -m 5 -r '#.error' logs_watcher | |
|
||
The only new option compared to the commands that we have seen before is the one that specifies the __routing key__: `-r '#.error'`. | ||
|
||
### Batch Consumers ### | ||
|
||
In some cases you will want to get a batch of messages and then do some processing on all of them. Batch consumers will allow you to define logic for this type of processing. | ||
|
||
e.g: Imagine that you have a queue where you receive a message for inserting some information in the database, and you realize that if you do a batch insert is much better then by inserting one by one. | ||
|
||
Define a callback service that implements `BatchConsumerInterface` and add the definition of the consumer to your configuration. | ||
|
||
```yaml | ||
batch_consumers: | ||
batch_basic_consumer: | ||
connection: default | ||
exchange_options: {name: 'batch', type: fanout} | ||
queue_options: {name: 'batch'} | ||
callback: batch.basic | ||
qos_options: {prefetch_size: 0, prefetch_count: 2, global: false} | ||
timeout_wait: 5 | ||
auto_setup_fabric: false | ||
idle_timeout_exit_code: -2 | ||
``` | ||
|
||
You can implement a batch consumer that will acknowledge all messages in one return or you can have control on what message to acknoledge. | ||
|
||
```php | ||
namespace AppBundle\Service; | ||
|
||
use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface; | ||
use PhpAmqpLib\Message\AMQPMessage; | ||
|
||
class DevckBasicConsumer implements BatchConsumerInterface | ||
{ | ||
/** | ||
* @inheritDoc | ||
*/ | ||
public function batchExecute(array $messages) | ||
{ | ||
echo sprintf('Doing batch execution%s', PHP_EOL); | ||
foreach ($messages as $message) { | ||
$this->executeSomeLogicPerMessage($message); | ||
} | ||
|
||
// you ack all messages got in batch | ||
return true; | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing close tag: "```" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed. |
||
``` | ||
|
||
```php | ||
namespace AppBundle\Service; | ||
|
||
use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface; | ||
use PhpAmqpLib\Message\AMQPMessage; | ||
|
||
class DevckBasicConsumer implements BatchConsumerInterface | ||
{ | ||
/** | ||
* @inheritDoc | ||
*/ | ||
public function batchExecute(array $messages) | ||
{ | ||
echo sprintf('Doing batch execution%s', PHP_EOL); | ||
$result = []; | ||
/** @var AMQPMessage $message */ | ||
foreach ($messages as $message) { | ||
$result[(int)$message->delivery_info['delivery_tag']] = $this->executeSomeLogicPerMessage($message); | ||
} | ||
|
||
// you ack only some messages that have return true | ||
// e.g: | ||
// $return = [ | ||
// 1 => true, | ||
// 2 => true, | ||
// 3 => false, | ||
// 4 => true, | ||
// 5 => -1, | ||
// 6 => 2, | ||
// ]; | ||
// The following will happen: | ||
// * ack: 1,2,4 | ||
// * reject and requeq: 3 | ||
// * nack and requeue: 6 | ||
// * reject and drop: 5 | ||
return $result; | ||
} | ||
} | ||
``` | ||
|
||
How to run the following batch consumer: | ||
|
||
```bash | ||
$ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w | ||
``` | ||
|
||
Important: BatchConsumers will not have the -m|messages option available | ||
|
||
### STDIN Producer ### | ||
|
||
There's a Command that reads data from STDIN and publishes it to a RabbitMQ queue. To use it first you have to configure a `producer` service in your configuration file like this: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why? We don't plan to change the configuration name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make use of already defined alias.