Skip to content

Dynamic Connection Parameters #398

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ protected function addConnections(ArrayNodeDefinition $node)
->end()
->booleanNode('keepalive')->defaultFalse()->info('requires php-amqplib v2.4.1+ and PHP5.4+')->end()
->scalarNode('heartbeat')->defaultValue(0)->info('requires php-amqplib v2.4.1+')->end()
->scalarNode('connection_parameters_provider')->end()
->end()
->end()
->end()
Expand Down
7 changes: 5 additions & 2 deletions DependencyInjection/OldSoundRabbitMqExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\DependencyInjection\Exception\InvalidArgumentException;
use Symfony\Component\DependencyInjection\Extension\Extension;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
Expand Down Expand Up @@ -81,6 +80,10 @@ protected function loadConnections()
$definition = new Definition('%old_sound_rabbit_mq.connection_factory.class%', array(
$classParam, $connection,
));
if (isset($connection['connection_parameters_provider'])) {
$definition->addArgument(new Reference($connection['connection_parameters_provider']));
unset($connection['connection_parameters_provider']);
}
$definition->setPublic(false);
$factoryName = sprintf('old_sound_rabbit_mq.connection_factory.%s', $key);
$this->container->setDefinition($factoryName, $definition);
Expand Down Expand Up @@ -519,7 +522,7 @@ private function injectLogger(Definition $definition)
));
$definition->addMethodCall('setLogger', array(new Reference('logger', ContainerInterface::IGNORE_ON_INVALID_REFERENCE)));
}

/**
* Get default AMQP exchange options
*
Expand Down
33 changes: 33 additions & 0 deletions Provider/ConnectionParametersProviderInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

namespace OldSound\RabbitMqBundle\Provider;

/**
* Interface to provide and/or override connection parameters.
*
* @author David Cochrum <[email protected]>
*/
interface ConnectionParametersProviderInterface
{
/**
* Return connection parameters.
*
* Example:
* array(
* 'host' => 'localhost',
* 'port' => 5672,
* 'user' => 'guest',
* 'password' => 'guest',
* 'vhost' => '/',
* 'lazy' => false,
* 'connection_timeout' => 3,
* 'read_write_timeout' => 3,
* 'keepalive' => false,
* 'heartbeat' => 0,
* 'use_socket' => true,
* )
*
* @return array
*/
public function getConnectionParameters();
}
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,42 @@ by default to avoid possible breaks in applications already using this bundle.

It's a good idea to set the ```read_write_timeout``` to 2x the heartbeat so your socket will be open. If you don't do this, or use a different multiplier, there's a risk the __consumer__ socket will timeout.

### Dynamic Connection Parameters ###

Sometimes your connection information may need to be dynamic. Dynamic connection parameters allow you to supply or
override parameters programmatically through a service.

e.g. In a scenario when the `vhost` parameter of the connection depends on the current tenant of your white-labeled
application and you do not want (or can't) change it's configuration every time.

Define a service under `connection_parameters_provider` that implements the `ConnectionParametersProviderInterface`,
and add it to the appropriate `connections` configuration.

```yaml
connections:
default:
host: 'localhost'
port: 5672
user: 'guest'
password: 'guest'
vhost: 'foo' # to be dynamically overridden by `connection_parameters_provider`
connection_parameters_provider: connection_parameters_provider_service
```

Example Implementation:

```php
class ConnectionParametersProviderService implements ConnectionParametersProvider {
...
public function getConnectionParameters() {
return array('vhost' => $this->getVhost());
}
...
}
```

In this case, the `vhost` parameter will be overridden by the output of `getVhost()`.

## Producers, Consumers, What? ##

In a messaging application, the process sending messages to the broker is called __producer__ while the process receiving those messages is called __consumer__. In your application you will have several of them that you can list under their respective entries in the configuration.
Expand Down
33 changes: 28 additions & 5 deletions RabbitMq/AMQPConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace OldSound\RabbitMqBundle\RabbitMq;

use OldSound\RabbitMqBundle\Provider\ConnectionParametersProviderInterface;
use PhpAmqpLib\Connection\AbstractConnection;
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;

class AMQPConnectionFactory
Expand All @@ -27,11 +29,17 @@ class AMQPConnectionFactory
/**
* Constructor
*
* @param string $class FQCN of AMQPConnection class to instantiate.
* @param array $parameters Map containing parameters resolved by Extension.
* @param string $class FQCN of AMQPConnection class to instantiate.
* @param array $parameters Map containing parameters resolved by
* Extension.
* @param ConnectionParametersProviderInterface $parametersProvider Optional service providing/overriding
* connection parameters.
*/
public function __construct($class, array $parameters)
{
public function __construct(
$class,
array $parameters,
ConnectionParametersProviderInterface $parametersProvider = null
) {
$this->class = $class;
$this->parameters = array_merge($this->parameters, $parameters);
$this->parameters = $this->parseUrl($this->parameters);
Expand All @@ -40,8 +48,16 @@ public function __construct($class, array $parameters)
? stream_context_create(array('ssl' => $this->parameters['ssl_context']))
: null;
}
if ($parametersProvider) {
$this->parameters = array_merge($this->parameters, $parametersProvider->getConnectionParameters());
}
}

/**
* Creates the appropriate connection using current parameters.
*
* @return AbstractConnection
*/
public function createConnection()
{
return new $this->class(
Expand All @@ -62,7 +78,14 @@ public function createConnection()
);
}

private function parseUrl($parameters)
/**
* Parses connection parameters from URL parameter.
*
* @param array $parameters
*
* @return array
*/
private function parseUrl(array $parameters)
{
if (!$parameters['url']) {
return $parameters;
Expand Down
53 changes: 53 additions & 0 deletions Tests/RabbitMq/AMQPConnectionFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace OldSound\RabbitMqBundle\Tests\RabbitMq;

use OldSound\RabbitMqBundle\Provider\ConnectionParametersProviderInterface;
use OldSound\RabbitMqBundle\RabbitMq\AMQPConnectionFactory;
use OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection;

Expand Down Expand Up @@ -148,4 +149,56 @@ public function testSSLConnectionParameters()
0, // heartbeat
), $instance->constructParams);
}

public function testConnectionsParametersProvider()
{
$connectionParametersProvider = $this->prepareConnectionParametersProvider();
$connectionParametersProvider->expects($this->once())
->method('getConnectionParameters')
->will($this->returnValue(
array(
'host' => '1.2.3.4',
'port' => 5678,
'user' => 'admin',
'password' => 'admin',
'vhost' => 'foo',
)
));
$factory = new AMQPConnectionFactory(
'OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection',
array(),
$connectionParametersProvider
);

/** @var AMQPConnection $instance */
$instance = $factory->createConnection();
$this->assertInstanceOf('OldSound\RabbitMqBundle\Tests\RabbitMq\Fixtures\AMQPConnection', $instance);
$this->assertEquals(array(
'1.2.3.4', // host
5678, // port
'admin', // user
'admin', // password
'foo', // vhost
false, // insist
"AMQPLAIN", // login method
null, // login response
"en_US", // locale
3, // connection timeout
3, // read write timeout
null, // context
false, // keepalive
0, // heartbeat
), $instance->constructParams);
}

/**
* Preparing ConnectionParametersProviderInterface instance
*
* @return \PHPUnit_Framework_MockObject_MockObject|ConnectionParametersProviderInterface
*/
private function prepareConnectionParametersProvider()
{
return $this->getMockBuilder('OldSound\RabbitMqBundle\Provider\ConnectionParametersProviderInterface')
->getMock();
}
}