Skip to content

[Feature] Bulk Consumer #413

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Jun 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
04404f9
updated doc comments for method
Dec 7, 2016
27b01a4
added schema onfiguration for batch_consumer
Dec 7, 2016
39951b1
send the configuration name from the extension
Dec 7, 2016
c5b332e
set the configuration name & load batch consumers build
Dec 7, 2016
2c173a7
added batch_consumer class definition
Dec 7, 2016
1d59ce9
initial batch consumer class implementation
Dec 7, 2016
80392e1
complete the build of batch consumers from extensions
Dec 7, 2016
e256558
added queue configuration key for configuration
Dec 7, 2016
16d2ec8
keep track of messages and fix implementation for isCompleteBatch
Dec 7, 2016
1cf94dc
created separate command for batch consumers
Dec 7, 2016
1381d17
created new interface to signal that a callback will have batch proce…
Dec 7, 2016
d00bd48
keep only channel & delivery tag on message
Dec 7, 2016
8ee3028
refactored BatchConsumerCommand it can not accept --messages flag bec…
Dec 10, 2016
5521707
refactored the meaning of a batch consumer it now accepts an array of…
Dec 10, 2016
bbbac11
prevent CR comment about new line.
Dec 10, 2016
c103721
batch consumer will have just 1 callback that behaves like a regular …
Dec 10, 2016
db49a24
refactored BatchConsumer implementation to collect all the messages a…
Dec 10, 2016
58617ac
Merge branch 'master' into batch-consumer
devrck May 24, 2017
599b675
[README] updated readme for batch consumer
Jun 6, 2017
938d944
[change-req] implemented change requests
Jun 7, 2017
3e0faa3
[fix] added method call to idletimeout_exit_code
Jun 8, 2017
6046a41
[fixed] implemented suggested code review
Jun 8, 2017
a39a2a0
[fix] remove unused call; remove finally clause; made some methods pr…
Jun 8, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions Command/BatchConsumerCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
<?php

namespace OldSound\RabbitMqBundle\Command;

use OldSound\RabbitMqBundle\RabbitMq\BatchConsumer;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

final class BatchConsumerCommand extends BaseRabbitMqCommand
{
/**
* @var BatchConsumer
*/
protected $consumer;

public function stopConsumer()
{
if ($this->consumer instanceof BatchConsumer) {
// Process current message, then halt consumer
$this->consumer->forceStopConsumer();

// Halt consumer if waiting for a new message from the queue
try {
$this->consumer->stopConsuming();
} catch (AMQPTimeoutException $e) {}
}
}

protected function configure()
{
parent::configure();

$this
->setName('rabbitmq:batch:consumer')
->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '')
->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null)
->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
->setDescription('Executes a Batch Consumer');
;
}

/**
* Executes the current command.
*
* @param InputInterface $input An InputInterface instance
* @param OutputInterface $output An OutputInterface instance
*
* @return integer 0 if everything went fine, or an error code
*
* @throws \InvalidArgumentException When the number of messages to consume is less than 0
* @throws \BadFunctionCallException When the pcntl is not installed and option -s is true
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
if (defined('AMQP_WITHOUT_SIGNALS') === false) {
define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals'));
}

if (!AMQP_WITHOUT_SIGNALS && extension_loaded('pcntl')) {
if (!function_exists('pcntl_signal')) {
throw new \BadFunctionCallException("Function 'pcntl_signal' is referenced in the php.ini 'disable_functions' and can't be called.");
}

pcntl_signal(SIGTERM, array(&$this, 'stopConsumer'));
pcntl_signal(SIGINT, array(&$this, 'stopConsumer'));
}

if (defined('AMQP_DEBUG') === false) {
define('AMQP_DEBUG', (bool) $input->getOption('debug'));
}

$this->initConsumer($input);

return $this->consumer->consume();
}

/**
* @param InputInterface $input
*/
protected function initConsumer(InputInterface $input)
{
$this->consumer = $this->getContainer()
->get(sprintf($this->getConsumerService(), $input->getArgument('name')));

if (null !== $input->getOption('memory-limit') &&
ctype_digit((string) $input->getOption('memory-limit')) &&
$input->getOption('memory-limit') > 0
) {
$this->consumer->setMemoryLimit($input->getOption('memory-limit'));
}
$this->consumer->setRoutingKey($input->getOption('route'));
}

/**
* @return string
*/
protected function getConsumerService()
{
return 'old_sound_rabbit_mq.%s_batch';
}
}
55 changes: 54 additions & 1 deletion DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,26 @@
*/
class Configuration implements ConfigurationInterface
{
/**
* @var string
*/
protected $name;

/**
* Configuration constructor.
*
* @param string $name
*/
public function __construct($name)
{
$this->name = $name;
}

public function getConfigTreeBuilder()
{
$tree = new TreeBuilder();

$rootNode = $tree->root('old_sound_rabbit_mq');
$rootNode = $tree->root($this->name);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? We don't plan to change the configuration name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make use of already defined alias.


$rootNode
->children()
Expand All @@ -33,6 +48,7 @@ public function getConfigTreeBuilder()
$this->addConsumers($rootNode);
$this->addMultipleConsumers($rootNode);
$this->addDynamicConsumers($rootNode);
$this->addBatchConsumers($rootNode);
$this->addAnonConsumers($rootNode);
$this->addRpcClients($rootNode);
$this->addRpcServers($rootNode);
Expand Down Expand Up @@ -227,6 +243,43 @@ protected function addDynamicConsumers(ArrayNodeDefinition $node)
;
}

/**
* @param ArrayNodeDefinition $node
*
* @return void
*/
protected function addBatchConsumers(ArrayNodeDefinition $node)
{
$node
->children()
->arrayNode('batch_consumers')
->canBeUnset()
->useAttributeAsKey('key')
->prototype('array')
->append($this->getExchangeConfiguration())
->append($this->getQueueConfiguration())
->children()
->scalarNode('connection')->defaultValue('default')->end()
->scalarNode('callback')->isRequired()->end()
->scalarNode('idle_timeout')->end()
->scalarNode('timeout_wait')->defaultValue(3)->end()
->scalarNode('idle_timeout_exit_code')->end()
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
->arrayNode('qos_options')
->children()
->scalarNode('prefetch_size')->defaultValue(0)->end()
->scalarNode('prefetch_count')->defaultValue(2)->end()
->booleanNode('global')->defaultFalse()->end()
->end()
->end()
->scalarNode('enable_logger')->defaultFalse()->end()
->end()
->end()
->end()
->end()
;
}

protected function addAnonConsumers(ArrayNodeDefinition $node)
{
$node
Expand Down
52 changes: 51 additions & 1 deletion DependencyInjection/OldSoundRabbitMqExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public function load(array $configs, ContainerBuilder $container)
$loader = new XmlFileLoader($this->container, new FileLocator(array(__DIR__ . '/../Resources/config')));
$loader->load('rabbitmq.xml');

$configuration = new Configuration();
$configuration = new Configuration($this->getAlias());
$this->config = $this->processConfiguration($configuration, $configs);

$this->collectorEnabled = $this->config['enable_collector'];
Expand All @@ -52,6 +52,7 @@ public function load(array $configs, ContainerBuilder $container)
$this->loadConsumers();
$this->loadMultipleConsumers();
$this->loadDynamicConsumers();
$this->loadBatchConsumers();
$this->loadAnonConsumers();
$this->loadRpcClients();
$this->loadRpcServers();
Expand Down Expand Up @@ -358,6 +359,55 @@ protected function loadDynamicConsumers()
}
}

protected function loadBatchConsumers()
{
foreach ($this->config['batch_consumers'] as $key => $consumer) {
$definition = new Definition('%old_sound_rabbit_mq.batch_consumer.class%');

if (!isset($consumer['exchange_options'])) {
$consumer['exchange_options'] = $this->getDefaultExchangeOptions();
}

$definition
->addTag('old_sound_rabbit_mq.base_amqp')
->addTag('old_sound_rabbit_mq.batch_consumer')
->addMethodCall('setTimeoutWait', array($consumer['timeout_wait']))
->addMethodCall('setPrefetchCount', array($consumer['qos_options']['prefetch_count']))
->addMethodCall('setCallback', array(array(new Reference($consumer['callback']), 'batchExecute')))
->addMethodCall('setExchangeOptions', array($this->normalizeArgumentKeys($consumer['exchange_options'])))
->addMethodCall('setQueueOptions', array($this->normalizeArgumentKeys($consumer['queue_options'])))
->addMethodCall('setQosOptions', array(
$consumer['qos_options']['prefetch_size'],
$consumer['qos_options']['prefetch_count'],
$consumer['qos_options']['global']
))
;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

method call for setIdleTimeoutExitCode (key: idle_timeout_exit_code) is missing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

if (isset($consumer['idle_timeout_exit_code'])) {
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
}

if (isset($consumer['idle_timeout'])) {
$definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
}

if (!$consumer['auto_setup_fabric']) {
$definition->addMethodCall('disableAutoSetupFabric');
}

$this->injectConnection($definition, $consumer['connection']);
if ($this->collectorEnabled) {
$this->injectLoggedChannel($definition, $key, $consumer['connection']);
}

if ($consumer['enable_logger']) {
$this->injectLogger($definition);
}

$this->container->setDefinition(sprintf('old_sound_rabbit_mq.%s_batch', $key), $definition);
}
}

protected function loadAnonConsumers()
{
foreach ($this->config['anon_consumers'] as $key => $anon) {
Expand Down
95 changes: 95 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,101 @@ $ ./app/console_dev rabbitmq:anon-consumer -m 5 -r '#.error' logs_watcher

The only new option compared to the commands that we have seen before is the one that specifies the __routing key__: `-r '#.error'`.

### Batch Consumers ###

In some cases you will want to get a batch of messages and then do some processing on all of them. Batch consumers will allow you to define logic for this type of processing.

e.g: Imagine that you have a queue where you receive a message for inserting some information in the database, and you realize that if you do a batch insert is much better then by inserting one by one.

Define a callback service that implements `BatchConsumerInterface` and add the definition of the consumer to your configuration.

```yaml
batch_consumers:
batch_basic_consumer:
connection: default
exchange_options: {name: 'batch', type: fanout}
queue_options: {name: 'batch'}
callback: batch.basic
qos_options: {prefetch_size: 0, prefetch_count: 2, global: false}
timeout_wait: 5
auto_setup_fabric: false
idle_timeout_exit_code: -2
```

You can implement a batch consumer that will acknowledge all messages in one return or you can have control on what message to acknoledge.

```php
namespace AppBundle\Service;

use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute(array $messages)
{
echo sprintf('Doing batch execution%s', PHP_EOL);
foreach ($messages as $message) {
$this->executeSomeLogicPerMessage($message);
}

// you ack all messages got in batch
return true;
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing close tag: "```"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

```

```php
namespace AppBundle\Service;

use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class DevckBasicConsumer implements BatchConsumerInterface
{
/**
* @inheritDoc
*/
public function batchExecute(array $messages)
{
echo sprintf('Doing batch execution%s', PHP_EOL);
$result = [];
/** @var AMQPMessage $message */
foreach ($messages as $message) {
$result[(int)$message->delivery_info['delivery_tag']] = $this->executeSomeLogicPerMessage($message);
}

// you ack only some messages that have return true
// e.g:
// $return = [
// 1 => true,
// 2 => true,
// 3 => false,
// 4 => true,
// 5 => -1,
// 6 => 2,
// ];
// The following will happen:
// * ack: 1,2,4
// * reject and requeq: 3
// * nack and requeue: 6
// * reject and drop: 5
return $result;
}
}
```

How to run the following batch consumer:

```bash
$ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w
```

Important: BatchConsumers will not have the -m|messages option available

### STDIN Producer ###

There's a Command that reads data from STDIN and publishes it to a RabbitMQ queue. To use it first you have to configure a `producer` service in your configuration file like this:
Expand Down
2 changes: 0 additions & 2 deletions RabbitMq/BaseConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ protected function maybeStopConsumer()

if ($this->forceStop || ($this->consumed == $this->target && $this->target > 0)) {
$this->stopConsuming();
} else {
return;
}
}

Expand Down
Loading