Skip to content

Commit 8ee3028

Browse files
author
Bogdan Rancichi
committed
refactored BatchConsumerCommand it can not accept --messages flag because it does the work in batches, could be added later on maybe, but with other meaning (how many batches not how many messages)
1 parent d00bd48 commit 8ee3028

File tree

1 file changed

+96
-3
lines changed

1 file changed

+96
-3
lines changed

Command/BatchConsumerCommand.php

Lines changed: 96 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,110 @@
22

33
namespace OldSound\RabbitMqBundle\Command;
44

5-
class BatchConsumerCommand extends BaseConsumerCommand
5+
use OldSound\RabbitMqBundle\RabbitMq\BatchConsumer;
6+
use PhpAmqpLib\Exception\AMQPTimeoutException;
7+
use Symfony\Component\Console\Input\InputArgument;
8+
use Symfony\Component\Console\Input\InputInterface;
9+
use Symfony\Component\Console\Input\InputOption;
10+
use Symfony\Component\Console\Output\OutputInterface;
11+
12+
class BatchConsumerCommand extends BaseRabbitMqCommand
613
{
14+
/**
15+
* @var BatchConsumer
16+
*/
17+
protected $consumer;
18+
19+
/**
20+
* @return void
21+
*/
22+
public function stopConsumer()
23+
{
24+
if ($this->consumer instanceof BatchConsumer) {
25+
// Process current message, then halt consumer
26+
$this->consumer->forceStopConsumer();
27+
28+
// Halt consumer if waiting for a new message from the queue
29+
try {
30+
$this->consumer->stopConsuming();
31+
} catch (AMQPTimeoutException $e) {}
32+
}
33+
}
34+
35+
public function restartConsumer()
36+
{
37+
// TODO: Implement restarting of consumer
38+
}
39+
40+
741
protected function configure()
842
{
943
parent::configure();
44+
1045
$this
11-
->setName('rabbitmq:batch-consumer')
12-
->setDescription('Executes a batch consumer')
46+
->setName('rabbitmq:batch:consumer')
47+
->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
48+
->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '')
49+
->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null)
50+
->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
51+
->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
52+
->setDescription('Executes a Batch Consumer');
1353
;
1454
}
1555

56+
/**
57+
* Executes the current command.
58+
*
59+
* @param InputInterface $input An InputInterface instance
60+
* @param OutputInterface $output An OutputInterface instance
61+
*
62+
* @return integer 0 if everything went fine, or an error code
63+
*
64+
* @throws \InvalidArgumentException When the number of messages to consume is less than 0
65+
* @throws \BadFunctionCallException When the pcntl is not installed and option -s is true
66+
*/
67+
protected function execute(InputInterface $input, OutputInterface $output)
68+
{
69+
if (defined('AMQP_WITHOUT_SIGNALS') === false) {
70+
define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals'));
71+
}
72+
73+
if (!AMQP_WITHOUT_SIGNALS && extension_loaded('pcntl')) {
74+
if (!function_exists('pcntl_signal')) {
75+
throw new \BadFunctionCallException("Function 'pcntl_signal' is referenced in the php.ini 'disable_functions' and can't be called.");
76+
}
77+
78+
pcntl_signal(SIGTERM, array(&$this, 'stopConsumer'));
79+
pcntl_signal(SIGINT, array(&$this, 'stopConsumer'));
80+
// pcntl_signal(SIGHUP, array(&$this, 'restartConsumer'));
81+
}
82+
83+
if (defined('AMQP_DEBUG') === false) {
84+
define('AMQP_DEBUG', (bool) $input->getOption('debug'));
85+
}
86+
87+
$this->initConsumer($input);
88+
89+
return $this->consumer->consume();
90+
}
91+
92+
/**
93+
* @param InputInterface $input
94+
*/
95+
protected function initConsumer(InputInterface $input)
96+
{
97+
$this->consumer = $this->getContainer()
98+
->get(sprintf($this->getConsumerService(), $input->getArgument('name')));
99+
100+
if (!is_null($input->getOption('memory-limit')) && ctype_digit((string) $input->getOption('memory-limit')) && $input->getOption('memory-limit') > 0) {
101+
$this->consumer->setMemoryLimit($input->getOption('memory-limit'));
102+
}
103+
$this->consumer->setRoutingKey($input->getOption('route'));
104+
}
105+
106+
/**
107+
* @return string
108+
*/
16109
protected function getConsumerService()
17110
{
18111
return 'old_sound_rabbit_mq.%s_batch';

0 commit comments

Comments
 (0)