Skip to content

Commit 166f18f

Browse files
committed
[amqp] introduce lazy context.
1 parent 026e496 commit 166f18f

File tree

2 files changed

+50
-13
lines changed

2 files changed

+50
-13
lines changed

pkg/amqp-ext/AmqpConnectionFactory.php

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public function __construct(array $config)
4343
'write_timeout' => null,
4444
'connect_timeout' => null,
4545
'persisted' => false,
46+
'lazy' => true,
4647
], $config);
4748
}
4849

@@ -52,6 +53,17 @@ public function __construct(array $config)
5253
* @return AmqpContext
5354
*/
5455
public function createContext()
56+
{
57+
if ($this->config['lazy']) {
58+
return new AmqpContext(function() {
59+
return new \AMQPChannel($this->establishConnection());
60+
});
61+
}
62+
63+
return new AmqpContext(new \AMQPChannel($this->establishConnection()));
64+
}
65+
66+
private function establishConnection()
5567
{
5668
if (false == $this->connection) {
5769
$this->connection = new \AMQPConnection($this->config);
@@ -63,6 +75,6 @@ public function createContext()
6375
$this->config['persisted'] ? $this->connection->preconnect() : $this->connection->reconnect();
6476
}
6577

66-
return new AmqpContext(new \AMQPChannel($this->connection));
78+
return $this->connection;
6779
}
6880
}

pkg/amqp-ext/AmqpContext.php

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,24 @@ class AmqpContext implements Context
1515
private $extChannel;
1616

1717
/**
18-
* @param \AMQPChannel $extChannel
18+
* @var callable
1919
*/
20-
public function __construct(\AMQPChannel $extChannel)
20+
private $extChannelFactory;
21+
22+
/**
23+
* Callable must return instance of \AMQPChannel once called
24+
*
25+
* @param \AMQPChannel|callable $extChannel
26+
*/
27+
public function __construct($extChannel)
2128
{
22-
$this->extChannel = $extChannel;
29+
if ($extChannel instanceof \AMQPChannel) {
30+
$this->extChannel = $extChannel;
31+
} elseif (is_callable($extChannel)) {
32+
$this->extChannelFactory = $extChannel;
33+
} else {
34+
throw new \InvalidArgumentException('The extChannel argument must be either AMQPChannel or callable that return AMQPChannel.');
35+
}
2336
}
2437

2538
/**
@@ -49,7 +62,7 @@ public function deleteTopic(Destination $destination)
4962
{
5063
InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpTopic::class);
5164

52-
$extExchange = new \AMQPExchange($this->extChannel);
65+
$extExchange = new \AMQPExchange($this->getExtChannel());
5366
$extExchange->delete($destination->getTopicName(), $destination->getFlags());
5467
}
5568

@@ -60,7 +73,7 @@ public function declareTopic(Destination $destination)
6073
{
6174
InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpTopic::class);
6275

63-
$extExchange = new \AMQPExchange($this->extChannel);
76+
$extExchange = new \AMQPExchange($this->getExtChannel());
6477
$extExchange->setName($destination->getTopicName());
6578
$extExchange->setType($destination->getType());
6679
$extExchange->setArguments($destination->getArguments());
@@ -86,7 +99,7 @@ public function deleteQueue(Destination $destination)
8699
{
87100
InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class);
88101

89-
$extQueue = new \AMQPQueue($this->extChannel);
102+
$extQueue = new \AMQPQueue($this->getExtChannel());
90103
$extQueue->setName($destination->getQueueName());
91104
$extQueue->delete($destination->getFlags());
92105
}
@@ -98,7 +111,7 @@ public function declareQueue(Destination $destination)
98111
{
99112
InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class);
100113

101-
$extQueue = new \AMQPQueue($this->extChannel);
114+
$extQueue = new \AMQPQueue($this->getExtChannel());
102115
$extQueue->setFlags($destination->getFlags());
103116
$extQueue->setArguments($destination->getArguments());
104117

@@ -135,7 +148,7 @@ public function createTemporaryQueue()
135148
*/
136149
public function createProducer()
137150
{
138-
return new AmqpProducer($this->extChannel);
151+
return new AmqpProducer($this->getExtChannel());
139152
}
140153

141154
/**
@@ -164,7 +177,7 @@ public function createConsumer(Destination $destination)
164177

165178
public function close()
166179
{
167-
$extConnection = $this->extChannel->getConnection();
180+
$extConnection = $this->getExtChannel()->getConnection();
168181
if ($extConnection->isConnected()) {
169182
$extConnection->isPersistent() ? $extConnection->pdisconnect() : $extConnection->disconnect();
170183
}
@@ -179,7 +192,7 @@ public function bind(Destination $source, Destination $target)
179192
InvalidDestinationException::assertDestinationInstanceOf($source, AmqpTopic::class);
180193
InvalidDestinationException::assertDestinationInstanceOf($target, AmqpQueue::class);
181194

182-
$amqpQueue = new \AMQPQueue($this->extChannel);
195+
$amqpQueue = new \AMQPQueue($this->getExtChannel());
183196
$amqpQueue->setName($target->getQueueName());
184197
$amqpQueue->bind($source->getTopicName(), $amqpQueue->getName(), $target->getBindArguments());
185198
}
@@ -189,14 +202,26 @@ public function bind(Destination $source, Destination $target)
189202
*/
190203
public function getExtConnection()
191204
{
192-
return $this->extChannel->getConnection();
205+
return $this->getExtChannel()->getConnection();
193206
}
194207

195208
/**
196-
* @return mixed
209+
* @return \AMQPChannel
197210
*/
198211
public function getExtChannel()
199212
{
213+
if (false == $this->extChannel) {
214+
$extChannel = call_user_func($this->extChannelFactory);
215+
if (false == $extChannel instanceof \AMQPChannel) {
216+
throw new \LogicException(sprintf(
217+
'The factory must return instance of AMQPChannel. It returns %s',
218+
is_object($extChannel) ? get_class($extChannel) : gettype($extChannel)
219+
));
220+
}
221+
222+
$this->extChannel = $extChannel;
223+
}
224+
200225
return $this->extChannel;
201226
}
202227
}

0 commit comments

Comments
 (0)