Skip to content

Add idle_timeout_exit_code option to allow specify an exit code in ca… #405

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Command/BaseConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
}
$this->initConsumer($input);

$this->consumer->consume($this->amount);
return $this->consumer->consume($this->amount);
}

protected function initConsumer($input) {
Expand Down
3 changes: 3 additions & 0 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ protected function addConsumers(ArrayNodeDefinition $node)
->scalarNode('connection')->defaultValue('default')->end()
->scalarNode('callback')->isRequired()->end()
->scalarNode('idle_timeout')->end()
->scalarNode('idle_timeout_exit_code')->end()
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
->arrayNode('qos_options')
->canBeUnset()
Expand Down Expand Up @@ -168,6 +169,7 @@ protected function addMultipleConsumers(ArrayNodeDefinition $node)
->children()
->scalarNode('connection')->defaultValue('default')->end()
->scalarNode('idle_timeout')->end()
->scalarNode('idle_timeout_exit_code')->end()
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
->append($this->getMultipleQueuesConfiguration())
->arrayNode('qos_options')
Expand Down Expand Up @@ -200,6 +202,7 @@ protected function addDynamicConsumers(ArrayNodeDefinition $node)
->scalarNode('connection')->defaultValue('default')->end()
->scalarNode('callback')->isRequired()->end()
->scalarNode('idle_timeout')->end()
->scalarNode('idle_timeout_exit_code')->end()
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
->arrayNode('qos_options')
->canBeUnset()
Expand Down
9 changes: 9 additions & 0 deletions DependencyInjection/OldSoundRabbitMqExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ protected function loadConsumers()
if (isset($consumer['idle_timeout'])) {
$definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
}
if (isset($consumer['idle_timeout_exit_code'])) {
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
}
if (!$consumer['auto_setup_fabric']) {
$definition->addMethodCall('disableAutoSetupFabric');
}
Expand Down Expand Up @@ -260,6 +263,9 @@ protected function loadMultipleConsumers()
if (isset($consumer['idle_timeout'])) {
$definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
}
if (isset($consumer['idle_timeout_exit_code'])) {
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
}
if (!$consumer['auto_setup_fabric']) {
$definition->addMethodCall('disableAutoSetupFabric');
}
Expand Down Expand Up @@ -319,6 +325,9 @@ protected function loadDynamicConsumers()
if (isset($consumer['idle_timeout'])) {
$definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
}
if (isset($consumer['idle_timeout_exit_code'])) {
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
}
if (!$consumer['auto_setup_fabric']) {
$definition->addMethodCall('disableAutoSetupFabric');
}
Expand Down
14 changes: 8 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -351,16 +351,18 @@ If the process message will throw an Exception the event will not raise.

#### Idle timeout ####

If you need to set a timeout when there are no messages from your queue during a period of time, you can set the `idle_timeout` in seconds:
If you need to set a timeout when there are no messages from your queue during a period of time, you can set the `idle_timeout` in seconds.
The `idle_timeout_exit_code` specifies what exit code should be returned by the consumer when the idle timeout occurs. Without specifying it, the consumer will throw an **PhpAmqpLib\Exception\AMQPTimeoutException** exception.

```yaml
consumers:
upload_picture:
connection: default
exchange_options: {name: 'upload-picture', type: direct}
queue_options: {name: 'upload-picture'}
callback: upload_picture_service
idle_timeout: 60
connection: default
exchange_options: {name: 'upload-picture', type: direct}
queue_options: {name: 'upload-picture'}
callback: upload_picture_service
idle_timeout: 60
idle_timeout_exit_code: 0
```

#### Fair dispatching ####
Expand Down
22 changes: 22 additions & 0 deletions RabbitMq/BaseConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ abstract class BaseConsumer extends BaseAmqp implements DequeuerInterface

protected $idleTimeout = 0;

protected $idleTimeoutExitCode;

public function setCallback($callback)
{
$this->callback = $callback;
Expand Down Expand Up @@ -100,11 +102,31 @@ public function setIdleTimeout($idleTimeout)
$this->idleTimeout = $idleTimeout;
}

/**
* Set exit code to be returned when there is a timeout exception
*
* @param int|null $idleTimeoutExitCode
*/
public function setIdleTimeoutExitCode($idleTimeoutExitCode)
{
$this->idleTimeoutExitCode = $idleTimeoutExitCode;
}

public function getIdleTimeout()
{
return $this->idleTimeout;
}

/**
* Get exit code to be returned when there is a timeout exception
*
* @return int|null
*/
public function getIdleTimeoutExitCode()
{
return $this->idleTimeoutExitCode;
}

/**
* Resets the consumed property.
* Use when you want to call start() or consume() multiple times.
Expand Down
11 changes: 10 additions & 1 deletion RabbitMq/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage;

class Consumer extends BaseConsumer
Expand Down Expand Up @@ -49,7 +50,15 @@ public function consume($msgAmount)
$this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
$this->maybeStopConsumer();
if (!$this->forceStop) {
$this->getChannel()->wait(null, false, $this->getIdleTimeout());
try {
$this->getChannel()->wait(null, false, $this->getIdleTimeout());
} catch (AMQPTimeoutException $e) {
if (null !== $this->getIdleTimeoutExitCode()) {
return $this->getIdleTimeoutExitCode();
} else {
throw $e;
}
}
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions Tests/RabbitMq/BaseConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public function testItsIdleTimeoutIsMutable()
$this->assertEquals(42, $this->consumer->getIdleTimeout());
}

public function testItsIdleTimeoutExitCodeIsMutable()
{
$this->assertEquals(0, $this->consumer->getIdleTimeoutExitCode());
$this->consumer->setIdleTimeoutExitCode(43);
$this->assertEquals(43, $this->consumer->getIdleTimeoutExitCode());
}

public function testForceStopConsumer()
{
$this->assertAttributeEquals(false, 'forceStop', $this->consumer);
Expand Down
32 changes: 32 additions & 0 deletions Tests/RabbitMq/ConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;

Expand Down Expand Up @@ -170,4 +171,35 @@ function () use ($amqpChannel) {
$consumer->setEventDispatcher($eventDispatcher);
$consumer->consume(1);
}

public function testIdleTimeoutExitCode()
{
// set up amqp connection
$amqpConnection = $this->prepareAMQPConnection();
// set up amqp channel
$amqpChannel = $this->prepareAMQPChannel();
$amqpChannel->expects($this->atLeastOnce())
->method('getChannelId')
->with()
->willReturn(true);
$amqpChannel->expects($this->once())
->method('basic_consume')
->withAnyParameters()
->willReturn(true);

// set up consumer
$consumer = $this->getConsumer($amqpConnection, $amqpChannel);
// disable autosetup fabric so we do not mock more objects
$consumer->disableAutoSetupFabric();
$consumer->setChannel($amqpChannel);
$consumer->setIdleTimeoutExitCode(2);
$amqpChannel->callbacks = array('idle_timeout_exit_code');

$amqpChannel->expects($this->exactly(1))
->method('wait')
->with(null, false, $consumer->getIdleTimeout())
->willThrowException(new AMQPTimeoutException());

$this->assertTrue(2 == $consumer->consume(1));
}
}