Skip to content

Commit ee24876

Browse files
committed
Merge branch 'master' into docs/mention-issue-with-heartbeat-and-long-running-tasks
# Conflicts: # README.md
2 parents 5eebab1 + 74b63c0 commit ee24876

26 files changed

+1446
-26
lines changed

CHANGELOG

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
- 2017-01-22
2+
* Add `graceful_max_execution_timeout`
3+
14
- 2015-02-07
25
* Added possibility to set serialize/unserialize function for rpc servers/clients
36

Command/BaseConsumerCommand.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ protected function configure()
4343
->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
4444
->addOption('messages', 'm', InputOption::VALUE_OPTIONAL, 'Messages to consume', 0)
4545
->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '')
46-
->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null)
46+
->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process (MB)', null)
4747
->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
4848
->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
4949
;

Command/BatchConsumerCommand.php

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Command;
4+
5+
use OldSound\RabbitMqBundle\RabbitMq\BatchConsumer;
6+
use PhpAmqpLib\Exception\AMQPTimeoutException;
7+
use Symfony\Component\Console\Input\InputArgument;
8+
use Symfony\Component\Console\Input\InputInterface;
9+
use Symfony\Component\Console\Input\InputOption;
10+
use Symfony\Component\Console\Output\OutputInterface;
11+
12+
final class BatchConsumerCommand extends BaseRabbitMqCommand
13+
{
14+
/**
15+
* @var BatchConsumer
16+
*/
17+
protected $consumer;
18+
19+
public function stopConsumer()
20+
{
21+
if ($this->consumer instanceof BatchConsumer) {
22+
// Process current message, then halt consumer
23+
$this->consumer->forceStopConsumer();
24+
25+
// Halt consumer if waiting for a new message from the queue
26+
try {
27+
$this->consumer->stopConsuming();
28+
} catch (AMQPTimeoutException $e) {}
29+
}
30+
}
31+
32+
protected function configure()
33+
{
34+
parent::configure();
35+
36+
$this
37+
->setName('rabbitmq:batch:consumer')
38+
->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
39+
->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '')
40+
->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null)
41+
->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
42+
->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
43+
->setDescription('Executes a Batch Consumer');
44+
;
45+
}
46+
47+
/**
48+
* Executes the current command.
49+
*
50+
* @param InputInterface $input An InputInterface instance
51+
* @param OutputInterface $output An OutputInterface instance
52+
*
53+
* @return integer 0 if everything went fine, or an error code
54+
*
55+
* @throws \InvalidArgumentException When the number of messages to consume is less than 0
56+
* @throws \BadFunctionCallException When the pcntl is not installed and option -s is true
57+
*/
58+
protected function execute(InputInterface $input, OutputInterface $output)
59+
{
60+
if (defined('AMQP_WITHOUT_SIGNALS') === false) {
61+
define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals'));
62+
}
63+
64+
if (!AMQP_WITHOUT_SIGNALS && extension_loaded('pcntl')) {
65+
if (!function_exists('pcntl_signal')) {
66+
throw new \BadFunctionCallException("Function 'pcntl_signal' is referenced in the php.ini 'disable_functions' and can't be called.");
67+
}
68+
69+
pcntl_signal(SIGTERM, array(&$this, 'stopConsumer'));
70+
pcntl_signal(SIGINT, array(&$this, 'stopConsumer'));
71+
}
72+
73+
if (defined('AMQP_DEBUG') === false) {
74+
define('AMQP_DEBUG', (bool) $input->getOption('debug'));
75+
}
76+
77+
$this->initConsumer($input);
78+
79+
return $this->consumer->consume();
80+
}
81+
82+
/**
83+
* @param InputInterface $input
84+
*/
85+
protected function initConsumer(InputInterface $input)
86+
{
87+
$this->consumer = $this->getContainer()
88+
->get(sprintf($this->getConsumerService(), $input->getArgument('name')));
89+
90+
if (null !== $input->getOption('memory-limit') &&
91+
ctype_digit((string) $input->getOption('memory-limit')) &&
92+
$input->getOption('memory-limit') > 0
93+
) {
94+
$this->consumer->setMemoryLimit($input->getOption('memory-limit'));
95+
}
96+
$this->consumer->setRoutingKey($input->getOption('route'));
97+
}
98+
99+
/**
100+
* @return string
101+
*/
102+
protected function getConsumerService()
103+
{
104+
return 'old_sound_rabbit_mq.%s_batch';
105+
}
106+
}

Command/DeleteCommand.php

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Symfony\Component\Console\Input\InputInterface;
77
use Symfony\Component\Console\Input\InputOption;
88
use Symfony\Component\Console\Output\OutputInterface;
9+
use Symfony\Component\Console\Question\ConfirmationQuestion;
910

1011
/**
1112
* Command to delete a queue
@@ -32,8 +33,15 @@ protected function execute(InputInterface $input, OutputInterface $output)
3233
$noConfirmation = (bool) $input->getOption('no-confirmation');
3334

3435
if (!$noConfirmation && $input->isInteractive()) {
35-
$confirmation = $this->getHelper('dialog')->askConfirmation($output, sprintf('<question>Are you sure you wish to delete "%s" consumer\'s queue?(y/n)</question>', $input->getArgument('name')), false);
36-
if (!$confirmation) {
36+
$question = new ConfirmationQuestion(
37+
sprintf(
38+
'<question>Are you sure you wish to delete "%s" consumer\'s queue? (y/n)</question>',
39+
$input->getArgument('name')
40+
),
41+
false
42+
);
43+
44+
if (!$this->getHelper('question')->ask($input, $output, $question)) {
3745
$output->writeln('<error>Deletion cancelled!</error>');
3846

3947
return 1;

Command/PurgeConsumerCommand.php

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Symfony\Component\Console\Input\InputInterface;
77
use Symfony\Component\Console\Input\InputOption;
88
use Symfony\Component\Console\Output\OutputInterface;
9+
use Symfony\Component\Console\Question\ConfirmationQuestion;
910

1011
/**
1112
* Command to purge a queue
@@ -32,8 +33,15 @@ protected function execute(InputInterface $input, OutputInterface $output)
3233
$noConfirmation = (bool) $input->getOption('no-confirmation');
3334

3435
if (!$noConfirmation && $input->isInteractive()) {
35-
$confirmation = $this->getHelper('dialog')->askConfirmation($output, sprintf('<question>Are you sure you wish to purge "%s" queue? (y/n)</question>', $input->getArgument('name')), false);
36-
if (!$confirmation) {
36+
$question = new ConfirmationQuestion(
37+
sprintf(
38+
'<question>Are you sure you wish to purge "%s" queue? (y/n)</question>',
39+
$input->getArgument('name')
40+
),
41+
false
42+
);
43+
44+
if (!$this->getHelper('question')->ask($input, $output, $question)) {
3745
$output->writeln('<error>Purging cancelled!</error>');
3846

3947
return 1;

Command/SetupFabricCommand.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace OldSound\RabbitMqBundle\Command;
44

5+
use OldSound\RabbitMqBundle\RabbitMq\DynamicConsumer;
56
use Symfony\Component\Console\Input\InputInterface;
67
use Symfony\Component\Console\Input\InputOption;
78
use Symfony\Component\Console\Output\OutputInterface;
@@ -29,6 +30,9 @@ protected function execute(InputInterface $input, OutputInterface $output)
2930

3031
foreach (array('base_amqp', 'binding') as $key) {
3132
foreach ($partsHolder->getParts('old_sound_rabbit_mq.' . $key) as $baseAmqp) {
33+
if ($baseAmqp instanceof DynamicConsumer) {
34+
continue;
35+
}
3236
$baseAmqp->setupFabric();
3337
}
3438
}

DependencyInjection/Configuration.php

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,26 @@
1313
*/
1414
class Configuration implements ConfigurationInterface
1515
{
16+
/**
17+
* @var string
18+
*/
19+
protected $name;
20+
21+
/**
22+
* Configuration constructor.
23+
*
24+
* @param string $name
25+
*/
26+
public function __construct($name)
27+
{
28+
$this->name = $name;
29+
}
30+
1631
public function getConfigTreeBuilder()
1732
{
1833
$tree = new TreeBuilder();
1934

20-
$rootNode = $tree->root('old_sound_rabbit_mq');
35+
$rootNode = $tree->root($this->name);
2136

2237
$rootNode
2338
->children()
@@ -33,6 +48,7 @@ public function getConfigTreeBuilder()
3348
$this->addConsumers($rootNode);
3449
$this->addMultipleConsumers($rootNode);
3550
$this->addDynamicConsumers($rootNode);
51+
$this->addBatchConsumers($rootNode);
3652
$this->addAnonConsumers($rootNode);
3753
$this->addRpcClients($rootNode);
3854
$this->addRpcServers($rootNode);
@@ -67,6 +83,7 @@ protected function addConnections(ArrayNodeDefinition $node)
6783
->end()
6884
->booleanNode('keepalive')->defaultFalse()->info('requires php-amqplib v2.4.1+ and PHP5.4+')->end()
6985
->scalarNode('heartbeat')->defaultValue(0)->info('requires php-amqplib v2.4.1+')->end()
86+
->scalarNode('connection_parameters_provider')->end()
7087
->end()
7188
->end()
7289
->end()
@@ -138,6 +155,13 @@ protected function addConsumers(ArrayNodeDefinition $node)
138155
->scalarNode('callback')->isRequired()->end()
139156
->scalarNode('idle_timeout')->end()
140157
->scalarNode('idle_timeout_exit_code')->end()
158+
->arrayNode('graceful_max_execution')
159+
->canBeUnset()
160+
->children()
161+
->integerNode('timeout')->end()
162+
->integerNode('exit_code')->defaultValue(0)->end()
163+
->end()
164+
->end()
141165
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
142166
->arrayNode('qos_options')
143167
->canBeUnset()
@@ -220,6 +244,43 @@ protected function addDynamicConsumers(ArrayNodeDefinition $node)
220244
;
221245
}
222246

247+
/**
248+
* @param ArrayNodeDefinition $node
249+
*
250+
* @return void
251+
*/
252+
protected function addBatchConsumers(ArrayNodeDefinition $node)
253+
{
254+
$node
255+
->children()
256+
->arrayNode('batch_consumers')
257+
->canBeUnset()
258+
->useAttributeAsKey('key')
259+
->prototype('array')
260+
->append($this->getExchangeConfiguration())
261+
->append($this->getQueueConfiguration())
262+
->children()
263+
->scalarNode('connection')->defaultValue('default')->end()
264+
->scalarNode('callback')->isRequired()->end()
265+
->scalarNode('idle_timeout')->end()
266+
->scalarNode('timeout_wait')->defaultValue(3)->end()
267+
->scalarNode('idle_timeout_exit_code')->end()
268+
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
269+
->arrayNode('qos_options')
270+
->children()
271+
->scalarNode('prefetch_size')->defaultValue(0)->end()
272+
->scalarNode('prefetch_count')->defaultValue(2)->end()
273+
->booleanNode('global')->defaultFalse()->end()
274+
->end()
275+
->end()
276+
->scalarNode('enable_logger')->defaultFalse()->end()
277+
->end()
278+
->end()
279+
->end()
280+
->end()
281+
;
282+
}
283+
223284
protected function addAnonConsumers(ArrayNodeDefinition $node)
224285
{
225286
$node
@@ -341,6 +402,7 @@ protected function getMultipleQueuesConfiguration()
341402
protected function addQueueNodeConfiguration(ArrayNodeDefinition $node)
342403
{
343404
$node
405+
->fixXmlConfig('routing_key')
344406
->children()
345407
->scalarNode('name')->end()
346408
->booleanNode('passive')->defaultFalse()->end()

0 commit comments

Comments
 (0)