Skip to content

Commit e8d0101

Browse files
implements keep_alive option for batch consumers
1 parent 934fe5f commit e8d0101

File tree

4 files changed

+27
-0
lines changed

4 files changed

+27
-0
lines changed

DependencyInjection/Configuration.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ protected function addBatchConsumers(ArrayNodeDefinition $node)
272272
->scalarNode('idle_timeout')->end()
273273
->scalarNode('timeout_wait')->defaultValue(3)->end()
274274
->scalarNode('idle_timeout_exit_code')->end()
275+
->scalarNode('keep_alive')->defaultFalse()->end()
275276
->scalarNode('auto_setup_fabric')->defaultTrue()->end()
276277
->arrayNode('qos_options')
277278
->children()

DependencyInjection/OldSoundRabbitMqExtension.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,10 @@ protected function loadBatchConsumers()
418418
$definition->addMethodCall('disableAutoSetupFabric');
419419
}
420420

421+
if ($consumer['keep_alive']) {
422+
$definition->addMethodCall('keepAlive');
423+
}
424+
421425
$this->injectConnection($definition, $consumer['connection']);
422426
if ($this->collectorEnabled) {
423427
$this->injectLoggedChannel($definition, $key, $consumer['connection']);

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,8 +810,11 @@ batch_consumers:
810810
timeout_wait: 5
811811
auto_setup_fabric: false
812812
idle_timeout_exit_code: -2
813+
keep_alive: false
813814
```
814815

816+
*Note*: If the `keep_alive` option is set to `true`, `idle_timeout_exit_code` will be ignored and the consumer process continues.
817+
815818
You can implement a batch consumer that will acknowledge all messages in one return or you can have control on what message to acknoledge.
816819

817820
```php

RabbitMq/BatchConsumer.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ final class BatchConsumer extends BaseAmqp implements DequeuerInterface
2424
*/
2525
private $idleTimeout = 0;
2626

27+
/**
28+
* @var bool
29+
*/
30+
private $keepAlive = false;
31+
2732
/**
2833
* @var int
2934
*/
@@ -84,6 +89,8 @@ public function consume()
8489
} catch (AMQPTimeoutException $e) {
8590
if (!$this->isEmptyBatch()) {
8691
$this->batchConsume();
92+
} elseif ($this->keepAlive === true) {
93+
continue;
8794
} elseif (null !== $this->getIdleTimeoutExitCode()) {
8895
return $this->getIdleTimeoutExitCode();
8996
} else {
@@ -399,6 +406,18 @@ public function setIdleTimeoutExitCode($idleTimeoutExitCode)
399406
return $this;
400407
}
401408

409+
/**
410+
* keepAlive
411+
*
412+
* @return $this
413+
*/
414+
public function keepAlive()
415+
{
416+
$this->keepAlive = true;
417+
418+
return $this;
419+
}
420+
402421
/**
403422
* Purge the queue
404423
*/

0 commit comments

Comments
 (0)