Skip to content

Commit 599b675

Browse files
author
Bogdan Rancichi
committed
[README] updated readme for batch consumer
1 parent 58617ac commit 599b675

File tree

1 file changed

+94
-0
lines changed

1 file changed

+94
-0
lines changed

README.md

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,100 @@ $ ./app/console_dev rabbitmq:anon-consumer -m 5 -r '#.error' logs_watcher
755755

756756
The only new option compared to the commands that we have seen before is the one that specifies the __routing key__: `-r '#.error'`.
757757

758+
### Batch Consumers ###
759+
760+
In some cases you will want to get a batch of messages and then do some processing on all of them. Batch consumers will allow you to define logic for this type of processing.
761+
762+
e.g: Imagine that you have a queue where you receive a message for inserting some information in the database, and you realize that if you do a batch insert is much better then by inserting one by one.
763+
764+
Define a callback service that implements `BatchConsumerInterface` and add the definition of the consumer to your configuration.
765+
766+
```yaml
767+
batch_consumers:
768+
batch_basic_consumer:
769+
connection: default
770+
exchange_options: {name: 'batch', type: fanout}
771+
queue_options: {name: 'batch'}
772+
callback: batch.basic
773+
qos_options: {prefetch_size: 0, prefetch_count: 2, global: false}
774+
timeout_wait: 5
775+
auto_setup_fabric: false
776+
idle_timeout_exit_code: -2
777+
```
778+
779+
You can implement a batch consumer that will acknoledge all messages in one return or you can have control on what message to acknoledge.
780+
781+
```php
782+
namespace AppBundle\Service;
783+
784+
use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
785+
use PhpAmqpLib\Message\AMQPMessage;
786+
787+
class DevckBasicConsumer implements BatchConsumerInterface
788+
{
789+
/**
790+
* @inheritDoc
791+
*/
792+
public function batchExecute(array $messages)
793+
{
794+
echo sprintf('Doing batch execution%s', PHP_EOL);
795+
foreach ($messages as $message) {
796+
$this->executeSomeLogicPerMessage($message);
797+
}
798+
799+
// you ack all messages got in batch
800+
return true;
801+
}
802+
}
803+
804+
```
805+
namespace AppBundle\Service;
806+
807+
use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
808+
use PhpAmqpLib\Message\AMQPMessage;
809+
810+
class DevckBasicConsumer implements BatchConsumerInterface
811+
{
812+
/**
813+
* @inheritDoc
814+
*/
815+
public function batchExecute(array $messages)
816+
{
817+
echo sprintf('Doing batch execution%s', PHP_EOL);
818+
$result = [];
819+
/** @var AMQPMessage $message */
820+
foreach ($messages as $message) {
821+
$result[(int)$message->delivery_info['delivery_tag']] = $this->executeSomeLogicPerMessage($message);
822+
}
823+
824+
// you ack only some messages that have return true
825+
// e.g:
826+
// $return = [
827+
// 1 => true,
828+
// 2 => true,
829+
// 3 => false,
830+
// 4 => true,
831+
// 5 => -1,
832+
// 6 => 2,
833+
// ];
834+
// The following will happen:
835+
// * ack: 1,2,4
836+
// * reject and requeq: 3
837+
// * nack and requeue: 6
838+
// * reject and drop: 5
839+
return $result;
840+
}
841+
}
842+
```
843+
844+
How to run the following batch consumer:
845+
846+
```bash
847+
$ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w
848+
```
849+
850+
Important: BatchConsumers will not have the -m|messages option available
851+
758852
### STDIN Producer ###
759853

760854
There's a Command that reads data from STDIN and publishes it to a RabbitMQ queue. To use it first you have to configure a `producer` service in your configuration file like this:

0 commit comments

Comments
 (0)