Skip to content

Commit 5818690

Browse files
authored
Merge pull request #369 from alexandrupanturu/event_ref
AMQP Events
2 parents 49faf6d + 657357b commit 5818690

14 files changed

+523
-2
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\DependencyInjection\Compiler;
4+
5+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
6+
use Symfony\Component\DependencyInjection\ContainerBuilder;
7+
use Symfony\Component\DependencyInjection\Reference;
8+
9+
/**
10+
* Class InjectEventDispatcherPass
11+
*
12+
* @package OldSound\RabbitMqBundle\DependencyInjection\Compiler
13+
*/
14+
class InjectEventDispatcherPass implements CompilerPassInterface
15+
{
16+
const EVENT_DISPATCHER_SERVICE_ID = 'event_dispatcher';
17+
18+
/**
19+
* @inheritDoc
20+
*/
21+
public function process(ContainerBuilder $container)
22+
{
23+
if (!$container->has(self::EVENT_DISPATCHER_SERVICE_ID)) {
24+
return;
25+
}
26+
$eventDispatcherDefinition = $container->findDefinition(self::EVENT_DISPATCHER_SERVICE_ID);
27+
$taggedConsumers = $container->findTaggedServiceIds('old_sound_rabbit_mq.base_amqp');
28+
29+
foreach ($taggedConsumers as $id => $tag) {
30+
$definition = $container->getDefinition($id);
31+
$definition->addMethodCall(
32+
'setEventDispatcher',
33+
[$eventDispatcherDefinition]
34+
);
35+
}
36+
37+
}
38+
}

Event/AMQPEvent.php

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Event;
4+
5+
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
6+
use PhpAmqpLib\Message\AMQPMessage;
7+
use Symfony\Component\EventDispatcher\Event;
8+
9+
/**
10+
* Class AMQPEvent
11+
*
12+
* @package OldSound\RabbitMqBundle\Event
13+
* @codeCoverageIgnore
14+
*/
15+
class AMQPEvent extends Event
16+
{
17+
const ON_CONSUME = 'on_consume';
18+
const BEFORE_PROCESSING_MESSAGE = 'before_processing';
19+
const AFTER_PROCESSING_MESSAGE = 'after_processing';
20+
21+
/**
22+
* @var AMQPMessage
23+
*/
24+
protected $AMQPMessage;
25+
26+
/**
27+
* @var Consumer
28+
*/
29+
protected $consumer;
30+
31+
/**
32+
* @return AMQPMessage
33+
*/
34+
public function getAMQPMessage()
35+
{
36+
return $this->AMQPMessage;
37+
}
38+
39+
/**
40+
* @param AMQPMessage $AMQPMessage
41+
*
42+
* @return AMQPEvent
43+
*/
44+
public function setAMQPMessage(AMQPMessage $AMQPMessage)
45+
{
46+
$this->AMQPMessage = $AMQPMessage;
47+
48+
return $this;
49+
}
50+
51+
/**
52+
* @return Consumer
53+
*/
54+
public function getConsumer()
55+
{
56+
return $this->consumer;
57+
}
58+
59+
/**
60+
* @param Consumer $consumer
61+
*
62+
* @return AMQPEvent
63+
*/
64+
public function setConsumer(Consumer $consumer)
65+
{
66+
$this->consumer = $consumer;
67+
68+
return $this;
69+
}
70+
}

Event/AfterProcessingMessageEvent.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Event;
4+
5+
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
6+
use PhpAmqpLib\Message\AMQPMessage;
7+
8+
/**
9+
* Class AfterProcessingMessageEvent
10+
*
11+
* @package OldSound\RabbitMqBundle\Event
12+
*/
13+
class AfterProcessingMessageEvent extends AMQPEvent
14+
{
15+
const NAME = AMQPEvent::AFTER_PROCESSING_MESSAGE;
16+
17+
/**
18+
* AfterProcessingMessageEvent constructor.
19+
*
20+
* @param AMQPMessage $AMQPMessage
21+
*/
22+
public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
23+
{
24+
$this->setConsumer($consumer);
25+
$this->setAMQPMessage($AMQPMessage);
26+
}
27+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Event;
4+
5+
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
6+
use PhpAmqpLib\Message\AMQPMessage;
7+
8+
/**
9+
* Class BeforeProcessingMessageEvent
10+
*
11+
* @package OldSound\RabbitMqBundle\Command
12+
*/
13+
class BeforeProcessingMessageEvent extends AMQPEvent
14+
{
15+
const NAME = AMQPEvent::BEFORE_PROCESSING_MESSAGE;
16+
17+
/**
18+
* BeforeProcessingMessageEvent constructor.
19+
*
20+
* @param AMQPMessage $AMQPMessage
21+
*/
22+
public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
23+
{
24+
$this->setConsumer($consumer);
25+
$this->setAMQPMessage($AMQPMessage);
26+
}
27+
}

Event/OnConsumeEvent.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Event;
4+
5+
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
6+
7+
/**
8+
* Class OnConsumeEvent
9+
*
10+
* @package OldSound\RabbitMqBundle\Command
11+
*/
12+
class OnConsumeEvent extends AMQPEvent
13+
{
14+
const NAME = AMQPEvent::ON_CONSUME;
15+
16+
/**
17+
* OnConsumeEvent constructor.
18+
*
19+
* @param Consumer $consumer
20+
*/
21+
public function __construct(Consumer $consumer)
22+
{
23+
$this->setConsumer($consumer);
24+
}
25+
}

OldSoundRabbitMqBundle.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace OldSound\RabbitMqBundle;
44

5+
use OldSound\RabbitMqBundle\DependencyInjection\Compiler\InjectEventDispatcherPass;
56
use OldSound\RabbitMqBundle\DependencyInjection\Compiler\RegisterPartsPass;
67
use Symfony\Component\HttpKernel\Bundle\Bundle;
78
use Symfony\Component\DependencyInjection\ContainerBuilder;
@@ -13,5 +14,6 @@ public function build(ContainerBuilder $container)
1314
parent::build($container);
1415

1516
$container->addCompilerPass(new RegisterPartsPass());
17+
$container->addCompilerPass(new InjectEventDispatcherPass());
1618
}
1719
}

README.md

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,74 @@ For deleting the consumer's queue, use this command:
275275
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
276276
```
277277
278+
#### Consumer Events ####
279+
280+
This can be useful in many scenarios.
281+
There are 3 AMQPEvents:
282+
##### ON CONSUME #####
283+
```php
284+
class OnConsumeEvent extends AMQPEvent
285+
{
286+
const NAME = AMQPEvent::ON_CONSUME;
287+
288+
/**
289+
* OnConsumeEvent constructor.
290+
*
291+
* @param Consumer $consumer
292+
*/
293+
public function __construct(Consumer $consumer)
294+
{
295+
$this->setConsumer($consumer);
296+
}
297+
}
298+
```
299+
300+
Let`s say you need to sleep / stop consumer/s on a new application deploy.
301+
You can listen for OnConsumeEvent (\OldSound\RabbitMqBundle\Event\OnConsumeEvent) and check for new application deploy.
302+
303+
##### BEFORE PROCESSING MESSAGE #####
304+
305+
```php
306+
class BeforeProcessingMessageEvent extends AMQPEvent
307+
{
308+
const NAME = AMQPEvent::BEFORE_PROCESSING_MESSAGE;
309+
310+
/**
311+
* BeforeProcessingMessageEvent constructor.
312+
*
313+
* @param AMQPMessage $AMQPMessage
314+
*/
315+
public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
316+
{
317+
$this->setConsumer($consumer);
318+
$this->setAMQPMessage($AMQPMessage);
319+
}
320+
}
321+
```
322+
Event raised before processing a AMQPMessage.
323+
324+
##### AFTER PROCESSING MESSAGE #####
325+
326+
```php
327+
class AfterProcessingMessageEvent extends AMQPEvent
328+
{
329+
const NAME = AMQPEvent::AFTER_PROCESSING_MESSAGE;
330+
331+
/**
332+
* AfterProcessingMessageEvent constructor.
333+
*
334+
* @param AMQPMessage $AMQPMessage
335+
*/
336+
public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
337+
{
338+
$this->setConsumer($consumer);
339+
$this->setAMQPMessage($AMQPMessage);
340+
}
341+
}
342+
```
343+
Event raised after processing a AMQPMessage.
344+
If the process message will throw an Exception the event will not raise.
345+
278346
#### Idle timeout ####
279347
280348
If you need to set a timeout when there are no messages from your queue during a period of time, you can set the `idle_timeout` in seconds:

RabbitMq/BaseAmqp.php

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
<?php
22

33
namespace OldSound\RabbitMqBundle\RabbitMq;
4+
use OldSound\RabbitMqBundle\Event\AMQPEvent;
45
use PhpAmqpLib\Channel\AMQPChannel;
56
use PhpAmqpLib\Connection\AbstractConnection;
67
use PhpAmqpLib\Connection\AMQPLazyConnection;
78
use Psr\Log\LoggerInterface;
89
use Psr\Log\NullLogger;
10+
use Symfony\Component\EventDispatcher\Event;
11+
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
912

1013
abstract class BaseAmqp
1114
{
@@ -22,7 +25,7 @@ abstract class BaseAmqp
2225
* @var LoggerInterface
2326
*/
2427
protected $logger;
25-
28+
2629
protected $exchangeOptions = array(
2730
'passive' => false,
2831
'durable' => true,
@@ -46,6 +49,11 @@ abstract class BaseAmqp
4649
'declare' => true,
4750
);
4851

52+
/**
53+
* @var EventDispatcherInterface
54+
*/
55+
protected $eventDispatcher;
56+
4957
/**
5058
* @param AbstractConnection $conn
5159
* @param AMQPChannel|null $ch
@@ -234,4 +242,38 @@ public function setLogger($logger)
234242
{
235243
$this->logger = $logger;
236244
}
245+
246+
/**
247+
* @param EventDispatcherInterface $eventDispatcher
248+
*
249+
* @return BaseAmqp
250+
*/
251+
public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
252+
{
253+
$this->eventDispatcher = $eventDispatcher;
254+
255+
return $this;
256+
}
257+
258+
/**
259+
* @param string $eventName
260+
* @param AMQPEvent $event
261+
*/
262+
protected function dispatchEvent($eventName, AMQPEvent $event)
263+
{
264+
if ($this->getEventDispatcher()) {
265+
$this->getEventDispatcher()->dispatch(
266+
$eventName,
267+
$event
268+
);
269+
}
270+
}
271+
272+
/**
273+
* @return EventDispatcherInterface
274+
*/
275+
public function getEventDispatcher()
276+
{
277+
return $this->eventDispatcher;
278+
}
237279
}

RabbitMq/Consumer.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
namespace OldSound\RabbitMqBundle\RabbitMq;
44

5+
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
6+
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
7+
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
58
use OldSound\RabbitMqBundle\RabbitMq\BaseConsumer;
69
use PhpAmqpLib\Message\AMQPMessage;
710

@@ -44,6 +47,7 @@ public function consume($msgAmount)
4447
$this->setupConsumer();
4548

4649
while (count($this->getChannel()->callbacks)) {
50+
$this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
4751
$this->maybeStopConsumer();
4852
if (!$this->forceStop) {
4953
$this->getChannel()->wait(null, false, $this->getIdleTimeout());
@@ -69,9 +73,16 @@ public function delete()
6973

7074
public function processMessage(AMQPMessage $msg)
7175
{
76+
$this->dispatchEvent(BeforeProcessingMessageEvent::NAME,
77+
new BeforeProcessingMessageEvent($this, $msg)
78+
);
7279
try {
7380
$processFlag = call_user_func($this->callback, $msg);
7481
$this->handleProcessMessage($msg, $processFlag);
82+
$this->dispatchEvent(
83+
AfterProcessingMessageEvent::NAME,
84+
new AfterProcessingMessageEvent($this, $msg)
85+
);
7586
$this->logger->debug('Queue message processed', array(
7687
'amqp' => array(
7788
'queue' => $this->queueOptions['name'],

0 commit comments

Comments
 (0)