Skip to content

Commit 2614780

Browse files
authored
Merge pull request #405 from danut007ro/idle_timeout_exit_code
Add idle_timeout_exit_code option to allow specify an exit code in ca…
2 parents 7fdd7c5 + 22d3a41 commit 2614780

File tree

8 files changed

+92
-8
lines changed

8 files changed

+92
-8
lines changed

Command/BaseConsumerCommand.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
8787
}
8888
$this->initConsumer($input);
8989

90-
$this->consumer->consume($this->amount);
90+
return $this->consumer->consume($this->amount);
9191
}
9292

9393
protected function initConsumer($input) {

DependencyInjection/Configuration.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ protected function addConsumers(ArrayNodeDefinition $node)
138138
->scalarNode('connection')->defaultValue('default')->end()
139139
->scalarNode('callback')->isRequired()->end()
140140
->scalarNode('idle_timeout')->end()
141+
->scalarNode('idle_timeout_exit_code')->end()
141142
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
142143
->arrayNode('qos_options')
143144
->canBeUnset()
@@ -168,6 +169,7 @@ protected function addMultipleConsumers(ArrayNodeDefinition $node)
168169
->children()
169170
->scalarNode('connection')->defaultValue('default')->end()
170171
->scalarNode('idle_timeout')->end()
172+
->scalarNode('idle_timeout_exit_code')->end()
171173
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
172174
->append($this->getMultipleQueuesConfiguration())
173175
->arrayNode('qos_options')
@@ -200,6 +202,7 @@ protected function addDynamicConsumers(ArrayNodeDefinition $node)
200202
->scalarNode('connection')->defaultValue('default')->end()
201203
->scalarNode('callback')->isRequired()->end()
202204
->scalarNode('idle_timeout')->end()
205+
->scalarNode('idle_timeout_exit_code')->end()
203206
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
204207
->arrayNode('qos_options')
205208
->canBeUnset()

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,9 @@ protected function loadConsumers()
197197
if (isset($consumer['idle_timeout'])) {
198198
$definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
199199
}
200+
if (isset($consumer['idle_timeout_exit_code'])) {
201+
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
202+
}
200203
if (!$consumer['auto_setup_fabric']) {
201204
$definition->addMethodCall('disableAutoSetupFabric');
202205
}
@@ -260,6 +263,9 @@ protected function loadMultipleConsumers()
260263
if (isset($consumer['idle_timeout'])) {
261264
$definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
262265
}
266+
if (isset($consumer['idle_timeout_exit_code'])) {
267+
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
268+
}
263269
if (!$consumer['auto_setup_fabric']) {
264270
$definition->addMethodCall('disableAutoSetupFabric');
265271
}
@@ -319,6 +325,9 @@ protected function loadDynamicConsumers()
319325
if (isset($consumer['idle_timeout'])) {
320326
$definition->addMethodCall('setIdleTimeout', array($consumer['idle_timeout']));
321327
}
328+
if (isset($consumer['idle_timeout_exit_code'])) {
329+
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
330+
}
322331
if (!$consumer['auto_setup_fabric']) {
323332
$definition->addMethodCall('disableAutoSetupFabric');
324333
}

README.md

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -351,16 +351,18 @@ If the process message will throw an Exception the event will not raise.
351351
352352
#### Idle timeout ####
353353
354-
If you need to set a timeout when there are no messages from your queue during a period of time, you can set the `idle_timeout` in seconds:
354+
If you need to set a timeout when there are no messages from your queue during a period of time, you can set the `idle_timeout` in seconds.
355+
The `idle_timeout_exit_code` specifies what exit code should be returned by the consumer when the idle timeout occurs. Without specifying it, the consumer will throw an **PhpAmqpLib\Exception\AMQPTimeoutException** exception.
355356
356357
```yaml
357358
consumers:
358359
upload_picture:
359-
connection: default
360-
exchange_options: {name: 'upload-picture', type: direct}
361-
queue_options: {name: 'upload-picture'}
362-
callback: upload_picture_service
363-
idle_timeout: 60
360+
connection: default
361+
exchange_options: {name: 'upload-picture', type: direct}
362+
queue_options: {name: 'upload-picture'}
363+
callback: upload_picture_service
364+
idle_timeout: 60
365+
idle_timeout_exit_code: 0
364366
```
365367
366368
#### Fair dispatching ####

RabbitMq/BaseConsumer.php

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ abstract class BaseConsumer extends BaseAmqp implements DequeuerInterface
1616

1717
protected $idleTimeout = 0;
1818

19+
protected $idleTimeoutExitCode;
20+
1921
public function setCallback($callback)
2022
{
2123
$this->callback = $callback;
@@ -100,11 +102,31 @@ public function setIdleTimeout($idleTimeout)
100102
$this->idleTimeout = $idleTimeout;
101103
}
102104

105+
/**
106+
* Set exit code to be returned when there is a timeout exception
107+
*
108+
* @param int|null $idleTimeoutExitCode
109+
*/
110+
public function setIdleTimeoutExitCode($idleTimeoutExitCode)
111+
{
112+
$this->idleTimeoutExitCode = $idleTimeoutExitCode;
113+
}
114+
103115
public function getIdleTimeout()
104116
{
105117
return $this->idleTimeout;
106118
}
107119

120+
/**
121+
* Get exit code to be returned when there is a timeout exception
122+
*
123+
* @return int|null
124+
*/
125+
public function getIdleTimeoutExitCode()
126+
{
127+
return $this->idleTimeoutExitCode;
128+
}
129+
108130
/**
109131
* Resets the consumed property.
110132
* Use when you want to call start() or consume() multiple times.

RabbitMq/Consumer.php

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
66
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
77
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
8+
use PhpAmqpLib\Exception\AMQPTimeoutException;
89
use PhpAmqpLib\Message\AMQPMessage;
910

1011
class Consumer extends BaseConsumer
@@ -49,7 +50,15 @@ public function consume($msgAmount)
4950
$this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
5051
$this->maybeStopConsumer();
5152
if (!$this->forceStop) {
52-
$this->getChannel()->wait(null, false, $this->getIdleTimeout());
53+
try {
54+
$this->getChannel()->wait(null, false, $this->getIdleTimeout());
55+
} catch (AMQPTimeoutException $e) {
56+
if (null !== $this->getIdleTimeoutExitCode()) {
57+
return $this->getIdleTimeoutExitCode();
58+
} else {
59+
throw $e;
60+
}
61+
}
5362
}
5463
}
5564
}

Tests/RabbitMq/BaseConsumerTest.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@ public function testItsIdleTimeoutIsMutable()
3737
$this->assertEquals(42, $this->consumer->getIdleTimeout());
3838
}
3939

40+
public function testItsIdleTimeoutExitCodeIsMutable()
41+
{
42+
$this->assertEquals(0, $this->consumer->getIdleTimeoutExitCode());
43+
$this->consumer->setIdleTimeoutExitCode(43);
44+
$this->assertEquals(43, $this->consumer->getIdleTimeoutExitCode());
45+
}
46+
4047
public function testForceStopConsumer()
4148
{
4249
$this->assertAttributeEquals(false, 'forceStop', $this->consumer);

Tests/RabbitMq/ConsumerTest.php

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
77
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
88
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
9+
use PhpAmqpLib\Exception\AMQPTimeoutException;
910
use PhpAmqpLib\Message\AMQPMessage;
1011
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
1112

@@ -170,4 +171,35 @@ function () use ($amqpChannel) {
170171
$consumer->setEventDispatcher($eventDispatcher);
171172
$consumer->consume(1);
172173
}
174+
175+
public function testIdleTimeoutExitCode()
176+
{
177+
// set up amqp connection
178+
$amqpConnection = $this->prepareAMQPConnection();
179+
// set up amqp channel
180+
$amqpChannel = $this->prepareAMQPChannel();
181+
$amqpChannel->expects($this->atLeastOnce())
182+
->method('getChannelId')
183+
->with()
184+
->willReturn(true);
185+
$amqpChannel->expects($this->once())
186+
->method('basic_consume')
187+
->withAnyParameters()
188+
->willReturn(true);
189+
190+
// set up consumer
191+
$consumer = $this->getConsumer($amqpConnection, $amqpChannel);
192+
// disable autosetup fabric so we do not mock more objects
193+
$consumer->disableAutoSetupFabric();
194+
$consumer->setChannel($amqpChannel);
195+
$consumer->setIdleTimeoutExitCode(2);
196+
$amqpChannel->callbacks = array('idle_timeout_exit_code');
197+
198+
$amqpChannel->expects($this->exactly(1))
199+
->method('wait')
200+
->with(null, false, $consumer->getIdleTimeout())
201+
->willThrowException(new AMQPTimeoutException());
202+
203+
$this->assertTrue(2 == $consumer->consume(1));
204+
}
173205
}

0 commit comments

Comments
 (0)