-
Notifications
You must be signed in to change notification settings - Fork 466
Add graceful max execution timeout. #425
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,11 +13,25 @@ | |
|
||
class Consumer extends BaseConsumer | ||
{ | ||
const TIMEOUT_TYPE_IDLE = 'idle'; | ||
const TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION = 'graceful-max-execution'; | ||
|
||
/** | ||
* @var int $memoryLimit | ||
*/ | ||
protected $memoryLimit = null; | ||
|
||
/** | ||
* @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that | ||
* any currently running consumption will not be interrupted. | ||
*/ | ||
protected $gracefulMaxExecutionDateTime; | ||
|
||
/** | ||
* @var int Exit code used, when consumer is closed by the Graceful Max Execution Timeout feature. | ||
*/ | ||
protected $gracefulMaxExecutionTimeoutExitCode = 0; | ||
|
||
/** | ||
* Set the memory limit | ||
* | ||
|
@@ -42,6 +56,7 @@ public function getMemoryLimit() | |
* Consume the message | ||
* | ||
* @param int $msgAmount | ||
* @return int | ||
*/ | ||
public function consume($msgAmount) | ||
{ | ||
|
@@ -52,10 +67,27 @@ public function consume($msgAmount) | |
while (count($this->getChannel()->callbacks)) { | ||
$this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this)); | ||
$this->maybeStopConsumer(); | ||
|
||
/* | ||
* Be careful not to trigger ::wait() with 0 or less seconds, when | ||
* graceful max execution timeout is being used. | ||
*/ | ||
$waitTimeout = $this->chooseWaitTimeout(); | ||
if ( | ||
$waitTimeout['timeoutType'] === self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION | ||
&& $waitTimeout['seconds'] < 1 | ||
) { | ||
return $this->gracefulMaxExecutionTimeoutExitCode; | ||
} | ||
|
||
if (!$this->forceStop) { | ||
try { | ||
$this->getChannel()->wait(null, false, $this->getIdleTimeout()); | ||
$this->getChannel()->wait(null, false, $waitTimeout['seconds']); | ||
} catch (AMQPTimeoutException $e) { | ||
if (self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION === $waitTimeout['timeoutType']) { | ||
return $this->gracefulMaxExecutionTimeoutExitCode; | ||
} | ||
|
||
$idleEvent = new OnIdleEvent($this); | ||
$this->dispatchEvent(OnIdleEvent::NAME, $idleEvent); | ||
|
||
|
@@ -69,6 +101,8 @@ public function consume($msgAmount) | |
} | ||
} | ||
} | ||
|
||
return 0; | ||
} | ||
|
||
/** | ||
|
@@ -176,4 +210,92 @@ protected function isRamAlmostOverloaded() | |
|
||
return $memoryManager->isRamAlmostOverloaded($this->getMemoryLimit(), '5M'); | ||
} | ||
|
||
/** | ||
* @param \DateTime|null $dateTime | ||
*/ | ||
public function setGracefulMaxExecutionDateTime(\DateTime $dateTime = null) | ||
{ | ||
$this->gracefulMaxExecutionDateTime = $dateTime; | ||
} | ||
|
||
/** | ||
* @param int $secondsInTheFuture | ||
*/ | ||
public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture) | ||
{ | ||
$this->setGracefulMaxExecutionDateTime(new \DateTime("+{$secondsInTheFuture} seconds")); | ||
} | ||
|
||
/** | ||
* @param int $exitCode | ||
*/ | ||
public function setGracefulMaxExecutionTimeoutExitCode($exitCode) | ||
{ | ||
$this->gracefulMaxExecutionTimeoutExitCode = $exitCode; | ||
} | ||
|
||
/** | ||
* @return \DateTime|null | ||
*/ | ||
public function getGracefulMaxExecutionDateTime() | ||
{ | ||
return $this->gracefulMaxExecutionDateTime; | ||
} | ||
|
||
/** | ||
* @return int | ||
*/ | ||
public function getGracefulMaxExecutionTimeoutExitCode() | ||
{ | ||
return $this->gracefulMaxExecutionTimeoutExitCode; | ||
} | ||
|
||
/** | ||
* Choose the timeout to use for the $this->getChannel()->wait() method. | ||
* | ||
* @return array Of structure | ||
* { | ||
* timeoutType: string; // one of self::TIMEOUT_TYPE_* | ||
* seconds: int; | ||
* } | ||
*/ | ||
private function chooseWaitTimeout() | ||
{ | ||
if ($this->gracefulMaxExecutionDateTime) { | ||
$allowedExecutionDateInterval = $this->gracefulMaxExecutionDateTime->diff(new \DateTime()); | ||
$allowedExecutionSeconds = $allowedExecutionDateInterval->days * 86400 | ||
+ $allowedExecutionDateInterval->h * 3600 | ||
+ $allowedExecutionDateInterval->i * 60 | ||
+ $allowedExecutionDateInterval->s; | ||
|
||
if (!$allowedExecutionDateInterval->invert) { | ||
$allowedExecutionSeconds *= -1; | ||
} | ||
|
||
/* | ||
* Respect the idle timeout if it's set and if it's less than | ||
* the remaining allowed execution. | ||
*/ | ||
if ( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm sorry, but I disagree. The code is not duplicated. This return statement is the same as the one a the end of the method, but the intention is different. The code after line 294 means: "do this, if Graceful Max Execution feature is not enabled". The code in the if statement starting on line 280 means: "Use Idle Timeout, if it's less than the computed remaining allowed execution". Currently, they are same in terms of code. But if tomorrow someone wanted to add some other crazy type of timeout, then they would modify the code after line 294. I bet they wouldn't think: "Ok, that other guy from Graceful whatever relied on the code after line 294 to return the Idle timeout". And then my code breaks. Clear naming and no duplication are my prime principles. However I wouldn't trade "being proper" for "being pragmatic" here. Please confirm, that you'd like me to remove the "duplication" despite what I said. |
||
$this->getIdleTimeout() | ||
&& $this->getIdleTimeout() < $allowedExecutionSeconds | ||
) { | ||
return array( | ||
'timeoutType' => self::TIMEOUT_TYPE_IDLE, | ||
'seconds' => $this->getIdleTimeout(), | ||
); | ||
} | ||
|
||
return array( | ||
'timeoutType' => self::TIMEOUT_TYPE_GRACEFUL_MAX_EXECUTION, | ||
'seconds' => $allowedExecutionSeconds, | ||
); | ||
} | ||
|
||
return array( | ||
'timeoutType' => self::TIMEOUT_TYPE_IDLE, | ||
'seconds' => $this->getIdleTimeout(), | ||
); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should add some validation to be sure you get seconds, maybe on configuration level?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean validation, that I get an integer? Or that I get seconds and not minutes or hours?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added validation, that the value is integer on configuration level. Please let me know if this is enough.