Skip to content

Commit 93ff22b

Browse files
Alexandru PanturuAlexandru Panturu
authored andcommitted
add events
1 parent 2c39a7a commit 93ff22b

File tree

8 files changed

+179
-1
lines changed

8 files changed

+179
-1
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\DependencyInjection\Compiler;
4+
5+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
6+
use Symfony\Component\DependencyInjection\ContainerBuilder;
7+
use Symfony\Component\DependencyInjection\Reference;
8+
9+
/**
10+
* Class InjectEventDispatcherPass
11+
*
12+
* @package OldSound\RabbitMqBundle\DependencyInjection\Compiler
13+
*/
14+
class InjectEventDispatcherPass implements CompilerPassInterface
15+
{
16+
const EVENT_DISPATCHER_SERVICE_ID = 'event_dispatcher';
17+
18+
/**
19+
* @inheritDoc
20+
*/
21+
public function process(ContainerBuilder $container)
22+
{
23+
if (!$container->has(self::EVENT_DISPATCHER_SERVICE_ID)) {
24+
return;
25+
}
26+
$eventDispatcherDefinition = $container->findDefinition(self::EVENT_DISPATCHER_SERVICE_ID);
27+
$taggedConsumers = $container->findTaggedServiceIds('old_sound_rabbit_mq.base_amqp');
28+
29+
foreach ($taggedConsumers as $id => $tag) {
30+
$definition = $container->getDefinition($id);
31+
$definition->addMethodCall(
32+
'setEventDispatcher',
33+
[$eventDispatcherDefinition]
34+
);
35+
}
36+
37+
}
38+
}

Event/AMQPEvent.php

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Event;
4+
5+
use PhpAmqpLib\Message\AMQPMessage;
6+
use Symfony\Component\EventDispatcher\Event;
7+
8+
/**
9+
* Class AMQPEvent
10+
*
11+
* @package OldSound\RabbitMqBundle\Event
12+
*/
13+
class AMQPEvent extends Event
14+
{
15+
const ON_CONSUME = 'on_consume';
16+
const BEFORE_PROCESSING_MESSAGE = 'before_processing';
17+
const AFTER_PROCESSING_MESSAGE = 'after_processing';
18+
19+
/**
20+
* @var AMQPMessage
21+
*/
22+
protected $AMQPMessage;
23+
24+
/**
25+
* @return AMQPMessage
26+
*/
27+
public function getAMQPMessage()
28+
{
29+
return $this->AMQPMessage;
30+
}
31+
}

Event/AfterProcessingMessageEvent.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Event;
4+
5+
use PhpAmqpLib\Message\AMQPMessage;
6+
7+
/**
8+
* Class AfterProcessingMessageEvent
9+
*
10+
* @package OldSound\RabbitMqBundle\Event
11+
*/
12+
class AfterProcessingMessageEvent extends AMQPEvent
13+
{
14+
const NAME = AMQPEvent::AFTER_PROCESSING_MESSAGE;
15+
16+
/**
17+
* AfterProcessingMessageEvent constructor.
18+
*
19+
* @param AMQPMessage $AMQPMessage
20+
*/
21+
public function __construct(AMQPMessage $AMQPMessage)
22+
{
23+
$this->AMQPMessage = $AMQPMessage;
24+
}
25+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Event;
4+
5+
use PhpAmqpLib\Message\AMQPMessage;
6+
7+
/**
8+
* Class BeforeProcessingMessageEvent
9+
*
10+
* @package OldSound\RabbitMqBundle\Command
11+
*/
12+
class BeforeProcessingMessageEvent extends AMQPEvent
13+
{
14+
const NAME = AMQPEvent::BEFORE_PROCESSING_MESSAGE;
15+
16+
/**
17+
* BeforeProcessingMessageEvent constructor.
18+
*
19+
* @param AMQPMessage $AMQPMessage
20+
*/
21+
public function __construct(AMQPMessage $AMQPMessage)
22+
{
23+
$this->AMQPMessage = $AMQPMessage;
24+
}
25+
}

Event/OnConsumeEvent.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Event;
4+
5+
/**
6+
* Class OnConsumeEvent
7+
*
8+
* @package OldSound\RabbitMqBundle\Command
9+
*/
10+
class OnConsumeEvent extends AMQPEvent
11+
{
12+
const NAME = AMQPEvent::ON_CONSUME;
13+
}

OldSoundRabbitMqBundle.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace OldSound\RabbitMqBundle;
44

5+
use OldSound\RabbitMqBundle\DependencyInjection\Compiler\InjectEventDispatcherPass;
56
use OldSound\RabbitMqBundle\DependencyInjection\Compiler\RegisterPartsPass;
67
use Symfony\Component\HttpKernel\Bundle\Bundle;
78
use Symfony\Component\DependencyInjection\ContainerBuilder;
@@ -13,5 +14,6 @@ public function build(ContainerBuilder $container)
1314
parent::build($container);
1415

1516
$container->addCompilerPass(new RegisterPartsPass());
17+
$container->addCompilerPass(new InjectEventDispatcherPass());
1618
}
1719
}

RabbitMq/BaseAmqp.php

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
use PhpAmqpLib\Connection\AMQPLazyConnection;
77
use Psr\Log\LoggerInterface;
88
use Psr\Log\NullLogger;
9+
use Symfony\Component\EventDispatcher\Event;
10+
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
911

1012
abstract class BaseAmqp
1113
{
@@ -22,7 +24,7 @@ abstract class BaseAmqp
2224
* @var LoggerInterface
2325
*/
2426
protected $logger;
25-
27+
2628
protected $exchangeOptions = array(
2729
'passive' => false,
2830
'durable' => true,
@@ -46,6 +48,11 @@ abstract class BaseAmqp
4648
'declare' => true,
4749
);
4850

51+
/**
52+
* @var EventDispatcherInterface
53+
*/
54+
protected $eventDispatcher;
55+
4956
/**
5057
* @param AbstractConnection $conn
5158
* @param AMQPChannel|null $ch
@@ -234,4 +241,30 @@ public function setLogger($logger)
234241
{
235242
$this->logger = $logger;
236243
}
244+
245+
/**
246+
* @param EventDispatcherInterface $eventDispatcher
247+
*
248+
* @return BaseAmqp
249+
*/
250+
public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
251+
{
252+
$this->eventDispatcher = $eventDispatcher;
253+
254+
return $this;
255+
}
256+
257+
/**
258+
* @param string $eventName
259+
* @param Event $event
260+
*/
261+
protected function dispatchEvent($eventName, Event $event)
262+
{
263+
if ($this->eventDispatcher) {
264+
$this->eventDispatcher->dispatch(
265+
$eventName,
266+
$event
267+
);
268+
}
269+
}
237270
}

RabbitMq/Consumer.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
namespace OldSound\RabbitMqBundle\RabbitMq;
44

5+
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
6+
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
7+
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
58
use OldSound\RabbitMqBundle\RabbitMq\BaseConsumer;
69
use PhpAmqpLib\Message\AMQPMessage;
710

@@ -44,6 +47,7 @@ public function consume($msgAmount)
4447
$this->setupConsumer();
4548

4649
while (count($this->getChannel()->callbacks)) {
50+
$this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent());
4751
$this->maybeStopConsumer();
4852
$this->getChannel()->wait(null, false, $this->getIdleTimeout());
4953
}
@@ -67,9 +71,16 @@ public function delete()
6771

6872
public function processMessage(AMQPMessage $msg)
6973
{
74+
$this->dispatchEvent(BeforeProcessingMessageEvent::NAME,
75+
new BeforeProcessingMessageEvent($msg)
76+
);
7077
try {
7178
$processFlag = call_user_func($this->callback, $msg);
7279
$this->handleProcessMessage($msg, $processFlag);
80+
$this->dispatchEvent(
81+
AfterProcessingMessageEvent::NAME,
82+
new AfterProcessingMessageEvent($msg)
83+
);
7384
$this->logger->debug('Queue message processed', array(
7485
'amqp' => array(
7586
'queue' => $this->queueOptions['name'],

0 commit comments

Comments
 (0)