Skip to content

Cluster usage possibility #637

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
- 2021-05-15
* Add possibility to use multiple RabbitMQ hosts

- 2017-01-22
* Add `graceful_max_execution_timeout`

Expand Down
16 changes: 16 additions & 0 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,22 @@ protected function addConnections(ArrayNodeDefinition $node)
->scalarNode('user')->defaultValue('guest')->end()
->scalarNode('password')->defaultValue('guest')->end()
->scalarNode('vhost')->defaultValue('/')->end()
->arrayNode('hosts')
->info('connection_timeout, read_write_timeout, use_socket, ssl_context, keepalive,
heartbeat and connection_parameters_provider should be specified globally when
you are using multiple hosts')
->canBeUnset()
->prototype('array')
->children()
->scalarNode('url')->defaultValue('')->end()
->scalarNode('host')->defaultValue('localhost')->end()
->scalarNode('port')->defaultValue(5672)->end()
->scalarNode('user')->defaultValue('guest')->end()
->scalarNode('password')->defaultValue('guest')->end()
->scalarNode('vhost')->defaultValue('/')->end()
->end()
->end()
->end()
->booleanNode('lazy')->defaultFalse()->end()
->scalarNode('connection_timeout')->defaultValue(3)->end()
->scalarNode('read_write_timeout')->defaultValue(3)->end()
Expand Down
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,37 @@ It's a good idea to set the ```read_write_timeout``` to 2x the heartbeat so your
Please bear in mind, that you can expect problems, if your tasks are generally running longer than the heartbeat period, to which there are no good solutions ([link](https://github.com/php-amqplib/RabbitMqBundle/issues/301)).
Consider using either a big value for the heartbeat or leave the heartbeat disabled in favour of the tcp's `keepalive` (both on the client and server side) and the `graceful_max_execution_timeout` feature.

### Multiple Hosts ###

You can provide multiple hosts for a connection. This will allow you to use RabbitMQ cluster with multiple nodes.

```yaml
old_sound_rabbit_mq:
connections:
default:
hosts:
- host: host1
port: 3672
user: user1
password: password1
vhost: vhost1
- url: 'amqp://guest:password@localhost:5672/vhost'
connection_timeout: 3
read_write_timeout: 3
```

Pay attention that you can not specify
```yaml
connection_timeout
read_write_timeout
use_socket
ssl_context
keepalive
heartbeat
connection_parameters_provider
```
parameters to each host separately.

### Dynamic Connection Parameters ###

Sometimes your connection information may need to be dynamic. Dynamic connection parameters allow you to supply or
Expand Down
61 changes: 24 additions & 37 deletions RabbitMq/AMQPConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use OldSound\RabbitMqBundle\Provider\ConnectionParametersProviderInterface;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Connection\AMQPSocketConnection;
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;

class AMQPConnectionFactory
Expand All @@ -24,6 +25,7 @@ class AMQPConnectionFactory
'ssl_context' => null,
'keepalive' => false,
'heartbeat' => 0,
'hosts' => []
);

/**
Expand All @@ -43,6 +45,15 @@ public function __construct(
$this->class = $class;
$this->parameters = array_merge($this->parameters, $parameters);
$this->parameters = $this->parseUrl($this->parameters);

foreach ($this->parameters['hosts'] as $key => $hostParameters) {
if (!isset($hostParameters['url'])) {
continue;
}

$this->parameters['hosts'][$key] = $this->parseUrl($hostParameters);
}

if (is_array($this->parameters['ssl_context'])) {
$this->parameters['ssl_context'] = ! empty($this->parameters['ssl_context'])
? stream_context_create(array('ssl' => $this->parameters['ssl_context']))
Expand All @@ -57,50 +68,26 @@ public function __construct(
* Creates the appropriate connection using current parameters.
*
* @return AbstractConnection
* @throws \Exception
*/
public function createConnection()
{
$ref = new \ReflectionClass($this->class);

if (isset($this->parameters['constructor_args']) && is_array($this->parameters['constructor_args'])) {
return $ref->newInstanceArgs($this->parameters['constructor_args']);
$constructorArgs = array_values($this->parameters['constructor_args']);
return new $this->class(...$constructorArgs);
}

if ($this->class == 'PhpAmqpLib\Connection\AMQPSocketConnection' || is_subclass_of($this->class, 'PhpAmqpLib\Connection\AMQPSocketConnection')) {
return $ref->newInstanceArgs([
$this->parameters['host'],
$this->parameters['port'],
$this->parameters['user'],
$this->parameters['password'],
$this->parameters['vhost'],
false, // insist
'AMQPLAIN', // login_method
null, // login_response
'en_US', // locale
isset($this->parameters['read_timeout']) ? $this->parameters['read_timeout'] : $this->parameters['read_write_timeout'],
$this->parameters['keepalive'],
isset($this->parameters['write_timeout']) ? $this->parameters['write_timeout'] : $this->parameters['read_write_timeout'],
$this->parameters['heartbeat']
]
);
} else {
return $ref->newInstanceArgs([
$this->parameters['host'],
$this->parameters['port'],
$this->parameters['user'],
$this->parameters['password'],
$this->parameters['vhost'],
false, // insist
'AMQPLAIN', // login_method
null, // login_response
'en_US', // locale
$this->parameters['connection_timeout'],
$this->parameters['read_write_timeout'],
$this->parameters['ssl_context'],
$this->parameters['keepalive'],
$this->parameters['heartbeat']
]);
$hosts = $this->parameters['hosts'] ?: [$this->parameters];
$options = $this->parameters;
unset($options['hosts']);

if ($this->class == AMQPSocketConnection::class || is_subclass_of($this->class, AMQPSocketConnection::class)) {
$options['read_timeout'] = $options['read_timeout'] ?? $this->parameters['read_write_timeout'];
$options['write_timeout'] = $options['write_timeout'] ?? $this->parameters['read_write_timeout'];
}

// No need to unpack options, they will be handled inside connection classes
return $this->class::create_connection($hosts, $options);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please bear in mind that there is no smart logic and it does not spread load accross cluster. All requests will come to first defined host unless it's unrecheable(dead).

}

/**
Expand Down
9 changes: 9 additions & 0 deletions Tests/DependencyInjection/Fixtures/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ old_sound_rabbit_mq:
lazy: true
use_socket: true

cluster_connection:
hosts:
- host: cluster_host
port: 111
user: cluster_user
password: cluster_password
vhost: /cluster
- url: amqp://cluster_url_host:cluster_url_pass@host:10000/cluster_url_vhost

default:
default2:
foo_default:
Expand Down
49 changes: 49 additions & 0 deletions Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public function testFooConnectionDefinition()
'heartbeat' => 0,
'use_socket' => false,
'url' => '',
'hosts' => [],
), $factory->getArgument(1));
$this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass());
}
Expand Down Expand Up @@ -65,6 +66,7 @@ public function testSslConnectionDefinition()
'heartbeat' => 0,
'use_socket' => false,
'url' => '',
'hosts' => [],
), $factory->getArgument(1));
$this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass());
}
Expand Down Expand Up @@ -92,6 +94,7 @@ public function testLazyConnectionDefinition()
'heartbeat' => 0,
'use_socket' => false,
'url' => '',
'hosts' => [],
), $factory->getArgument(1));
$this->assertEquals('%old_sound_rabbit_mq.lazy.connection.class%', $definition->getClass());
}
Expand Down Expand Up @@ -119,6 +122,7 @@ public function testDefaultConnectionDefinition()
'heartbeat' => 0,
'use_socket' => false,
'url' => '',
'hosts' => [],
), $factory->getArgument(1));
$this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass());
}
Expand All @@ -141,6 +145,51 @@ public function testLazySocketConnectionDefinition()
$this->assertEquals('%old_sound_rabbit_mq.lazy.socket_connection.class%', $definiton->getClass());
}

public function testClusterConnectionDefinition()
{
$container = $this->getContainer('test.yml');

$this->assertTrue($container->has('old_sound_rabbit_mq.connection.cluster_connection'));
$definition = $container->getDefinition('old_sound_rabbit_mq.connection.cluster_connection');
$this->assertTrue($container->has('old_sound_rabbit_mq.connection_factory.cluster_connection'));
$factory = $container->getDefinition('old_sound_rabbit_mq.connection_factory.cluster_connection');
$this->assertEquals(['old_sound_rabbit_mq.connection_factory.cluster_connection', 'createConnection'], $definition->getFactory());
$this->assertEquals([
'hosts' => [
[
'host' => 'cluster_host',
'port' => 111,
'user' => 'cluster_user',
'password' => 'cluster_password',
'vhost' => '/cluster',
'url' => ''
],
[
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'url' => 'amqp://cluster_url_host:cluster_url_pass@host:10000/cluster_url_vhost'
]
],
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'lazy' => false,
'connection_timeout' => 3,
'read_write_timeout' => 3,
'ssl_context' => array(),
'keepalive' => false,
'heartbeat' => 0,
'use_socket' => false,
'url' => '',
], $factory->getArgument(1));
$this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass());
}

public function testFooBinding()
{
$container = $this->getContainer('test.yml');
Expand Down
Loading