Skip to content

Commit 1d59ce9

Browse files
author
Bogdan Rancichi
committed
initial batch consumer class implementation
1 parent 2c173a7 commit 1d59ce9

File tree

1 file changed

+104
-0
lines changed

1 file changed

+104
-0
lines changed

RabbitMq/BatchConsumer.php

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
<?php
2+
3+
namespace OldSound\RabbitMqBundle\RabbitMq;
4+
5+
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
6+
use PhpAmqpLib\Exception\AMQPTimeoutException;
7+
8+
class BatchConsumer extends Consumer
9+
{
10+
/**
11+
* @var int
12+
*/
13+
protected $prefetchCount;
14+
15+
/**
16+
* @var int
17+
*/
18+
protected $timeoutWait;
19+
20+
/**
21+
* @inheritDoc
22+
*/
23+
public function consume($msgAmount)
24+
{
25+
$this->target = $msgAmount;
26+
27+
$this->setupConsumer();
28+
29+
$isConsuming = false;
30+
$timeoutWanted = $this->getTimeoutWait();
31+
while (count($this->getChannel()->callbacks)) {
32+
$this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
33+
$this->maybeStopConsumer();
34+
if (!$this->forceStop) {
35+
try {
36+
$this->consumeMessage($timeoutWanted);
37+
$isConsuming = true;
38+
} catch (AMQPTimeoutException $e) {
39+
if ($isConsuming) {
40+
$isConsuming = false;
41+
} elseif (null !== $this->getIdleTimeoutExitCode()) {
42+
return $this->getIdleTimeoutExitCode();
43+
} else {
44+
throw $e;
45+
}
46+
}
47+
}
48+
49+
$timeoutWanted = ($isConsuming) ? $this->getTimeoutWait() : $this->getIdleTimeout();
50+
}
51+
}
52+
53+
/**
54+
* @param int $timeout
55+
*
56+
* @return $this
57+
*/
58+
public function setTimeoutWait($timeout)
59+
{
60+
$this->timeoutWait = $timeout;
61+
62+
return $this;
63+
}
64+
65+
/**
66+
* @param int $amount
67+
*
68+
* @return $this
69+
*/
70+
public function setPrefetchCount($amount)
71+
{
72+
$this->prefetchCount = $amount;
73+
74+
return $this;
75+
}
76+
77+
/**
78+
* @return int
79+
*/
80+
public function getTimeoutWait()
81+
{
82+
return $this->timeoutWait;
83+
}
84+
85+
/**
86+
* @return int
87+
*/
88+
public function getPrefetchCount()
89+
{
90+
return $this->prefetchCount;
91+
}
92+
93+
/**
94+
* @param int $timeout
95+
*
96+
* @return void
97+
*
98+
* @throws AMQPTimeoutException
99+
*/
100+
private function consumeMessage($timeout)
101+
{
102+
$this->getChannel()->wait(null, false, $timeout);
103+
}
104+
}

0 commit comments

Comments
 (0)