Skip to content

Commit 2928f16

Browse files
feature #40155 [Messenger] Support Redis Cluster (nesk)
This PR was squashed before being merged into the 5.3-dev branch. Discussion ---------- [Messenger] Support Redis Cluster | Q | A | ------------- | --- | Branch? | 5.x | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | Fix #38264 | License | MIT | Doc PR | symfony/symfony-docs#14956 This PR brings support for Redis Cluster in the Messenger component: - The first commit _Support RedisCluster instance_ allows to pass a `RedisCluster` object when instanciating the `Connection` class, which brings support for Redis Cluster without any friction. - The second commit _Support multiple hosts DSN for Redis Cluster_ is more opiniated and brings a DSN format to configure a Redis Cluster from `config/packages/messenger.yaml`. Instanciating `Connection` with a `RedisCluster` object: ```php $redis = new \RedisCluster(null, ['host-01:6379', 'host-02:6379', 'host-03:6379', 'host-04:6379']); $connection = new Connection([], [], [], $redis); ``` Configuring a Redis Cluster from YAML: ```yaml // config/packages/messenger.yaml framework: messenger: metadata: default: 'redis://host-01:6379,redis://host-02:6379,redis://host-03:6379' lazy: 'redis://host-01:6379?lazy=1,redis://host-02:6379,redis://host-03:6379' # Configuration will be `lazy = true` and `auto_setup = true` multipleConfig: 'redis://host-01:6379?lazy=1&auto_setup=false,redis://host-02:6379,redis://host-03:6379?auto_setup=true' ``` This format allows to define multiple hosts for a Redis Cluster and still contains valid URLs. Custom configuration is still supported, it can be specified on only one of the URLs in the DSN (see `lazy` above). If the user provides multiple configurations on different URLs, they are simply merged with the following code and if an option is defined multiple times then the latest takes precedence (see `multipleConfig` above). I understand the way the DSN is handled could not suit you. Please, if you close this PR only for the DSN part, just tell me and I will make a new PR with only the first commit. Commits ------- 04530fb2d7 [Messenger] Support Redis Cluster
2 parents 04bb847 + f4d1ffd commit 2928f16

File tree

5 files changed

+178
-35
lines changed

5 files changed

+178
-35
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ CHANGELOG
66

77
* Add `rediss://` DSN scheme support for TLS protocol
88
* Deprecate TLS option, use `rediss://127.0.0.1` instead of `redis://127.0.0.1?tls=1`
9+
* Add support for `\RedisCluster` instance in `Connection` constructor
10+
* Add support for Redis Cluster in DSN
911

1012
5.2.0
1113
-----

Tests/Transport/ConnectionTest.php

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,15 @@ public static function setUpBeforeClass(): void
4040
}
4141
}
4242

43+
private function skipIfRedisClusterUnavailable()
44+
{
45+
try {
46+
new \RedisCluster(null, explode(' ', getenv('REDIS_CLUSTER_HOSTS')));
47+
} catch (\Exception $e) {
48+
self::markTestSkipped($e->getMessage());
49+
}
50+
}
51+
4352
public function testFromInvalidDsn()
4453
{
4554
$this->expectException(\InvalidArgumentException::class);
@@ -59,6 +68,20 @@ public function testFromDsn()
5968
);
6069
}
6170

71+
public function testFromDsnWithMultipleHosts()
72+
{
73+
$this->skipIfRedisClusterUnavailable();
74+
75+
$hosts = explode(' ', getenv('REDIS_CLUSTER_HOSTS'));
76+
77+
$dsn = array_map(function ($host) {
78+
return 'redis://'.$host;
79+
}, $hosts);
80+
$dsn = implode(',', $dsn);
81+
82+
$this->assertInstanceOf(Connection::class, Connection::fromDsn($dsn));
83+
}
84+
6285
public function testFromDsnOnUnixSocket()
6386
{
6487
$this->assertEquals(
@@ -160,6 +183,14 @@ public function testDeprecationIfInvalidOptionIsPassedWithDsn()
160183
Connection::fromDsn('redis://localhost/queue?foo=bar');
161184
}
162185

186+
public function testRedisClusterInstanceIsSupported()
187+
{
188+
$this->skipIfRedisClusterUnavailable();
189+
190+
$redis = new \RedisCluster(null, explode(' ', getenv('REDIS_CLUSTER_HOSTS')));
191+
$this->assertInstanceOf(Connection::class, new Connection([], [], [], $redis));
192+
}
193+
163194
public function testKeepGettingPendingMessages()
164195
{
165196
$redis = $this->createMock(\Redis::class);
@@ -429,4 +460,20 @@ public function testLazy()
429460
$connection->reject($message['id']);
430461
$redis->del('messenger-lazy');
431462
}
463+
464+
public function testLazyCluster()
465+
{
466+
$this->skipIfRedisClusterUnavailable();
467+
468+
$connection = new Connection(
469+
['lazy' => true],
470+
['host' => explode(' ', getenv('REDIS_CLUSTER_HOSTS'))]
471+
);
472+
473+
$connection->add('1', []);
474+
$this->assertNotEmpty($message = $connection->get());
475+
$this->assertSame('1', $message['body']);
476+
$connection->reject($message['id']);
477+
$connection->cleanup();
478+
}
432479
}

Transport/Connection.php

Lines changed: 83 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,10 @@ class Connection
5656
private $deleteAfterReject;
5757
private $couldHavePendingMessages = true;
5858

59-
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
59+
/**
60+
* @param \Redis|\RedisCluster|null $redis
61+
*/
62+
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], $redis = null)
6063
{
6164
if (version_compare(phpversion('redis'), '4.3.0', '<')) {
6265
throw new LogicException('The redis transport requires php-redis 4.3.0 or higher.');
@@ -71,29 +74,19 @@ public function __construct(array $configuration, array $connectionCredentials =
7174
$auth = null;
7275
}
7376

74-
$initializer = static function ($redis) use ($host, $port, $auth, $serializer, $dbIndex) {
75-
$redis->connect($host, $port);
76-
$redis->setOption(\Redis::OPT_SERIALIZER, $serializer);
77-
78-
if (null !== $auth && !$redis->auth($auth)) {
79-
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
80-
}
81-
82-
if ($dbIndex && !$redis->select($dbIndex)) {
83-
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
84-
}
85-
86-
return true;
87-
};
88-
89-
if (null === $redis) {
90-
$redis = new \Redis();
91-
}
92-
93-
if ($configuration['lazy'] ?? self::DEFAULT_OPTIONS['lazy']) {
94-
$redis = new RedisProxy($redis, $initializer);
77+
$lazy = $configuration['lazy'] ?? self::DEFAULT_OPTIONS['lazy'];
78+
if (\is_array($host) || $redis instanceof \RedisCluster) {
79+
$hosts = \is_string($host) ? [$host.':'.$port] : $host; // Always ensure we have an array
80+
$initializer = static function ($redis) use ($hosts, $auth, $serializer) {
81+
return self::initializeRedisCluster($redis, $hosts, $auth, $serializer);
82+
};
83+
$redis = $lazy ? new RedisClusterProxy($redis, $initializer) : $initializer($redis);
9584
} else {
96-
$initializer($redis);
85+
$redis = $redis ?? new \Redis();
86+
$initializer = static function ($redis) use ($host, $port, $auth, $serializer, $dbIndex) {
87+
return self::initializeRedis($redis, $host, $port, $auth, $serializer, $dbIndex);
88+
};
89+
$redis = $lazy ? new RedisProxy($redis, $initializer) : $initializer($redis);
9790
}
9891

9992
$this->connection = $redis;
@@ -116,21 +109,57 @@ public function __construct(array $configuration, array $connectionCredentials =
116109
$this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
117110
}
118111

119-
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
112+
private static function initializeRedis(\Redis $redis, string $host, int $port, ?string $auth, int $serializer, int $dbIndex): \Redis
120113
{
121-
$url = $dsn;
122-
$scheme = 0 === strpos($dsn, 'rediss:') ? 'rediss' : 'redis';
114+
$redis->connect($host, $port);
115+
$redis->setOption(\Redis::OPT_SERIALIZER, $serializer);
123116

124-
if (preg_match('#^'.$scheme.':///([^:@])+$#', $dsn)) {
125-
$url = str_replace($scheme.':', 'file:', $dsn);
117+
if (null !== $auth && !$redis->auth($auth)) {
118+
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
126119
}
127120

128-
if (false === $parsedUrl = parse_url($url)) {
129-
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
121+
if ($dbIndex && !$redis->select($dbIndex)) {
122+
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
130123
}
131-
if (isset($parsedUrl['query'])) {
132-
parse_str($parsedUrl['query'], $dsnOptions);
133-
$redisOptions = array_merge($redisOptions, $dsnOptions);
124+
125+
return $redis;
126+
}
127+
128+
private static function initializeRedisCluster(?\RedisCluster $redis, array $hosts, ?string $auth, int $serializer): \RedisCluster
129+
{
130+
if (null === $redis) {
131+
$redis = new \RedisCluster(null, $hosts, 0.0, 0.0, false, $auth);
132+
}
133+
134+
$redis->setOption(\Redis::OPT_SERIALIZER, $serializer);
135+
136+
return $redis;
137+
}
138+
139+
/**
140+
* @param \Redis|\RedisCluster|null $redis
141+
*/
142+
public static function fromDsn(string $dsn, array $redisOptions = [], $redis = null): self
143+
{
144+
if (false === strpos($dsn, ',')) {
145+
$parsedUrl = self::parseDsn($dsn, $redisOptions);
146+
} else {
147+
$dsns = explode(',', $dsn);
148+
$parsedUrls = array_map(function ($dsn) use (&$redisOptions) {
149+
return self::parseDsn($dsn, $redisOptions);
150+
}, $dsns);
151+
152+
// Merge all the URLs, the last one overrides the previous ones
153+
$parsedUrl = array_merge(...$parsedUrls);
154+
155+
// Regroup all the hosts in an array interpretable by RedisCluster
156+
$parsedUrl['host'] = array_map(function ($parsedUrl, $dsn) {
157+
if (!isset($parsedUrl['host'])) {
158+
throw new InvalidArgumentException(sprintf('Missing host in DSN part "%s", it must be defined when using Redis Cluster.', $dsn));
159+
}
160+
161+
return $parsedUrl['host'].':'.($parsedUrl['port'] ?? 6379);
162+
}, $parsedUrls, $dsns);
134163
}
135164

136165
self::validateOptions($redisOptions);
@@ -165,7 +194,7 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
165194
unset($redisOptions['dbindex']);
166195
}
167196

168-
$tls = 'rediss' === $scheme;
197+
$tls = 'rediss' === $parsedUrl['scheme'];
169198
if (\array_key_exists('tls', $redisOptions)) {
170199
trigger_deprecation('symfony/redis-messenger', '5.3', 'Providing "tls" parameter is deprecated, use "rediss://" DSN scheme instead');
171200
$tls = filter_var($redisOptions['tls'], \FILTER_VALIDATE_BOOLEAN);
@@ -223,6 +252,26 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
223252
return new self($configuration, $connectionCredentials, $redisOptions, $redis);
224253
}
225254

255+
private static function parseDsn(string $dsn, array &$redisOptions): array
256+
{
257+
$url = $dsn;
258+
$scheme = 0 === strpos($dsn, 'rediss:') ? 'rediss' : 'redis';
259+
260+
if (preg_match('#^'.$scheme.':///([^:@])+$#', $dsn)) {
261+
$url = str_replace($scheme.':', 'file:', $dsn);
262+
}
263+
264+
if (false === $parsedUrl = parse_url($url)) {
265+
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
266+
}
267+
if (isset($parsedUrl['query'])) {
268+
parse_str($parsedUrl['query'], $dsnOptions);
269+
$redisOptions = array_merge($redisOptions, $dsnOptions);
270+
}
271+
272+
return $parsedUrl;
273+
}
274+
226275
private static function validateOptions(array $options): void
227276
{
228277
$availableOptions = array_keys(self::DEFAULT_OPTIONS);

Transport/RedisClusterProxy.php

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
13+
14+
/**
15+
* Allow to delay connection to Redis Cluster.
16+
*
17+
* @author Johann Pardanaud <[email protected]>
18+
*
19+
* @internal
20+
*/
21+
class RedisClusterProxy
22+
{
23+
private $redis;
24+
private $initializer;
25+
private $ready = false;
26+
27+
public function __construct(?\RedisCluster $redis, \Closure $initializer)
28+
{
29+
$this->redis = $redis;
30+
$this->initializer = $initializer;
31+
}
32+
33+
public function __call(string $method, array $args)
34+
{
35+
if (!$this->ready) {
36+
$this->redis = $this->initializer->__invoke($this->redis);
37+
$this->ready = true;
38+
}
39+
40+
return $this->redis->{$method}(...$args);
41+
}
42+
}

Transport/RedisProxy.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ public function __construct(\Redis $redis, \Closure $initializer)
3333

3434
public function __call(string $method, array $args)
3535
{
36-
$this->ready ?: $this->ready = $this->initializer->__invoke($this->redis);
36+
if (!$this->ready) {
37+
$this->redis = $this->initializer->__invoke($this->redis);
38+
$this->ready = true;
39+
}
3740

3841
return $this->redis->{$method}(...$args);
3942
}

0 commit comments

Comments
 (0)