Skip to content

Add graceful max execution timeout. #425

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
- 2017-01-22
* Add `graceful_max_execution_timeout`

- 2015-02-07
* Added possibility to set serialize/unserialize function for rpc servers/clients

Expand Down
7 changes: 7 additions & 0 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ protected function addConsumers(ArrayNodeDefinition $node)
->scalarNode('callback')->isRequired()->end()
->scalarNode('idle_timeout')->end()
->scalarNode('idle_timeout_exit_code')->end()
->arrayNode('graceful_max_execution')
->canBeUnset()
->children()
->integerNode('timeout')->end()
->integerNode('exit_code')->defaultValue(0)->end()
->end()
->end()
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
->arrayNode('qos_options')
->canBeUnset()
Expand Down
10 changes: 10 additions & 0 deletions DependencyInjection/OldSoundRabbitMqExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ protected function loadConsumers()
if (isset($consumer['idle_timeout_exit_code'])) {
$definition->addMethodCall('setIdleTimeoutExitCode', array($consumer['idle_timeout_exit_code']));
}
if (isset($consumer['graceful_max_execution'])) {
$definition->addMethodCall(
'setGracefulMaxExecutionDateTimeFromSecondsInTheFuture',
array($consumer['graceful_max_execution']['timeout'])
);
$definition->addMethodCall(
'setGracefulMaxExecutionTimeoutExitCode',
array($consumer['graceful_max_execution']['exit_code'])
);
}
if (!$consumer['auto_setup_fabric']) {
$definition->addMethodCall('disableAutoSetupFabric');
}
Expand Down
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,27 @@ consumers:
idle_timeout_exit_code: 0
```

#### Graceful max execution timeout ####

If you'd like your consumer to be running up to certain time and then gracefully exit, then set the `graceful_max_execution.timeout` in seconds.
"Gracefully exit" means, that the consumer will exit either after the currently running task or immediatelly, when waiting for new tasks.
The `graceful_max_execution.exit_code` specifies what exit code should be returned by the consumer when the graceful max execution timeout occurs. Without specifying it, the consumer will exit with status `0`.

This feature is great in conjuction with supervisord, which together can allow for periodical memory leaks cleanup, connection with database/rabbitmq renewal and more.

```yaml
consumers:
upload_picture:
connection: default
exchange_options: {name: 'upload-picture', type: direct}
queue_options: {name: 'upload-picture'}
callback: upload_picture_service

graceful_max_execution:
timeout: 1800 # 30 minutes
exit_code: 10 # default is 0
```

#### Fair dispatching ####

> You might have noticed that the dispatching still doesn't work exactly as we want. For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn't know anything about that and will still dispatch messages evenly.
Expand Down
124 changes: 123 additions & 1 deletion RabbitMq/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,25 @@

class Consumer extends BaseConsumer
{
const TIMEOUT_TYPE_IDLE = 'idle';
const TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION = 'graceful-max-execution';

/**
* @var int $memoryLimit
*/
protected $memoryLimit = null;

/**
* @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
* any currently running consumption will not be interrupted.
*/
protected $gracefulMaxExecutionDateTime;

/**
* @var int Exit code used, when consumer is closed by the Graceful Max Execution Timeout feature.
*/
protected $gracefulMaxExecutionTimeoutExitCode = 0;

/**
* Set the memory limit
*
Expand All @@ -42,6 +56,7 @@ public function getMemoryLimit()
* Consume the message
*
* @param int $msgAmount
* @return int
*/
public function consume($msgAmount)
{
Expand All @@ -52,10 +67,27 @@ public function consume($msgAmount)
while (count($this->getChannel()->callbacks)) {
$this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
$this->maybeStopConsumer();

/*
* Be careful not to trigger ::wait() with 0 or less seconds, when
* graceful max execution timeout is being used.
*/
$waitTimeout = $this->chooseWaitTimeout();
if (
$waitTimeout['timeoutType'] === self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION
&& $waitTimeout['seconds'] < 1
) {
return $this->gracefulMaxExecutionTimeoutExitCode;
}

if (!$this->forceStop) {
try {
$this->getChannel()->wait(null, false, $this->getIdleTimeout());
$this->getChannel()->wait(null, false, $waitTimeout['seconds']);
} catch (AMQPTimeoutException $e) {
if (self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION === $waitTimeout['timeoutType']) {
return $this->gracefulMaxExecutionTimeoutExitCode;
}

$idleEvent = new OnIdleEvent($this);
$this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);

Expand All @@ -69,6 +101,8 @@ public function consume($msgAmount)
}
}
}

return 0;
}

/**
Expand Down Expand Up @@ -176,4 +210,92 @@ protected function isRamAlmostOverloaded()

return $memoryManager->isRamAlmostOverloaded($this->getMemoryLimit(), '5M');
}

/**
* @param \DateTime|null $dateTime
*/
public function setGracefulMaxExecutionDateTime(\DateTime $dateTime = null)
{
$this->gracefulMaxExecutionDateTime = $dateTime;
}

/**
* @param int $secondsInTheFuture
*/
public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
{
$this->setGracefulMaxExecutionDateTime(new \DateTime("+{$secondsInTheFuture} seconds"));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should add some validation to be sure you get seconds, maybe on configuration level?

Copy link
Contributor Author

@d-ph d-ph Jan 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean validation, that I get an integer? Or that I get seconds and not minutes or hours?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added validation, that the value is integer on configuration level. Please let me know if this is enough.

}

/**
* @param int $exitCode
*/
public function setGracefulMaxExecutionTimeoutExitCode($exitCode)
{
$this->gracefulMaxExecutionTimeoutExitCode = $exitCode;
}

/**
* @return \DateTime|null
*/
public function getGracefulMaxExecutionDateTime()
{
return $this->gracefulMaxExecutionDateTime;
}

/**
* @return int
*/
public function getGracefulMaxExecutionTimeoutExitCode()
{
return $this->gracefulMaxExecutionTimeoutExitCode;
}

/**
* Choose the timeout to use for the $this->getChannel()->wait() method.
*
* @return array Of structure
* {
* timeoutType: string; // one of self::TIMEOUT_TYPE_*
* seconds: int;
* }
*/
private function chooseWaitTimeout()
{
if ($this->gracefulMaxExecutionDateTime) {
$allowedExecutionDateInterval = $this->gracefulMaxExecutionDateTime->diff(new \DateTime());
$allowedExecutionSeconds = $allowedExecutionDateInterval->days * 86400
+ $allowedExecutionDateInterval->h * 3600
+ $allowedExecutionDateInterval->i * 60
+ $allowedExecutionDateInterval->s;

if (!$allowedExecutionDateInterval->invert) {
$allowedExecutionSeconds *= -1;
}

/*
* Respect the idle timeout if it's set and if it's less than
* the remaining allowed execution.
*/
if (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if could be opposite & the duplicated return with self::TIMEOUT_TYPE_IDLE could be removed then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sorry, but I disagree.

The code is not duplicated. This return statement is the same as the one a the end of the method, but the intention is different. The code after line 294 means: "do this, if Graceful Max Execution feature is not enabled". The code in the if statement starting on line 280 means: "Use Idle Timeout, if it's less than the computed remaining allowed execution". Currently, they are same in terms of code. But if tomorrow someone wanted to add some other crazy type of timeout, then they would modify the code after line 294. I bet they wouldn't think: "Ok, that other guy from Graceful whatever relied on the code after line 294 to return the Idle timeout". And then my code breaks.

Clear naming and no duplication are my prime principles. However I wouldn't trade "being proper" for "being pragmatic" here.

Please confirm, that you'd like me to remove the "duplication" despite what I said.

$this->getIdleTimeout()
&& $this->getIdleTimeout() < $allowedExecutionSeconds
) {
return array(
'timeoutType' => self::TIMEOUT_TYPE_IDLE,
'seconds' => $this->getIdleTimeout(),
);
}

return array(
'timeoutType' => self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION,
'seconds' => $allowedExecutionSeconds,
);
}

return array(
'timeoutType' => self::TIMEOUT_TYPE_IDLE,
'seconds' => $this->getIdleTimeout(),
);
}
}
62 changes: 62 additions & 0 deletions Tests/RabbitMq/ConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,66 @@ public function testShouldAllowContinueConsumptionAfterIdleTimeout()
$this->setExpectedException('PhpAmqpLib\Exception\AMQPTimeoutException');
$consumer->consume(10);
}

public function testGracefulMaxExecutionTimeoutExitCode()
{
// set up amqp connection
$amqpConnection = $this->prepareAMQPConnection();
// set up amqp channel
$amqpChannel = $this->prepareAMQPChannel();
$amqpChannel->expects($this->atLeastOnce())
->method('getChannelId')
->with()
->willReturn(true);
$amqpChannel->expects($this->once())
->method('basic_consume')
->withAnyParameters()
->willReturn(true);

// set up consumer
$consumer = $this->getConsumer($amqpConnection, $amqpChannel);
// disable autosetup fabric so we do not mock more objects
$consumer->disableAutoSetupFabric();
$consumer->setChannel($amqpChannel);

$consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture(60);
$consumer->setGracefulMaxExecutionTimeoutExitCode(10);
$amqpChannel->callbacks = array('graceful_max_execution_timeout_test');

$amqpChannel->expects($this->exactly(1))
->method('wait')
->willThrowException(new AMQPTimeoutException());

$this->assertSame(10, $consumer->consume(1));
}

public function testGracefulMaxExecutionWontWaitIfPastTheTimeout()
{
// set up amqp connection
$amqpConnection = $this->prepareAMQPConnection();
// set up amqp channel
$amqpChannel = $this->prepareAMQPChannel();
$amqpChannel->expects($this->atLeastOnce())
->method('getChannelId')
->with()
->willReturn(true);
$amqpChannel->expects($this->once())
->method('basic_consume')
->withAnyParameters()
->willReturn(true);

// set up consumer
$consumer = $this->getConsumer($amqpConnection, $amqpChannel);
// disable autosetup fabric so we do not mock more objects
$consumer->disableAutoSetupFabric();
$consumer->setChannel($amqpChannel);

$consumer->setGracefulMaxExecutionDateTimeFromSecondsInTheFuture(0);
$amqpChannel->callbacks = array('graceful_max_execution_timeout_test');

$amqpChannel->expects($this->never())
->method('wait');

$consumer->consume(1);
}
}