Skip to content

RabbitMQ cluster support #588

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,18 @@ protected function addConnections(ArrayNodeDefinition $node)
->booleanNode('keepalive')->defaultFalse()->info('requires php-amqplib v2.4.1+ and PHP5.4+')->end()
->scalarNode('heartbeat')->defaultValue(0)->info('requires php-amqplib v2.4.1+')->end()
->scalarNode('connection_parameters_provider')->end()
->arrayNode('hosts')
->canBeUnset()
->prototype('array')
->children()
->scalarNode('host')->defaultValue('localhost')->end()
->scalarNode('port')->defaultValue(5672)->end()
->scalarNode('user')->defaultValue('guest')->end()
->scalarNode('password')->defaultValue('guest')->end()
->scalarNode('vhost')->defaultValue('/')->end()
->end()
->end()
->end()
->end()
->end()
->end()
Expand Down Expand Up @@ -216,7 +228,7 @@ protected function addMultipleConsumers(ArrayNodeDefinition $node)
->end()
;
}

protected function addDynamicConsumers(ArrayNodeDefinition $node)
{
$node
Expand Down
68 changes: 37 additions & 31 deletions RabbitMq/AMQPConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class AMQPConnectionFactory
'ssl_context' => null,
'keepalive' => false,
'heartbeat' => 0,
'hosts' => null,
);

/**
Expand Down Expand Up @@ -65,40 +66,34 @@ public function createConnection()
return $ref->newInstanceArgs($this->parameters['constructor_args']);
}

if ($this->class == 'PhpAmqpLib\Connection\AMQPSocketConnection' || is_subclass_of($this->class , 'PhpAmqpLib\Connection\AMQPSocketConnection')) {
return new $this->class(
$this->parameters['host'],
$this->parameters['port'],
$this->parameters['user'],
$this->parameters['password'],
$this->parameters['vhost'],
false, // insist
'AMQPLAIN', // login_method
null, // login_response
'en_US', // locale
isset($this->parameters['read_timeout']) ? $this->parameters['read_timeout'] : $this->parameters['read_write_timeout'],
$this->parameters['keepalive'],
isset($this->parameters['write_timeout']) ? $this->parameters['write_timeout'] : $this->parameters['read_write_timeout'],
$this->parameters['heartbeat']
$readWriteTimeout = $this->getParameter('read_write_timeout');
if (!$readWriteTimeout) {
$readWriteTimeout = $this->getParameter(
'read_timeout',
$this->getParameter('write_timeout')
);
} else {
return new $this->class(
$this->parameters['host'],
$this->parameters['port'],
$this->parameters['user'],
$this->parameters['password'],
$this->parameters['vhost'],
false, // insist
'AMQPLAIN', // login_method
null, // login_response
'en_US', // locale
$this->parameters['connection_timeout'],
$this->parameters['read_write_timeout'],
$this->parameters['ssl_context'],
$this->parameters['keepalive'],
$this->parameters['heartbeat']
}
$options = array(
'ssl_options' => $this->parameters['ssl_context'],
'keepalive' => $this->parameters['keepalive'],
'read_timeout' => isset($this->parameters['read_timeout']) ? $this->parameters['read_timeout'] : $this->parameters['read_write_timeout'],
'write_timeout' => isset($this->parameters['write_timeout']) ? $this->parameters['write_timeout'] : $this->parameters['read_write_timeout'],
'read_write_timeout' => $readWriteTimeout,
'heartbeat' => $this->parameters['heartbeat'],
);
$hosts = isset($this->parameters['hosts']) ? $this->parameters['hosts'] : null;
if (!$hosts) {
$hosts = array(
array(
'host' => $this->parameters['host'],
'port' => $this->parameters['port'],
'user' => $this->parameters['user'],
'password' => $this->parameters['password'],
'vhost' => $this->parameters['vhost'],
)
);
}
return call_user_func(array($this->class, 'create_connection'), $hosts, $options);
}

/**
Expand Down Expand Up @@ -147,4 +142,15 @@ private function parseUrl(array $parameters)

return $parameters;
}

/**
* Try to get value from parameters.
* @param $name
* @param null $default
* @return mixed|null
*/
private function getParameter($name, $default = null)
{
return isset($this->parameters[$name]) ? $this->parameters[$name] : $default;
}
}
11 changes: 10 additions & 1 deletion Tests/DependencyInjection/Fixtures/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ old_sound_rabbit_mq:
lazy: true
use_socket: true

cluster:
hosts:
-
host: foo_host
port: 123
user: foo_user
password: foo_password
vhost: /foo

default:

producers:
Expand Down Expand Up @@ -155,7 +164,7 @@ old_sound_rabbit_mq:
- 'iphone.upload'
callback: foo.multiple_test2.callback
queues_provider: foo.queues_provider

dynamic_consumers:
foo_dyn_consumer:
connection: foo_default
Expand Down
40 changes: 40 additions & 0 deletions Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public function testFooConnectionDefinition()
'heartbeat' => 0,
'use_socket' => false,
'url' => '',
'hosts' => array(),
), $factory->getArgument(1));
$this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass());
}
Expand Down Expand Up @@ -65,6 +66,7 @@ public function testSslConnectionDefinition()
'heartbeat' => 0,
'use_socket' => false,
'url' => '',
'hosts' => array(),
), $factory->getArgument(1));
$this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass());
}
Expand Down Expand Up @@ -92,6 +94,7 @@ public function testLazyConnectionDefinition()
'heartbeat' => 0,
'use_socket' => false,
'url' => '',
'hosts' => array(),
), $factory->getArgument(1));
$this->assertEquals('%old_sound_rabbit_mq.lazy.connection.class%', $definition->getClass());
}
Expand Down Expand Up @@ -119,6 +122,7 @@ public function testDefaultConnectionDefinition()
'heartbeat' => 0,
'use_socket' => false,
'url' => '',
'hosts' => array(),
), $factory->getArgument(1));
$this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass());
}
Expand Down Expand Up @@ -862,6 +866,42 @@ public function testProducersWithLogger()
);
}

public function testClusterConnectionDefinition()
{
$container = $this->getContainer('test.yml');

$this->assertTrue($container->has('old_sound_rabbit_mq.connection.cluster'));
$definition = $container->getDefinition('old_sound_rabbit_mq.connection.cluster');
$this->assertTrue($container->has('old_sound_rabbit_mq.connection_factory.cluster'));
$factory = $container->getDefinition('old_sound_rabbit_mq.connection_factory.cluster');
$this->assertEquals(array('old_sound_rabbit_mq.connection_factory.cluster', 'createConnection'), $definition->getFactory());
$this->assertEquals(array(
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
'lazy' => false,
'connection_timeout' => 3,
'read_write_timeout' => 3,
'ssl_context' => array(),
'keepalive' => false,
'heartbeat' => 0,
'use_socket' => false,
'url' => '',
'hosts' => array(
array(
'host' => 'foo_host',
'port' => 123,
'user' => 'foo_user',
'password' => 'foo_password',
'vhost' => '/foo',
)
),
), $factory->getArgument(1));
$this->assertEquals('%old_sound_rabbit_mq.connection.class%', $definition->getClass());
}

private function getContainer($file, $debug = false)
{
$container = new ContainerBuilder(new ParameterBag(array('kernel.debug' => $debug)));
Expand Down
24 changes: 10 additions & 14 deletions Tests/RabbitMq/AMQPConnectionFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ public function testDefaultValues()
'guest', // password
'/', // vhost
false, // insist
"AMQPLAIN", // login method
'AMQPLAIN', // login method
null, // login response
"en_US", // locale
'en_US', // locale
3, // connection timeout
3, // read write timeout
null, // context
Expand Down Expand Up @@ -274,10 +274,8 @@ public function testConnectionsParametersProviderWithConstructorArgs()
$connectionParametersProvider = $this->prepareConnectionParametersProvider();
$connectionParametersProvider->expects($this->once())
->method('getConnectionParameters')
->will($this->returnValue(
array(
'constructor_args' => array(1,2,3,4)
)
->willReturn(array(
'constructor_args' => array(1, 2, 3, 4)
));
$factory = new AMQPConnectionFactory(
'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection',
Expand All @@ -296,14 +294,12 @@ public function testConnectionsParametersProvider()
$connectionParametersProvider = $this->prepareConnectionParametersProvider();
$connectionParametersProvider->expects($this->once())
->method('getConnectionParameters')
->will($this->returnValue(
array(
'host' => '1.2.3.4',
'port' => 5678,
'user' => 'admin',
'password' => 'admin',
'vhost' => 'foo',
)
->willReturn(array(
'host' => '1.2.3.4',
'port' => 5678,
'user' => 'admin',
'password' => 'admin',
'vhost' => 'foo',
));
$factory = new AMQPConnectionFactory(
'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection',
Expand Down
21 changes: 21 additions & 0 deletions Tests/RabbitMq/Fixtures/AMQPConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,25 @@ public function __construct()
// save params for direct access in tests
$this->constructParams = func_get_args();
}

public static function create_connection($hosts, $options)
{
$options = array_merge($hosts[0], $options);
return new self(
$hosts[0]['host'],
$hosts[0]['port'],
$hosts[0]['user'],
$hosts[0]['password'],
$hosts[0]['vhost'],
false,
'AMQPLAIN',
null,
'en_US',
$options['read_timeout'],
$options['write_timeout'],
$options['ssl_options'],
$options['keepalive'],
$options['heartbeat']
);
}
}
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"symfony/config": "^2.7|^3.0|^4.0",
"symfony/yaml": "^2.7|^3.0|^4.0",
"symfony/console": "^2.7|^3.0|^4.0",
"php-amqplib/php-amqplib": "^2.6",
"php-amqplib/php-amqplib": "^2.9",
"psr/log": "^1.0"
},
"require-dev": {
Expand Down