Skip to content

Commit 7e5c85e

Browse files
committed
add possibility to use multiple hosts
1 parent ece95b9 commit 7e5c85e

File tree

8 files changed

+482
-111
lines changed

8 files changed

+482
-111
lines changed

DependencyInjection/Configuration.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,22 @@ protected function addConnections(ArrayNodeDefinition $node)
7272
->scalarNode('user')->defaultValue('guest')->end()
7373
->scalarNode('password')->defaultValue('guest')->end()
7474
->scalarNode('vhost')->defaultValue('/')->end()
75+
->arrayNode('hosts')
76+
->info('connection_timeout, read_write_timeout, use_socket, ssl_context, keepalive,
77+
heartbeat and connection_parameters_provider should be specified globally when
78+
you are using multiple hosts')
79+
->canBeUnset()
80+
->prototype('array')
81+
->children()
82+
->scalarNode('url')->defaultValue('')->end()
83+
->scalarNode('host')->defaultValue('localhost')->end()
84+
->scalarNode('port')->defaultValue(5672)->end()
85+
->scalarNode('user')->defaultValue('guest')->end()
86+
->scalarNode('password')->defaultValue('guest')->end()
87+
->scalarNode('vhost')->defaultValue('/')->end()
88+
->end()
89+
->end()
90+
->end()
7591
->booleanNode('lazy')->defaultFalse()->end()
7692
->scalarNode('connection_timeout')->defaultValue(3)->end()
7793
->scalarNode('read_write_timeout')->defaultValue(3)->end()

RabbitMq/AMQPConnectionFactory.php

Lines changed: 24 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use OldSound\RabbitMqBundle\Provider\ConnectionParametersProviderInterface;
66
use PhpAmqpLib\Connection\AbstractConnection;
7+
use PhpAmqpLib\Connection\AMQPSocketConnection;
78
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
89

910
class AMQPConnectionFactory
@@ -24,6 +25,7 @@ class AMQPConnectionFactory
2425
'ssl_context' => null,
2526
'keepalive' => false,
2627
'heartbeat' => 0,
28+
'hosts' => []
2729
);
2830

2931
/**
@@ -43,6 +45,15 @@ public function __construct(
4345
$this->class = $class;
4446
$this->parameters = array_merge($this->parameters, $parameters);
4547
$this->parameters = $this->parseUrl($this->parameters);
48+
49+
foreach ($this->parameters['hosts'] as $key => $hostParameters) {
50+
if (!isset($hostParameters['url'])) {
51+
continue;
52+
}
53+
54+
$this->parameters['hosts'][$key] = $this->parseUrl($hostParameters);
55+
}
56+
4657
if (is_array($this->parameters['ssl_context'])) {
4758
$this->parameters['ssl_context'] = ! empty($this->parameters['ssl_context'])
4859
? stream_context_create(array('ssl' => $this->parameters['ssl_context']))
@@ -57,50 +68,26 @@ public function __construct(
5768
* Creates the appropriate connection using current parameters.
5869
*
5970
* @return AbstractConnection
71+
* @throws \Exception
6072
*/
6173
public function createConnection()
6274
{
63-
$ref = new \ReflectionClass($this->class);
64-
6575
if (isset($this->parameters['constructor_args']) && is_array($this->parameters['constructor_args'])) {
66-
return $ref->newInstanceArgs($this->parameters['constructor_args']);
76+
$constructorArgs = array_values($this->parameters['constructor_args']);
77+
return new $this->class(...$constructorArgs);
6778
}
6879

69-
if ($this->class == 'PhpAmqpLib\Connection\AMQPSocketConnection' || is_subclass_of($this->class, 'PhpAmqpLib\Connection\AMQPSocketConnection')) {
70-
return $ref->newInstanceArgs([
71-
$this->parameters['host'],
72-
$this->parameters['port'],
73-
$this->parameters['user'],
74-
$this->parameters['password'],
75-
$this->parameters['vhost'],
76-
false, // insist
77-
'AMQPLAIN', // login_method
78-
null, // login_response
79-
'en_US', // locale
80-
isset($this->parameters['read_timeout']) ? $this->parameters['read_timeout'] : $this->parameters['read_write_timeout'],
81-
$this->parameters['keepalive'],
82-
isset($this->parameters['write_timeout']) ? $this->parameters['write_timeout'] : $this->parameters['read_write_timeout'],
83-
$this->parameters['heartbeat']
84-
]
85-
);
86-
} else {
87-
return $ref->newInstanceArgs([
88-
$this->parameters['host'],
89-
$this->parameters['port'],
90-
$this->parameters['user'],
91-
$this->parameters['password'],
92-
$this->parameters['vhost'],
93-
false, // insist
94-
'AMQPLAIN', // login_method
95-
null, // login_response
96-
'en_US', // locale
97-
$this->parameters['connection_timeout'],
98-
$this->parameters['read_write_timeout'],
99-
$this->parameters['ssl_context'],
100-
$this->parameters['keepalive'],
101-
$this->parameters['heartbeat']
102-
]);
80+
$hosts = $this->parameters['hosts'] ?: [$this->parameters];
81+
$options = $this->parameters;
82+
unset($options['hosts']);
83+
84+
if ($this->class == AMQPSocketConnection::class || is_subclass_of($this->class, AMQPSocketConnection::class)) {
85+
$options['read_timeout'] = $options['read_timeout'] ?? $this->parameters['read_write_timeout'];
86+
$options['write_timeout'] = $options['write_timeout'] ?? $this->parameters['read_write_timeout'];
10387
}
88+
89+
// No need to unpack options, they will be handled inside connection classes
90+
return $this->class::create_connection($hosts, $options);
10491
}
10592

10693
/**

Tests/DependencyInjection/Fixtures/test.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,15 @@ old_sound_rabbit_mq:
4545
lazy: true
4646
use_socket: true
4747

48+
cluster_connection:
49+
hosts:
50+
- host: cluster_host
51+
port: 111
52+
user: cluster_user
53+
password: cluster_password
54+
vhost: /cluster
55+
- url: amqp://cluster_url_host:cluster_url_pass@host:10000/cluster_url_vhost
56+
4857
default:
4958
default2:
5059
foo_default:

Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public function testFooConnectionDefinition()
3636
'heartbeat' => 0,
3737
'use_socket' => false,
3838
'url' => '',
39+
'hosts' => [],
3940
), $factory->getArgument(1));
4041
$this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass());
4142
}
@@ -65,6 +66,7 @@ public function testSslConnectionDefinition()
6566
'heartbeat' => 0,
6667
'use_socket' => false,
6768
'url' => '',
69+
'hosts' => [],
6870
), $factory->getArgument(1));
6971
$this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass());
7072
}
@@ -92,6 +94,7 @@ public function testLazyConnectionDefinition()
9294
'heartbeat' => 0,
9395
'use_socket' => false,
9496
'url' => '',
97+
'hosts' => [],
9598
), $factory->getArgument(1));
9699
$this->assertEquals('%old_sound_rabbit_mq.lazy.connection.class%', $definition->getClass());
97100
}
@@ -119,6 +122,7 @@ public function testDefaultConnectionDefinition()
119122
'heartbeat' => 0,
120123
'use_socket' => false,
121124
'url' => '',
125+
'hosts' => [],
122126
), $factory->getArgument(1));
123127
$this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass());
124128
}
@@ -141,6 +145,51 @@ public function testLazySocketConnectionDefinition()
141145
$this->assertEquals('%old_sound_rabbit_mq.lazy.socket_connection.class%', $definiton->getClass());
142146
}
143147

148+
public function testClusterConnectionDefinition()
149+
{
150+
$container = $this->getContainer('test.yml');
151+
152+
$this->assertTrue($container->has('old_sound_rabbit_mq.connection.cluster_connection'));
153+
$definition = $container->getDefinition('old_sound_rabbit_mq.connection.cluster_connection');
154+
$this->assertTrue($container->has('old_sound_rabbit_mq.connection_factory.cluster_connection'));
155+
$factory = $container->getDefinition('old_sound_rabbit_mq.connection_factory.cluster_connection');
156+
$this->assertEquals(['old_sound_rabbit_mq.connection_factory.cluster_connection', 'createConnection'], $definition->getFactory());
157+
$this->assertEquals([
158+
'hosts' => [
159+
[
160+
'host' => 'cluster_host',
161+
'port' => 111,
162+
'user' => 'cluster_user',
163+
'password' => 'cluster_password',
164+
'vhost' => '/cluster',
165+
'url' => ''
166+
],
167+
[
168+
'host' => 'localhost',
169+
'port' => 5672,
170+
'user' => 'guest',
171+
'password' => 'guest',
172+
'vhost' => '/',
173+
'url' => 'amqp://cluster_url_host:cluster_url_pass@host:10000/cluster_url_vhost'
174+
]
175+
],
176+
'host' => 'localhost',
177+
'port' => 5672,
178+
'user' => 'guest',
179+
'password' => 'guest',
180+
'vhost' => '/',
181+
'lazy' => false,
182+
'connection_timeout' => 3,
183+
'read_write_timeout' => 3,
184+
'ssl_context' => array(),
185+
'keepalive' => false,
186+
'heartbeat' => 0,
187+
'use_socket' => false,
188+
'url' => '',
189+
], $factory->getArgument(1));
190+
$this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass());
191+
}
192+
144193
public function testFooBinding()
145194
{
146195
$container = $this->getContainer('test.yml');

0 commit comments

Comments
 (0)