Skip to content

Commit 877fb12

Browse files
Alexandru PanturuAlexandru Panturu
authored andcommitted
add consumer to Event and fix unit tests
1 parent d09cfc9 commit 877fb12

9 files changed

+113
-18
lines changed

Event/AMQPEvent.php

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace OldSound\RabbitMqBundle\Event;
44

5+
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
56
use PhpAmqpLib\Message\AMQPMessage;
67
use Symfony\Component\EventDispatcher\Event;
78

@@ -22,11 +23,48 @@ class AMQPEvent extends Event
2223
*/
2324
protected $AMQPMessage;
2425

26+
/**
27+
* @var Consumer
28+
*/
29+
protected $consumer;
30+
2531
/**
2632
* @return AMQPMessage
2733
*/
2834
public function getAMQPMessage()
2935
{
3036
return $this->AMQPMessage;
3137
}
38+
39+
/**
40+
* @param AMQPMessage $AMQPMessage
41+
*
42+
* @return AMQPEvent
43+
*/
44+
public function setAMQPMessage(AMQPMessage $AMQPMessage)
45+
{
46+
$this->AMQPMessage = $AMQPMessage;
47+
48+
return $this;
49+
}
50+
51+
/**
52+
* @return Consumer
53+
*/
54+
public function getConsumer()
55+
{
56+
return $this->consumer;
57+
}
58+
59+
/**
60+
* @param Consumer $consumer
61+
*
62+
* @return AMQPEvent
63+
*/
64+
public function setConsumer(Consumer $consumer)
65+
{
66+
$this->consumer = $consumer;
67+
68+
return $this;
69+
}
3270
}

Event/AfterProcessingMessageEvent.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace OldSound\RabbitMqBundle\Event;
44

5+
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
56
use PhpAmqpLib\Message\AMQPMessage;
67

78
/**
@@ -18,8 +19,9 @@ class AfterProcessingMessageEvent extends AMQPEvent
1819
*
1920
* @param AMQPMessage $AMQPMessage
2021
*/
21-
public function __construct(AMQPMessage $AMQPMessage)
22+
public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
2223
{
23-
$this->AMQPMessage = $AMQPMessage;
24+
$this->setConsumer($consumer);
25+
$this->setAMQPMessage($AMQPMessage);
2426
}
2527
}

Event/BeforeProcessingMessageEvent.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace OldSound\RabbitMqBundle\Event;
44

5+
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
56
use PhpAmqpLib\Message\AMQPMessage;
67

78
/**
@@ -18,8 +19,9 @@ class BeforeProcessingMessageEvent extends AMQPEvent
1819
*
1920
* @param AMQPMessage $AMQPMessage
2021
*/
21-
public function __construct(AMQPMessage $AMQPMessage)
22+
public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
2223
{
23-
$this->AMQPMessage = $AMQPMessage;
24+
$this->setConsumer($consumer);
25+
$this->setAMQPMessage($AMQPMessage);
2426
}
2527
}

Event/OnConsumeEvent.php

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,24 @@
22

33
namespace OldSound\RabbitMqBundle\Event;
44

5+
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
6+
57
/**
68
* Class OnConsumeEvent
79
*
810
* @package OldSound\RabbitMqBundle\Command
9-
* @codeCoverageIgnore
1011
*/
1112
class OnConsumeEvent extends AMQPEvent
1213
{
1314
const NAME = AMQPEvent::ON_CONSUME;
15+
16+
/**
17+
* OnConsumeEvent constructor.
18+
*
19+
* @param Consumer $consumer
20+
*/
21+
public function __construct(Consumer $consumer)
22+
{
23+
$this->setConsumer($consumer);
24+
}
1425
}

README.md

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,16 @@ There are 3 AMQPEvents:
283283
class OnConsumeEvent extends AMQPEvent
284284
{
285285
const NAME = AMQPEvent::ON_CONSUME;
286+
287+
/**
288+
* OnConsumeEvent constructor.
289+
*
290+
* @param Consumer $consumer
291+
*/
292+
public function __construct(Consumer $consumer)
293+
{
294+
$this->setConsumer($consumer);
295+
}
286296
}
287297
```
288298
@@ -301,9 +311,10 @@ class BeforeProcessingMessageEvent extends AMQPEvent
301311
*
302312
* @param AMQPMessage $AMQPMessage
303313
*/
304-
public function __construct(AMQPMessage $AMQPMessage)
314+
public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
305315
{
306-
$this->AMQPMessage = $AMQPMessage;
316+
$this->setConsumer($consumer);
317+
$this->setAMQPMessage($AMQPMessage);
307318
}
308319
}
309320
```
@@ -321,9 +332,10 @@ class AfterProcessingMessageEvent extends AMQPEvent
321332
*
322333
* @param AMQPMessage $AMQPMessage
323334
*/
324-
public function __construct(AMQPMessage $AMQPMessage)
335+
public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
325336
{
326-
$this->AMQPMessage = $AMQPMessage;
337+
$this->setConsumer($consumer);
338+
$this->setAMQPMessage($AMQPMessage);
327339
}
328340
}
329341
```

RabbitMq/Consumer.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public function consume($msgAmount)
4747
$this->setupConsumer();
4848

4949
while (count($this->getChannel()->callbacks)) {
50-
$this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent());
50+
$this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
5151
$this->maybeStopConsumer();
5252
$this->getChannel()->wait(null, false, $this->getIdleTimeout());
5353
}
@@ -72,14 +72,14 @@ public function delete()
7272
public function processMessage(AMQPMessage $msg)
7373
{
7474
$this->dispatchEvent(BeforeProcessingMessageEvent::NAME,
75-
new BeforeProcessingMessageEvent($msg)
75+
new BeforeProcessingMessageEvent($this, $msg)
7676
);
7777
try {
7878
$processFlag = call_user_func($this->callback, $msg);
7979
$this->handleProcessMessage($msg, $processFlag);
8080
$this->dispatchEvent(
8181
AfterProcessingMessageEvent::NAME,
82-
new AfterProcessingMessageEvent($msg)
82+
new AfterProcessingMessageEvent($this, $msg)
8383
);
8484
$this->logger->debug('Queue message processed', array(
8585
'amqp' => array(

Tests/Event/AfterProcessingMessageEventTest.php

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace OldSound\RabbitMqBundle\Tests\Event;
44

55
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
6+
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
67
use PhpAmqpLib\Message\AMQPMessage;
78

89
/**
@@ -12,11 +13,24 @@
1213
*/
1314
class AfterProcessingMessageEventTest extends \PHPUnit_Framework_TestCase
1415
{
15-
16+
protected function getConsumer()
17+
{
18+
return new Consumer(
19+
$this->getMockBuilder('\PhpAmqpLib\Connection\AMQPConnection')
20+
->disableOriginalConstructor()
21+
->getMock(),
22+
$this->getMockBuilder('\PhpAmqpLib\Channel\AMQPChannel')
23+
->disableOriginalConstructor()
24+
->getMock()
25+
);
26+
}
27+
1628
public function testEvent()
1729
{
1830
$AMQPMessage = new AMQPMessage('body');
19-
$event = new AfterProcessingMessageEvent($AMQPMessage);
31+
$consumer = $this->getConsumer();
32+
$event = new AfterProcessingMessageEvent($consumer, $AMQPMessage);
2033
$this->assertSame($AMQPMessage, $event->getAMQPMessage());
34+
$this->assertSame($consumer, $event->getConsumer());
2135
}
2236
}

Tests/Event/BeforeProcessingMessageEventTest.php

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,24 @@
1212
*/
1313
class BeforeProcessingMessageEventTest extends \PHPUnit_Framework_TestCase
1414
{
15+
protected function getConsumer()
16+
{
17+
return new Consumer(
18+
$this->getMockBuilder('\PhpAmqpLib\Connection\AMQPConnection')
19+
->disableOriginalConstructor()
20+
->getMock(),
21+
$this->getMockBuilder('\PhpAmqpLib\Channel\AMQPChannel')
22+
->disableOriginalConstructor()
23+
->getMock()
24+
);
25+
}
26+
1527
public function testEvent()
1628
{
1729
$AMQPMessage = new AMQPMessage('body');
18-
$event = new BeforeProcessingMessageEvent($AMQPMessage);
30+
$consumer = $this->getConsumer();
31+
$event = new BeforeProcessingMessageEvent($consumer, $AMQPMessage);
1932
$this->assertSame($AMQPMessage, $event->getAMQPMessage());
33+
$this->assertSame($consumer, $event->getConsumer());
2034
}
2135
}

Tests/RabbitMq/ConsumerTest.php

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
66
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
7+
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
78
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
89
use PhpAmqpLib\Message\AMQPMessage;
910
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
@@ -67,8 +68,8 @@ public function testProcessMessage($processFlag, $expectedMethod, $expectedReque
6768
$eventDispatcher->expects($this->atLeastOnce())
6869
->method('dispatch')
6970
->withConsecutive(
70-
array(BeforeProcessingMessageEvent::NAME, new BeforeProcessingMessageEvent($amqpMessage)),
71-
array(AfterProcessingMessageEvent::NAME, new AfterProcessingMessageEvent($amqpMessage))
71+
array(BeforeProcessingMessageEvent::NAME, new BeforeProcessingMessageEvent($consumer, $amqpMessage)),
72+
array(AfterProcessingMessageEvent::NAME, new AfterProcessingMessageEvent($consumer, $amqpMessage))
7273
)
7374
->willReturn(true);
7475
$consumer->processMessage($amqpMessage);
@@ -163,7 +164,8 @@ function () use ($amqpChannel) {
163164

164165
$eventDispatcher->expects($this->exactly(count($consumerCallBacks)))
165166
->method('dispatch')
166-
->willReturn($eventDispatcher);
167+
->with(OnConsumeEvent::NAME, $this->isInstanceOf('OldSound\RabbitMqBundle\Event\OnConsumeEvent'))
168+
->willReturn(true);
167169

168170
$consumer->setEventDispatcher($eventDispatcher);
169171
$consumer->consume(1);

0 commit comments

Comments
 (0)