Skip to content

Commit d58390d

Browse files
committed
add shutdown method. When called kernel shutdown, we must close opened connection
1 parent 5818690 commit d58390d

File tree

2 files changed

+95
-30
lines changed

2 files changed

+95
-30
lines changed

OldSoundRabbitMqBundle.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,17 @@ public function build(ContainerBuilder $container)
1616
$container->addCompilerPass(new RegisterPartsPass());
1717
$container->addCompilerPass(new InjectEventDispatcherPass());
1818
}
19+
20+
/**
21+
* {@inheritDoc}
22+
*/
23+
public function shutdown()
24+
{
25+
parent::shutdown();
26+
$partHolder = $this->container->get('old_sound_rabbit_mq.parts_holder');
27+
$connections = $partHolder->getParts("old_sound_rabbit_mq.base_amqp");
28+
foreach ($connections as $connection) {
29+
$connection->close();
30+
}
31+
}
1932
}

RabbitMq/BaseAmqp.php

Lines changed: 82 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
<?php
22

33
namespace OldSound\RabbitMqBundle\RabbitMq;
4+
<<<<<<< 5818690e0890fb32c1d873645f59b33c73e3598c
45
use OldSound\RabbitMqBundle\Event\AMQPEvent;
6+
=======
7+
8+
>>>>>>> add shutdown method. When called kernel shutdown, we must close opened connection
59
use PhpAmqpLib\Channel\AMQPChannel;
610
use PhpAmqpLib\Connection\AbstractConnection;
711
use PhpAmqpLib\Connection\AMQPLazyConnection;
@@ -19,35 +23,35 @@ abstract class BaseAmqp
1923
protected $queueDeclared = false;
2024
protected $routingKey = '';
2125
protected $autoSetupFabric = true;
22-
protected $basicProperties = array('content_type' => 'text/plain', 'delivery_mode' => 2);
26+
protected $basicProperties = ['content_type' => 'text/plain', 'delivery_mode' => 2];
2327

2428
/**
2529
* @var LoggerInterface
2630
*/
2731
protected $logger;
2832

29-
protected $exchangeOptions = array(
30-
'passive' => false,
31-
'durable' => true,
33+
protected $exchangeOptions = [
34+
'passive' => false,
35+
'durable' => true,
3236
'auto_delete' => false,
33-
'internal' => false,
34-
'nowait' => false,
35-
'arguments' => null,
36-
'ticket' => null,
37-
'declare' => true,
38-
);
39-
40-
protected $queueOptions = array(
41-
'name' => '',
42-
'passive' => false,
43-
'durable' => true,
44-
'exclusive' => false,
37+
'internal' => false,
38+
'nowait' => false,
39+
'arguments' => null,
40+
'ticket' => null,
41+
'declare' => true,
42+
];
43+
44+
protected $queueOptions = [
45+
'name' => '',
46+
'passive' => false,
47+
'durable' => true,
48+
'exclusive' => false,
4549
'auto_delete' => false,
46-
'nowait' => false,
47-
'arguments' => null,
48-
'ticket' => null,
49-
'declare' => true,
50-
);
50+
'nowait' => false,
51+
'arguments' => null,
52+
'ticket' => null,
53+
'declare' => true,
54+
];
5155

5256
/**
5357
* @var EventDispatcherInterface
@@ -68,12 +72,21 @@ public function __construct(AbstractConnection $conn, AMQPChannel $ch = null, $c
6872
$this->getChannel();
6973
}
7074

71-
$this->consumerTag = empty($consumerTag) ? sprintf("PHPPROCESS_%s_%s", gethostname(), getmypid()) : $consumerTag;
75+
$this->consumerTag = empty($consumerTag) ? sprintf(
76+
"PHPPROCESS_%s_%s",
77+
gethostname(),
78+
getmypid()
79+
) : $consumerTag;
7280

7381
$this->logger = new NullLogger();
7482
}
7583

7684
public function __destruct()
85+
{
86+
$this->close();
87+
}
88+
89+
public function close()
7790
{
7891
if ($this->ch) {
7992
try {
@@ -115,6 +128,7 @@ public function getChannel()
115128

116129
/**
117130
* @param AMQPChannel $ch
131+
*
118132
* @return void
119133
*/
120134
public function setChannel(AMQPChannel $ch)
@@ -124,10 +138,12 @@ public function setChannel(AMQPChannel $ch)
124138

125139
/**
126140
* @throws \InvalidArgumentException
127-
* @param array $options
141+
*
142+
* @param array $options
143+
*
128144
* @return void
129145
*/
130-
public function setExchangeOptions(array $options = array())
146+
public function setExchangeOptions(array $options = [])
131147
{
132148
if (!isset($options['name'])) {
133149
throw new \InvalidArgumentException('You must provide an exchange name');
@@ -142,22 +158,51 @@ public function setExchangeOptions(array $options = array())
142158

143159
/**
144160
* @param array $options
161+
*
145162
* @return void
146163
*/
147-
public function setQueueOptions(array $options = array())
164+
public function setQueueOptions(array $options = [])
148165
{
149166
$this->queueOptions = array_merge($this->queueOptions, $options);
150167
}
151168

152169
/**
153170
* @param string $routingKey
171+
*
154172
* @return void
155173
*/
156174
public function setRoutingKey($routingKey)
157175
{
158176
$this->routingKey = $routingKey;
159177
}
160178

179+
public function setupFabric()
180+
{
181+
if (!$this->exchangeDeclared) {
182+
$this->exchangeDeclare();
183+
}
184+
185+
if (!$this->queueDeclared) {
186+
$this->queueDeclare();
187+
}
188+
}
189+
190+
/**
191+
* disables the automatic SetupFabric when using a consumer or producer
192+
*/
193+
public function disableAutoSetupFabric()
194+
{
195+
$this->autoSetupFabric = false;
196+
}
197+
198+
/**
199+
* @param LoggerInterface $logger
200+
*/
201+
public function setLogger($logger)
202+
{
203+
$this->logger = $logger;
204+
}
205+
161206
/**
162207
* Declares exchange
163208
*/
@@ -173,7 +218,8 @@ protected function exchangeDeclare()
173218
$this->exchangeOptions['internal'],
174219
$this->exchangeOptions['nowait'],
175220
$this->exchangeOptions['arguments'],
176-
$this->exchangeOptions['ticket']);
221+
$this->exchangeOptions['ticket']
222+
);
177223

178224
$this->exchangeDeclared = true;
179225
}
@@ -185,10 +231,16 @@ protected function exchangeDeclare()
185231
protected function queueDeclare()
186232
{
187233
if ($this->queueOptions['declare']) {
188-
list($queueName, ,) = $this->getChannel()->queue_declare($this->queueOptions['name'], $this->queueOptions['passive'],
189-
$this->queueOptions['durable'], $this->queueOptions['exclusive'],
190-
$this->queueOptions['auto_delete'], $this->queueOptions['nowait'],
191-
$this->queueOptions['arguments'], $this->queueOptions['ticket']);
234+
list($queueName, ,) = $this->getChannel()->queue_declare(
235+
$this->queueOptions['name'],
236+
$this->queueOptions['passive'],
237+
$this->queueOptions['durable'],
238+
$this->queueOptions['exclusive'],
239+
$this->queueOptions['auto_delete'],
240+
$this->queueOptions['nowait'],
241+
$this->queueOptions['arguments'],
242+
$this->queueOptions['ticket']
243+
);
192244

193245
if (isset($this->queueOptions['routing_keys']) && count($this->queueOptions['routing_keys']) > 0) {
194246
foreach ($this->queueOptions['routing_keys'] as $routingKey) {

0 commit comments

Comments
 (0)