Skip to content

AMQP Events #369

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions DependencyInjection/Compiler/InjectEventDispatcherPass.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

namespace OldSound\RabbitMqBundle\DependencyInjection\Compiler;

use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Reference;

/**
* Class InjectEventDispatcherPass
*
* @package OldSound\RabbitMqBundle\DependencyInjection\Compiler
*/
class InjectEventDispatcherPass implements CompilerPassInterface
{
const EVENT_DISPATCHER_SERVICE_ID = 'event_dispatcher';

/**
* @inheritDoc
*/
public function process(ContainerBuilder $container)
{
if (!$container->has(self::EVENT_DISPATCHER_SERVICE_ID)) {
return;
}
$eventDispatcherDefinition = $container->findDefinition(self::EVENT_DISPATCHER_SERVICE_ID);
$taggedConsumers = $container->findTaggedServiceIds('old_sound_rabbit_mq.base_amqp');

foreach ($taggedConsumers as $id => $tag) {
$definition = $container->getDefinition($id);
$definition->addMethodCall(
'setEventDispatcher',
[$eventDispatcherDefinition]
);
}

}
}
70 changes: 70 additions & 0 deletions Event/AMQPEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?php

namespace OldSound\RabbitMqBundle\Event;

use OldSound\RabbitMqBundle\RabbitMq\Consumer;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Component\EventDispatcher\Event;

/**
* Class AMQPEvent
*
* @package OldSound\RabbitMqBundle\Event
* @codeCoverageIgnore
*/
class AMQPEvent extends Event
{
const ON_CONSUME = 'on_consume';
const BEFORE_PROCESSING_MESSAGE = 'before_processing';
const AFTER_PROCESSING_MESSAGE = 'after_processing';

/**
* @var AMQPMessage
*/
protected $AMQPMessage;

/**
* @var Consumer
*/
protected $consumer;

/**
* @return AMQPMessage
*/
public function getAMQPMessage()
{
return $this->AMQPMessage;
}

/**
* @param AMQPMessage $AMQPMessage
*
* @return AMQPEvent
*/
public function setAMQPMessage(AMQPMessage $AMQPMessage)
{
$this->AMQPMessage = $AMQPMessage;

return $this;
}

/**
* @return Consumer
*/
public function getConsumer()
{
return $this->consumer;
}

/**
* @param Consumer $consumer
*
* @return AMQPEvent
*/
public function setConsumer(Consumer $consumer)
{
$this->consumer = $consumer;

return $this;
}
}
27 changes: 27 additions & 0 deletions Event/AfterProcessingMessageEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

namespace OldSound\RabbitMqBundle\Event;

use OldSound\RabbitMqBundle\RabbitMq\Consumer;
use PhpAmqpLib\Message\AMQPMessage;

/**
* Class AfterProcessingMessageEvent
*
* @package OldSound\RabbitMqBundle\Event
*/
class AfterProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent::AFTER_PROCESSING_MESSAGE;

/**
* AfterProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
{
$this->setConsumer($consumer);
$this->setAMQPMessage($AMQPMessage);
}
}
27 changes: 27 additions & 0 deletions Event/BeforeProcessingMessageEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

namespace OldSound\RabbitMqBundle\Event;

use OldSound\RabbitMqBundle\RabbitMq\Consumer;
use PhpAmqpLib\Message\AMQPMessage;

/**
* Class BeforeProcessingMessageEvent
*
* @package OldSound\RabbitMqBundle\Command
*/
class BeforeProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent::BEFORE_PROCESSING_MESSAGE;

/**
* BeforeProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
{
$this->setConsumer($consumer);
$this->setAMQPMessage($AMQPMessage);
}
}
25 changes: 25 additions & 0 deletions Event/OnConsumeEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

namespace OldSound\RabbitMqBundle\Event;

use OldSound\RabbitMqBundle\RabbitMq\Consumer;

/**
* Class OnConsumeEvent
*
* @package OldSound\RabbitMqBundle\Command
*/
class OnConsumeEvent extends AMQPEvent
{
const NAME = AMQPEvent::ON_CONSUME;

/**
* OnConsumeEvent constructor.
*
* @param Consumer $consumer
*/
public function __construct(Consumer $consumer)
{
$this->setConsumer($consumer);
}
}
2 changes: 2 additions & 0 deletions OldSoundRabbitMqBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace OldSound\RabbitMqBundle;

use OldSound\RabbitMqBundle\DependencyInjection\Compiler\InjectEventDispatcherPass;
use OldSound\RabbitMqBundle\DependencyInjection\Compiler\RegisterPartsPass;
use Symfony\Component\HttpKernel\Bundle\Bundle;
use Symfony\Component\DependencyInjection\ContainerBuilder;
Expand All @@ -13,5 +14,6 @@ public function build(ContainerBuilder $container)
parent::build($container);

$container->addCompilerPass(new RegisterPartsPass());
$container->addCompilerPass(new InjectEventDispatcherPass());
}
}
68 changes: 68 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,74 @@ For deleting the consumer's queue, use this command:
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
```

#### Consumer Events ####

This can be useful in many scenarios.
There are 3 AMQPEvents:
##### ON CONSUME #####
```php
class OnConsumeEvent extends AMQPEvent
{
const NAME = AMQPEvent::ON_CONSUME;

/**
* OnConsumeEvent constructor.
*
* @param Consumer $consumer
*/
public function __construct(Consumer $consumer)
{
$this->setConsumer($consumer);
}
}
```

Let`s say you need to sleep / stop consumer/s on a new application deploy.
You can listen for OnConsumeEvent (\OldSound\RabbitMqBundle\Event\OnConsumeEvent) and check for new application deploy.

##### BEFORE PROCESSING MESSAGE #####

```php
class BeforeProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent::BEFORE_PROCESSING_MESSAGE;

/**
* BeforeProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
{
$this->setConsumer($consumer);
$this->setAMQPMessage($AMQPMessage);
}
}
```
Event raised before processing a AMQPMessage.

##### AFTER PROCESSING MESSAGE #####

```php
class AfterProcessingMessageEvent extends AMQPEvent
{
const NAME = AMQPEvent::AFTER_PROCESSING_MESSAGE;

/**
* AfterProcessingMessageEvent constructor.
*
* @param AMQPMessage $AMQPMessage
*/
public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
{
$this->setConsumer($consumer);
$this->setAMQPMessage($AMQPMessage);
}
}
```
Event raised after processing a AMQPMessage.
If the process message will throw an Exception the event will not raise.

#### Idle timeout ####

If you need to set a timeout when there are no messages from your queue during a period of time, you can set the `idle_timeout` in seconds:
Expand Down
58 changes: 54 additions & 4 deletions RabbitMq/BaseAmqp.php
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
<?php

namespace OldSound\RabbitMqBundle\RabbitMq;
use OldSound\RabbitMqBundle\Event\AMQPEvent;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Connection\AMQPLazyConnection;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Symfony\Component\EventDispatcher\Event;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

abstract class BaseAmqp
{
Expand All @@ -22,7 +25,7 @@ abstract class BaseAmqp
* @var LoggerInterface
*/
protected $logger;

protected $exchangeOptions = array(
'passive' => false,
'durable' => true,
Expand All @@ -46,6 +49,11 @@ abstract class BaseAmqp
'declare' => true,
);

/**
* @var EventDispatcherInterface
*/
protected $eventDispatcher;

/**
* @param AbstractConnection $conn
* @param AMQPChannel|null $ch
Expand All @@ -68,11 +76,19 @@ public function __construct(AbstractConnection $conn, AMQPChannel $ch = null, $c
public function __destruct()
{
if ($this->ch) {
$this->ch->close();
try {
$this->ch->close();
} catch (\Exception $e) {
// ignore on shutdown
}
}

if ($this->conn && $this->conn->isConnected()) {
$this->conn->close();
try {
$this->conn->close();
} catch (\Exception $e) {
// ignore on shutdown
}
}
}

Expand Down Expand Up @@ -226,4 +242,38 @@ public function setLogger($logger)
{
$this->logger = $logger;
}

/**
* @param EventDispatcherInterface $eventDispatcher
*
* @return BaseAmqp
*/
public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
{
$this->eventDispatcher = $eventDispatcher;

return $this;
}

/**
* @param string $eventName
* @param AMQPEvent $event
*/
protected function dispatchEvent($eventName, AMQPEvent $event)
{
if ($this->getEventDispatcher()) {
$this->getEventDispatcher()->dispatch(
$eventName,
$event
);
}
}

/**
* @return EventDispatcherInterface
*/
public function getEventDispatcher()
{
return $this->eventDispatcher;
}
}
Loading