Skip to content

Commit 58617ac

Browse files
authored
Merge branch 'master' into batch-consumer
2 parents db49a24 + 42e7c46 commit 58617ac

27 files changed

+823
-31
lines changed

CHANGELOG

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
- 2017-01-22
2+
* Add `graceful_max_execution_timeout`
3+
14
- 2015-02-07
25
* Added possibility to set serialize/unserialize function for rpc servers/clients
36

Command/DeleteCommand.php

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Symfony\Component\Console\Input\InputInterface;
77
use Symfony\Component\Console\Input\InputOption;
88
use Symfony\Component\Console\Output\OutputInterface;
9+
use Symfony\Component\Console\Question\ConfirmationQuestion;
910

1011
/**
1112
* Command to delete a queue
@@ -32,8 +33,15 @@ protected function execute(InputInterface $input, OutputInterface $output)
3233
$noConfirmation = (bool) $input->getOption('no-confirmation');
3334

3435
if (!$noConfirmation && $input->isInteractive()) {
35-
$confirmation = $this->getHelper('dialog')->askConfirmation($output, sprintf('<question>Are you sure you wish to delete "%s" consumer\'s queue?(y/n)</question>', $input->getArgument('name')), false);
36-
if (!$confirmation) {
36+
$question = new ConfirmationQuestion(
37+
sprintf(
38+
'<question>Are you sure you wish to delete "%s" consumer\'s queue? (y/n)</question>',
39+
$input->getArgument('name')
40+
),
41+
false
42+
);
43+
44+
if (!$this->getHelper('question')->ask($input, $output, $question)) {
3745
$output->writeln('<error>Deletion cancelled!</error>');
3846

3947
return 1;

Command/MultipleConsumerCommand.php

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,28 @@
22

33
namespace OldSound\RabbitMqBundle\Command;
44

5+
use Symfony\Component\Console\Input\InputArgument;
6+
57
class MultipleConsumerCommand extends BaseConsumerCommand
68
{
79
protected function configure()
810
{
911
parent::configure();
1012

11-
$this->setDescription('Executes a consumer that uses multiple queues');
12-
$this->setName('rabbitmq:multiple-consumer');
13+
$this->setDescription('Executes a consumer that uses multiple queues')
14+
->setName('rabbitmq:multiple-consumer')
15+
->addArgument('context', InputArgument::OPTIONAL, 'Context the consumer runs in')
16+
;
1317
}
1418

1519
protected function getConsumerService()
1620
{
1721
return 'old_sound_rabbit_mq.%s_multiple';
1822
}
19-
}
23+
24+
protected function initConsumer($input)
25+
{
26+
parent::initConsumer($input);
27+
$this->consumer->setContext($input->getArgument('context'));
28+
}
29+
}

Command/PurgeConsumerCommand.php

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Symfony\Component\Console\Input\InputInterface;
77
use Symfony\Component\Console\Input\InputOption;
88
use Symfony\Component\Console\Output\OutputInterface;
9+
use Symfony\Component\Console\Question\ConfirmationQuestion;
910

1011
/**
1112
* Command to purge a queue
@@ -32,8 +33,15 @@ protected function execute(InputInterface $input, OutputInterface $output)
3233
$noConfirmation = (bool) $input->getOption('no-confirmation');
3334

3435
if (!$noConfirmation && $input->isInteractive()) {
35-
$confirmation = $this->getHelper('dialog')->askConfirmation($output, sprintf('<question>Are you sure you wish to purge "%s" queue? (y/n)</question>', $input->getArgument('name')), false);
36-
if (!$confirmation) {
36+
$question = new ConfirmationQuestion(
37+
sprintf(
38+
'<question>Are you sure you wish to purge "%s" queue? (y/n)</question>',
39+
$input->getArgument('name')
40+
),
41+
false
42+
);
43+
44+
if (!$this->getHelper('question')->ask($input, $output, $question)) {
3745
$output->writeln('<error>Purging cancelled!</error>');
3846

3947
return 1;

DependencyInjection/Configuration.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,13 @@ protected function addConsumers(ArrayNodeDefinition $node)
154154
->scalarNode('callback')->isRequired()->end()
155155
->scalarNode('idle_timeout')->end()
156156
->scalarNode('idle_timeout_exit_code')->end()
157+
->arrayNode('graceful_max_execution')
158+
->canBeUnset()
159+
->children()
160+
->integerNode('timeout')->end()
161+
->integerNode('exit_code')->defaultValue(0)->end()
162+
->end()
163+
->end()
157164
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
158165
->arrayNode('qos_options')
159166
->canBeUnset()

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,16 @@ protected function loadConsumers()
201201
if (isset($consumer['idle_timeout_exit_code'])) {
202202
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
203203
}
204+
if (isset($consumer['graceful_max_execution'])) {
205+
$definition->addMethodCall(
206+
'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
207+
array($consumer['graceful_max_execution']['timeout'])
208+
);
209+
$definition->addMethodCall(
210+
'setGracefulMaxExecutionTimeoutExitCode',
211+
array($consumer['graceful_max_execution']['exit_code'])
212+
);
213+
}
204214
if (!$consumer['auto_setup_fabric']) {
205215
$definition->addMethodCall('disableAutoSetupFabric');
206216
}

Event/AMQPEvent.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
class AMQPEvent extends Event
1616
{
1717
const ON_CONSUME = 'on_consume';
18+
const ON_IDLE = 'on_idle';
1819
const BEFORE_PROCESSING_MESSAGE = 'before_processing';
1920
const AFTER_PROCESSING_MESSAGE = 'after_processing';
2021

Event/OnIdleEvent.php

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Event;
4+
5+
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
6+
7+
/**
8+
* Class OnIdleEvent
9+
*
10+
* @package OldSound\RabbitMqBundle\Command
11+
*/
12+
class OnIdleEvent extends AMQPEvent
13+
{
14+
const NAME = AMQPEvent::ON_IDLE;
15+
16+
/**
17+
* @var bool
18+
*/
19+
private $forceStop;
20+
21+
/**
22+
* OnConsumeEvent constructor.
23+
*
24+
* @param Consumer $consumer
25+
*/
26+
public function __construct(Consumer $consumer)
27+
{
28+
$this->setConsumer($consumer);
29+
30+
$this->forceStop = true;
31+
}
32+
33+
/**
34+
* @return boolean
35+
*/
36+
public function isForceStop()
37+
{
38+
return $this->forceStop;
39+
}
40+
41+
/**
42+
* @param boolean $forceStop
43+
*/
44+
public function setForceStop($forceStop)
45+
{
46+
$this->forceStop = $forceStop;
47+
}
48+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\MemoryChecker;
4+
5+
/**
6+
* Help handling memory limits .
7+
*
8+
* @author Jonas Haouzi <[email protected]>
9+
*/
10+
class MemoryConsumptionChecker
11+
{
12+
/** @var NativeMemoryUsageProvider */
13+
private $memoryUsageProvider;
14+
15+
/**
16+
* MemoryManager constructor.
17+
*
18+
* @param NativeMemoryUsageProvider $memoryUsageProvider
19+
*/
20+
public function __construct(NativeMemoryUsageProvider $memoryUsageProvider) {
21+
$this->memoryUsageProvider = $memoryUsageProvider;
22+
}
23+
24+
/**
25+
* @param int|string $allowedConsumptionUntil
26+
* @param int|string $maxConsumptionAllowed
27+
*
28+
* @return bool
29+
*/
30+
public function isRamAlmostOverloaded($allowedConsumptionUntil, $maxConsumptionAllowed)
31+
{
32+
$allowedConsumptionUntil = $this->convertHumanUnitToNumerical($allowedConsumptionUntil);
33+
$maxConsumptionAllowed = $this->convertHumanUnitToNumerical($maxConsumptionAllowed);
34+
$currentUsage = $this->convertHumanUnitToNumerical($this->memoryUsageProvider->getMemoryUsage());
35+
36+
return $currentUsage > ($maxConsumptionAllowed - $allowedConsumptionUntil);
37+
}
38+
39+
/**
40+
* @param int|string $humanUnit
41+
*
42+
* @return int
43+
*/
44+
private function convertHumanUnitToNumerical($humanUnit)
45+
{
46+
$numerical = $humanUnit;
47+
if (!is_numeric($humanUnit)) {
48+
$numerical = substr($numerical, 0, -1);
49+
switch (substr($humanUnit, -1)) {
50+
case 'G':
51+
$numerical *= pow(1024, 3);
52+
break;
53+
case 'M':
54+
$numerical *= pow(1024, 2);
55+
break;
56+
case 'K':
57+
$numerical *= 1024;
58+
break;
59+
}
60+
}
61+
62+
return (int)$numerical;
63+
}
64+
65+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\MemoryChecker;
4+
5+
/**
6+
* Returns the current memory PHP is using (mainly used to allow mocking).
7+
*
8+
* @author Jonas Haouzi <[email protected]>
9+
*/
10+
class NativeMemoryUsageProvider
11+
{
12+
/**
13+
* @return int
14+
*/
15+
public function getMemoryUsage()
16+
{
17+
return memory_get_usage(true);
18+
}
19+
}

Provider/QueuesProviderInterface.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,22 @@ interface QueuesProviderInterface
1212
/**
1313
* Return array of queues
1414
*
15+
* Example:
16+
* array(
17+
* 'queue_name' => array(
18+
* 'durable' => false,
19+
* 'exclusive' => false,
20+
* 'passive' => false,
21+
* 'nowait' => false,
22+
* 'auto_delete' => false,
23+
* 'routing_keys' => array('key.1', 'key.2'),
24+
* 'arguments' => array(),
25+
* 'ticket' => '',
26+
* 'callback' => array($callback, 'execute')
27+
* )
28+
* );
1529
* @return array
30+
*
1631
*/
1732
public function getQueues();
1833
}

README.md

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ From version 1.6, you can use the Dependency Injection component to load this bu
5757

5858
Require the bundle in your composer.json file:
5959

60-
````
60+
```
6161
{
6262
"require": {
6363
"php-amqplib/rabbitmq-bundle": "~1.6",
@@ -351,6 +351,32 @@ class AfterProcessingMessageEvent extends AMQPEvent
351351
Event raised after processing a AMQPMessage.
352352
If the process message will throw an Exception the event will not raise.
353353

354+
##### IDLE MESSAGE #####
355+
356+
```php
357+
<?php
358+
class OnIdleEvent extends AMQPEvent
359+
{
360+
const NAME = AMQPEvent::ON_IDLE;
361+
362+
/**
363+
* OnIdleEvent constructor.
364+
*
365+
* @param AMQPMessage $AMQPMessage
366+
*/
367+
public function __construct(Consumer $consumer)
368+
{
369+
$this->setConsumer($consumer);
370+
371+
$this->forceStop = true;
372+
}
373+
}
374+
```
375+
376+
Event raised when `wait` method exit by timeout without receiving a message.
377+
In order to make use of this event a consumer `idle_timeout` has to be [configured](#idle-timeout).
378+
By default process exit on idle timeout, you can prevent it by setting `$event->setForceStop(false)` in a listener.
379+
354380
#### Idle timeout ####
355381

356382
If you need to set a timeout when there are no messages from your queue during a period of time, you can set the `idle_timeout` in seconds.
@@ -367,6 +393,27 @@ consumers:
367393
idle_timeout_exit_code: 0
368394
```
369395

396+
#### Graceful max execution timeout ####
397+
398+
If you'd like your consumer to be running up to certain time and then gracefully exit, then set the `graceful_max_execution.timeout` in seconds.
399+
"Gracefully exit" means, that the consumer will exit either after the currently running task or immediatelly, when waiting for new tasks.
400+
The `graceful_max_execution.exit_code` specifies what exit code should be returned by the consumer when the graceful max execution timeout occurs. Without specifying it, the consumer will exit with status `0`.
401+
402+
This feature is great in conjuction with supervisord, which together can allow for periodical memory leaks cleanup, connection with database/rabbitmq renewal and more.
403+
404+
```yaml
405+
consumers:
406+
upload_picture:
407+
connection: default
408+
exchange_options: {name: 'upload-picture', type: direct}
409+
queue_options: {name: 'upload-picture'}
410+
callback: upload_picture_service
411+
412+
graceful_max_execution:
413+
timeout: 1800 # 30 minutes
414+
exit_code: 10 # default is 0
415+
```
416+
370417
#### Fair dispatching ####
371418

372419
> You might have noticed that the dispatching still doesn't work exactly as we want. For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn't know anything about that and will still dispatch messages evenly.

RabbitMq/BaseAmqp.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public function __construct(AbstractConnection $conn, AMQPChannel $ch = null, $c
6464
$this->conn = $conn;
6565
$this->ch = $ch;
6666

67-
if (!($conn instanceof AMQPLazyConnection)) {
67+
if ($conn->connectOnConstruct()) {
6868
$this->getChannel();
6969
}
7070

0 commit comments

Comments
 (0)