Skip to content

Commit 42e7c46

Browse files
authored
Merge pull request #425 from d-ph/feature/graceful-max-execution-timeout
Add graceful max execution timeout.
2 parents e6a8cff + 5bc2b63 commit 42e7c46

File tree

6 files changed

+226
-1
lines changed

6 files changed

+226
-1
lines changed

CHANGELOG

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
- 2017-01-22
2+
* Add `graceful_max_execution_timeout`
3+
14
- 2015-02-07
25
* Added possibility to set serialize/unserialize function for rpc servers/clients
36

DependencyInjection/Configuration.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,13 @@ protected function addConsumers(ArrayNodeDefinition $node)
138138
->scalarNode('callback')->isRequired()->end()
139139
->scalarNode('idle_timeout')->end()
140140
->scalarNode('idle_timeout_exit_code')->end()
141+
->arrayNode('graceful_max_execution')
142+
->canBeUnset()
143+
->children()
144+
->integerNode('timeout')->end()
145+
->integerNode('exit_code')->defaultValue(0)->end()
146+
->end()
147+
->end()
141148
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
142149
->arrayNode('qos_options')
143150
->canBeUnset()

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,16 @@ protected function loadConsumers()
200200
if (isset($consumer['idle_timeout_exit_code'])) {
201201
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
202202
}
203+
if (isset($consumer['graceful_max_execution'])) {
204+
$definition->addMethodCall(
205+
'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
206+
array($consumer['graceful_max_execution']['timeout'])
207+
);
208+
$definition->addMethodCall(
209+
'setGracefulMaxExecutionTimeoutExitCode',
210+
array($consumer['graceful_max_execution']['exit_code'])
211+
);
212+
}
203213
if (!$consumer['auto_setup_fabric']) {
204214
$definition->addMethodCall('disableAutoSetupFabric');
205215
}

README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,27 @@ consumers:
393393
idle_timeout_exit_code: 0
394394
```
395395

396+
#### Graceful max execution timeout ####
397+
398+
If you'd like your consumer to be running up to certain time and then gracefully exit, then set the `graceful_max_execution.timeout` in seconds.
399+
"Gracefully exit" means, that the consumer will exit either after the currently running task or immediatelly, when waiting for new tasks.
400+
The `graceful_max_execution.exit_code` specifies what exit code should be returned by the consumer when the graceful max execution timeout occurs. Without specifying it, the consumer will exit with status `0`.
401+
402+
This feature is great in conjuction with supervisord, which together can allow for periodical memory leaks cleanup, connection with database/rabbitmq renewal and more.
403+
404+
```yaml
405+
consumers:
406+
upload_picture:
407+
connection: default
408+
exchange_options: {name: 'upload-picture', type: direct}
409+
queue_options: {name: 'upload-picture'}
410+
callback: upload_picture_service
411+
412+
graceful_max_execution:
413+
timeout: 1800 # 30 minutes
414+
exit_code: 10 # default is 0
415+
```
416+
396417
#### Fair dispatching ####
397418

398419
> You might have noticed that the dispatching still doesn't work exactly as we want. For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn't know anything about that and will still dispatch messages evenly.

RabbitMq/Consumer.php

Lines changed: 123 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,25 @@
1313

1414
class Consumer extends BaseConsumer
1515
{
16+
const TIMEOUT_TYPE_IDLE = 'idle';
17+
const TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION = 'graceful-max-execution';
18+
1619
/**
1720
* @var int $memoryLimit
1821
*/
1922
protected $memoryLimit = null;
2023

24+
/**
25+
* @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
26+
* any currently running consumption will not be interrupted.
27+
*/
28+
protected $gracefulMaxExecutionDateTime;
29+
30+
/**
31+
* @var int Exit code used, when consumer is closed by the Graceful Max Execution Timeout feature.
32+
*/
33+
protected $gracefulMaxExecutionTimeoutExitCode = 0;
34+
2135
/**
2236
* Set the memory limit
2337
*
@@ -42,6 +56,7 @@ public function getMemoryLimit()
4256
* Consume the message
4357
*
4458
* @param int $msgAmount
59+
* @return int
4560
*/
4661
public function consume($msgAmount)
4762
{
@@ -52,10 +67,27 @@ public function consume($msgAmount)
5267
while (count($this->getChannel()->callbacks)) {
5368
$this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
5469
$this->maybeStopConsumer();
70+
71+
/*
72+
* Be careful not to trigger ::wait() with 0 or less seconds, when
73+
* graceful max execution timeout is being used.
74+
*/
75+
$waitTimeout = $this->chooseWaitTimeout();
76+
if (
77+
$waitTimeout['timeoutType'] === self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION
78+
&& $waitTimeout['seconds'] < 1
79+
) {
80+
return $this->gracefulMaxExecutionTimeoutExitCode;
81+
}
82+
5583
if (!$this->forceStop) {
5684
try {
57-
$this->getChannel()->wait(null, false, $this->getIdleTimeout());
85+
$this->getChannel()->wait(null, false, $waitTimeout['seconds']);
5886
} catch (AMQPTimeoutException $e) {
87+
if (self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION === $waitTimeout['timeoutType']) {
88+
return $this->gracefulMaxExecutionTimeoutExitCode;
89+
}
90+
5991
$idleEvent = new OnIdleEvent($this);
6092
$this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);
6193

@@ -69,6 +101,8 @@ public function consume($msgAmount)
69101
}
70102
}
71103
}
104+
105+
return 0;
72106
}
73107

74108
/**
@@ -177,4 +211,92 @@ protected function isRamAlmostOverloaded()
177211

178212
return $memoryManager->isRamAlmostOverloaded($this->getMemoryLimit(), '5M');
179213
}
214+
215+
/**
216+
* @param \DateTime|null $dateTime
217+
*/
218+
public function setGracefulMaxExecutionDateTime(\DateTime $dateTime = null)
219+
{
220+
$this->gracefulMaxExecutionDateTime = $dateTime;
221+
}
222+
223+
/**
224+
* @param int $secondsInTheFuture
225+
*/
226+
public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
227+
{
228+
$this->setGracefulMaxExecutionDateTime(new \DateTime("+{$secondsInTheFuture} seconds"));
229+
}
230+
231+
/**
232+
* @param int $exitCode
233+
*/
234+
public function setGracefulMaxExecutionTimeoutExitCode($exitCode)
235+
{
236+
$this->gracefulMaxExecutionTimeoutExitCode = $exitCode;
237+
}
238+
239+
/**
240+
* @return \DateTime|null
241+
*/
242+
public function getGracefulMaxExecutionDateTime()
243+
{
244+
return $this->gracefulMaxExecutionDateTime;
245+
}
246+
247+
/**
248+
* @return int
249+
*/
250+
public function getGracefulMaxExecutionTimeoutExitCode()
251+
{
252+
return $this->gracefulMaxExecutionTimeoutExitCode;
253+
}
254+
255+
/**
256+
* Choose the timeout to use for the $this->getChannel()->wait() method.
257+
*
258+
* @return array Of structure
259+
* {
260+
* timeoutType: string; // one of self::TIMEOUT_TYPE_*
261+
* seconds: int;
262+
* }
263+
*/
264+
private function chooseWaitTimeout()
265+
{
266+
if ($this->gracefulMaxExecutionDateTime) {
267+
$allowedExecutionDateInterval = $this->gracefulMaxExecutionDateTime->diff(new \DateTime());
268+
$allowedExecutionSeconds = $allowedExecutionDateInterval->days * 86400
269+
+ $allowedExecutionDateInterval->h * 3600
270+
+ $allowedExecutionDateInterval->i * 60
271+
+ $allowedExecutionDateInterval->s;
272+
273+
if (!$allowedExecutionDateInterval->invert) {
274+
$allowedExecutionSeconds *= -1;
275+
}
276+
277+
/*
278+
* Respect the idle timeout if it's set and if it's less than
279+
* the remaining allowed execution.
280+
*/
281+
if (
282+
$this->getIdleTimeout()
283+
&& $this->getIdleTimeout() < $allowedExecutionSeconds
284+
) {
285+
return array(
286+
'timeoutType' => self::TIMEOUT_TYPE_IDLE,
287+
'seconds' => $this->getIdleTimeout(),
288+
);
289+
}
290+
291+
return array(
292+
'timeoutType' => self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION,
293+
'seconds' => $allowedExecutionSeconds,
294+
);
295+
}
296+
297+
return array(
298+
'timeoutType' => self::TIMEOUT_TYPE_IDLE,
299+
'seconds' => $this->getIdleTimeout(),
300+
);
301+
}
180302
}

Tests/RabbitMq/ConsumerTest.php

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,4 +255,66 @@ public function testShouldAllowContinueConsumptionAfterIdleTimeout()
255255
$this->setExpectedException('PhpAmqpLib\Exception\AMQPTimeoutException');
256256
$consumer->consume(10);
257257
}
258+
259+
public function testGracefulMaxExecutionTimeoutExitCode()
260+
{
261+
// set up amqp connection
262+
$amqpConnection = $this->prepareAMQPConnection();
263+
// set up amqp channel
264+
$amqpChannel = $this->prepareAMQPChannel();
265+
$amqpChannel->expects($this->atLeastOnce())
266+
->method('getChannelId')
267+
->with()
268+
->willReturn(true);
269+
$amqpChannel->expects($this->once())
270+
->method('basic_consume')
271+
->withAnyParameters()
272+
->willReturn(true);
273+
274+
// set up consumer
275+
$consumer = $this->getConsumer($amqpConnection, $amqpChannel);
276+
// disable autosetup fabric so we do not mock more objects
277+
$consumer->disableAutoSetupFabric();
278+
$consumer->setChannel($amqpChannel);
279+
280+
$consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture(60);
281+
$consumer->setGracefulMaxExecutionTimeoutExitCode(10);
282+
$amqpChannel->callbacks = array('graceful_max_execution_timeout_test');
283+
284+
$amqpChannel->expects($this->exactly(1))
285+
->method('wait')
286+
->willThrowException(new AMQPTimeoutException());
287+
288+
$this->assertSame(10, $consumer->consume(1));
289+
}
290+
291+
public function testGracefulMaxExecutionWontWaitIfPastTheTimeout()
292+
{
293+
// set up amqp connection
294+
$amqpConnection = $this->prepareAMQPConnection();
295+
// set up amqp channel
296+
$amqpChannel = $this->prepareAMQPChannel();
297+
$amqpChannel->expects($this->atLeastOnce())
298+
->method('getChannelId')
299+
->with()
300+
->willReturn(true);
301+
$amqpChannel->expects($this->once())
302+
->method('basic_consume')
303+
->withAnyParameters()
304+
->willReturn(true);
305+
306+
// set up consumer
307+
$consumer = $this->getConsumer($amqpConnection, $amqpChannel);
308+
// disable autosetup fabric so we do not mock more objects
309+
$consumer->disableAutoSetupFabric();
310+
$consumer->setChannel($amqpChannel);
311+
312+
$consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture(0);
313+
$amqpChannel->callbacks = array('graceful_max_execution_timeout_test');
314+
315+
$amqpChannel->expects($this->never())
316+
->method('wait');
317+
318+
$consumer->consume(1);
319+
}
258320
}

0 commit comments

Comments
 (0)