Skip to content

Commit d09cfc9

Browse files
Alexandru PanturuAlexandru Panturu
authored andcommitted
add unit tests and change docs
1 parent 93ff22b commit d09cfc9

File tree

9 files changed

+253
-6
lines changed

9 files changed

+253
-6
lines changed

Event/AMQPEvent.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* Class AMQPEvent
1010
*
1111
* @package OldSound\RabbitMqBundle\Event
12+
* @codeCoverageIgnore
1213
*/
1314
class AMQPEvent extends Event
1415
{
@@ -20,7 +21,7 @@ class AMQPEvent extends Event
2021
* @var AMQPMessage
2122
*/
2223
protected $AMQPMessage;
23-
24+
2425
/**
2526
* @return AMQPMessage
2627
*/

Event/OnConsumeEvent.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
* Class OnConsumeEvent
77
*
88
* @package OldSound\RabbitMqBundle\Command
9+
* @codeCoverageIgnore
910
*/
1011
class OnConsumeEvent extends AMQPEvent
1112
{

README.md

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,62 @@ For deleting the consumer's queue, use this command:
274274
$ ./app/console rabbitmq:delete --no-confirmation upload_picture
275275
```
276276
277+
#### Consumer Events ####
278+
279+
This can be useful in many scenarios.
280+
There are 3 AMQPEvents:
281+
##### ON CONSUME #####
282+
```php
283+
class OnConsumeEvent extends AMQPEvent
284+
{
285+
const NAME = AMQPEvent::ON_CONSUME;
286+
}
287+
```
288+
289+
Let`s say you need to sleep / stop consumer/s on a new application deploy.
290+
You can listen for OnConsumeEvent (\OldSound\RabbitMqBundle\Event\OnConsumeEvent) and check for new application deploy.
291+
292+
##### BEFORE PROCESSING MESSAGE #####
293+
294+
```php
295+
class BeforeProcessingMessageEvent extends AMQPEvent
296+
{
297+
const NAME = AMQPEvent::BEFORE_PROCESSING_MESSAGE;
298+
299+
/**
300+
* BeforeProcessingMessageEvent constructor.
301+
*
302+
* @param AMQPMessage $AMQPMessage
303+
*/
304+
public function __construct(AMQPMessage $AMQPMessage)
305+
{
306+
$this->AMQPMessage = $AMQPMessage;
307+
}
308+
}
309+
```
310+
Event raised before processing a AMQPMessage.
311+
312+
##### AFTER PROCESSING MESSAGE #####
313+
314+
```php
315+
class AfterProcessingMessageEvent extends AMQPEvent
316+
{
317+
const NAME = AMQPEvent::AFTER_PROCESSING_MESSAGE;
318+
319+
/**
320+
* AfterProcessingMessageEvent constructor.
321+
*
322+
* @param AMQPMessage $AMQPMessage
323+
*/
324+
public function __construct(AMQPMessage $AMQPMessage)
325+
{
326+
$this->AMQPMessage = $AMQPMessage;
327+
}
328+
}
329+
```
330+
Event raised after processing a AMQPMessage.
331+
If the process message will throw an Exception the event will not raise.
332+
277333
#### Idle timeout ####
278334
279335
If you need to set a timeout when there are no messages from your queue during a period of time, you can set the `idle_timeout` in seconds:

RabbitMq/BaseAmqp.php

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
<?php
22

33
namespace OldSound\RabbitMqBundle\RabbitMq;
4+
use OldSound\RabbitMqBundle\Event\AMQPEvent;
45
use PhpAmqpLib\Channel\AMQPChannel;
56
use PhpAmqpLib\Connection\AbstractConnection;
67
use PhpAmqpLib\Connection\AMQPLazyConnection;
@@ -256,15 +257,23 @@ public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
256257

257258
/**
258259
* @param string $eventName
259-
* @param Event $event
260+
* @param AMQPEvent $event
260261
*/
261-
protected function dispatchEvent($eventName, Event $event)
262+
protected function dispatchEvent($eventName, AMQPEvent $event)
262263
{
263-
if ($this->eventDispatcher) {
264-
$this->eventDispatcher->dispatch(
264+
if ($this->getEventDispatcher()) {
265+
$this->getEventDispatcher()->dispatch(
265266
$eventName,
266267
$event
267268
);
268269
}
269270
}
271+
272+
/**
273+
* @return EventDispatcherInterface
274+
*/
275+
public function getEventDispatcher()
276+
{
277+
return $this->eventDispatcher;
278+
}
270279
}

RabbitMq/DynamicConsumer.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,10 @@ protected function setupConsumer()
4646
}
4747

4848
protected function mergeQueueOptions()
49-
{
49+
{
50+
if (null === $this->queueOptionsProvider) {
51+
return;
52+
}
5053
$this->queueOptions = array_merge($this->queueOptions, $this->queueOptionsProvider->getQueueOptions($this->context));
5154
}
5255
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Tests\Event;
4+
5+
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
6+
use PhpAmqpLib\Message\AMQPMessage;
7+
8+
/**
9+
* Class AfterProcessingMessageEventTest
10+
*
11+
* @package OldSound\RabbitMqBundle\Tests\Event
12+
*/
13+
class AfterProcessingMessageEventTest extends \PHPUnit_Framework_TestCase
14+
{
15+
16+
public function testEvent()
17+
{
18+
$AMQPMessage = new AMQPMessage('body');
19+
$event = new AfterProcessingMessageEvent($AMQPMessage);
20+
$this->assertSame($AMQPMessage, $event->getAMQPMessage());
21+
}
22+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\Tests\Event;
4+
5+
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
6+
use PhpAmqpLib\Message\AMQPMessage;
7+
8+
/**
9+
* Class BeforeProcessingMessageEventTest
10+
*
11+
* @package OldSound\RabbitMqBundle\Tests\Event
12+
*/
13+
class BeforeProcessingMessageEventTest extends \PHPUnit_Framework_TestCase
14+
{
15+
public function testEvent()
16+
{
17+
$AMQPMessage = new AMQPMessage('body');
18+
$event = new BeforeProcessingMessageEvent($AMQPMessage);
19+
$this->assertSame($AMQPMessage, $event->getAMQPMessage());
20+
}
21+
}

Tests/RabbitMq/BaseAmqpTest.php

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
namespace OldSound\RabbitMqBundle\Tests\RabbitMq;
44

5+
use OldSound\RabbitMqBundle\Event\AMQPEvent;
6+
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
7+
use OldSound\RabbitMqBundle\RabbitMq\BaseAmqp;
58
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
69
use PhpAmqpLib\Connection\AMQPLazyConnection;
710

@@ -17,4 +20,40 @@ public function testLazyConnection()
1720
$consumer = new Consumer($amqpLazyConnection, null);
1821
$consumer->getChannel();
1922
}
23+
24+
public function testDispatchEvent()
25+
{
26+
/** @var BaseAmqp|\PHPUnit_Framework_MockObject_MockObject $baseAmqpConsumer */
27+
$baseAmqpConsumer = $this->getMockBuilder('OldSound\RabbitMqBundle\RabbitMq\BaseAmqp')
28+
->disableOriginalConstructor()
29+
->getMock();
30+
$eventDispatcher = $this->getMockBuilder('Symfony\Component\EventDispatcher\EventDispatcher')
31+
->disableOriginalConstructor()
32+
->getMock();
33+
$baseAmqpConsumer->expects($this->atLeastOnce())
34+
->method('getEventDispatcher')
35+
->willReturn($eventDispatcher);
36+
37+
$eventDispatcher->expects($this->once())
38+
->method('dispatch')
39+
->with(AMQPEvent::ON_CONSUME, new AMQPEvent())
40+
->willReturn(true);
41+
$this->invokeMethod('dispatchEvent', $baseAmqpConsumer, array(AMQPEvent::ON_CONSUME, new AMQPEvent()));
42+
}
43+
44+
/**
45+
* @param $name
46+
* @param $obj
47+
* @param $params
48+
*
49+
* @return mixed
50+
*/
51+
protected function invokeMethod($name, $obj, $params)
52+
{
53+
$class = new \ReflectionClass(get_class($obj));
54+
$method = $class->getMethod($name);
55+
$method->setAccessible(true);
56+
57+
return $method->invokeArgs($obj, $params);
58+
}
2059
}

Tests/RabbitMq/ConsumerTest.php

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
namespace OldSound\RabbitMqBundle\Tests\RabbitMq;
44

5+
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
6+
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
57
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
68
use PhpAmqpLib\Message\AMQPMessage;
79
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
@@ -58,7 +60,17 @@ public function testProcessMessage($processFlag, $expectedMethod, $expectedReque
5860
->will($this->returnCallback(function($delivery_tag) use ($expectedMethod) {
5961
\PHPUnit_Framework_Assert::assertSame($expectedMethod, 'basic_ack'); // Check if this function should be called.
6062
}));
63+
$eventDispatcher = $this->getMockBuilder('Symfony\Component\EventDispatcher\EventDispatcherInterface')
64+
->getMock();
65+
$consumer->setEventDispatcher($eventDispatcher);
6166

67+
$eventDispatcher->expects($this->atLeastOnce())
68+
->method('dispatch')
69+
->withConsecutive(
70+
array(BeforeProcessingMessageEvent::NAME, new BeforeProcessingMessageEvent($amqpMessage)),
71+
array(AfterProcessingMessageEvent::NAME, new AfterProcessingMessageEvent($amqpMessage))
72+
)
73+
->willReturn(true);
6274
$consumer->processMessage($amqpMessage);
6375
}
6476

@@ -73,4 +85,87 @@ public function processMessageProvider()
7385
array(ConsumerInterface::MSG_REJECT, 'basic_reject', false), // Reject and drop
7486
);
7587
}
88+
89+
/**
90+
* @return array
91+
*/
92+
public function consumeProvider()
93+
{
94+
$testCases[ "All ok 4 callbacks"] = array(
95+
array(
96+
"messages" => array(
97+
"msgCallback1",
98+
"msgCallback2",
99+
"msgCallback3",
100+
"msgCallback4",
101+
)
102+
)
103+
);
104+
105+
$testCases[ "No callbacks"] = array(
106+
array(
107+
"messages" => array(
108+
)
109+
)
110+
);
111+
112+
return $testCases;
113+
}
114+
115+
/**
116+
* @dataProvider consumeProvider
117+
*
118+
* @param $data
119+
*/
120+
public function testConsume($data)
121+
{
122+
$consumerCallBacks = $data['messages'];
123+
124+
// set up amqp connection
125+
$amqpConnection = $this->prepareAMQPConnection();
126+
// set up amqp channel
127+
$amqpChannel = $this->prepareAMQPChannel();
128+
$amqpChannel->expects($this->atLeastOnce())
129+
->method('getChannelId')
130+
->with()
131+
->willReturn(true);
132+
$amqpChannel->expects($this->once())
133+
->method('basic_consume')
134+
->withAnyParameters()
135+
->willReturn(true);
136+
137+
// set up consumer
138+
$consumer = $this->getConsumer($amqpConnection, $amqpChannel);
139+
// disable autosetup fabric so we do not mock more objects
140+
$consumer->disableAutoSetupFabric();
141+
$consumer->setChannel($amqpChannel);
142+
$amqpChannel->callbacks = $consumerCallBacks;
143+
144+
/**
145+
* Mock ait method and use a callback to remove one element each time from callbacks
146+
* This will simulate a basic consumer consume with provided messages count
147+
*/
148+
$amqpChannel->expects($this->exactly(count($consumerCallBacks)))
149+
->method('wait')
150+
->with(null, false, $consumer->getIdleTimeout())
151+
->will(
152+
$this->returnCallback(
153+
function () use ($amqpChannel) {
154+
/** remove an element on each loop like ... simulate an ACK */
155+
array_splice($amqpChannel->callbacks, 0, 1);
156+
})
157+
);
158+
159+
// set up event dispatcher
160+
$eventDispatcher = $this->getMockBuilder('Symfony\Component\EventDispatcher\EventDispatcher')
161+
->disableOriginalConstructor()
162+
->getMock();
163+
164+
$eventDispatcher->expects($this->exactly(count($consumerCallBacks)))
165+
->method('dispatch')
166+
->willReturn($eventDispatcher);
167+
168+
$consumer->setEventDispatcher($eventDispatcher);
169+
$consumer->consume(1);
170+
}
76171
}

0 commit comments

Comments
 (0)