Skip to content

Commit 2949e9c

Browse files
committed
feature symfony#46229 [Messenger] Make Redis messages countable (Jean-Beru)
This PR was squashed before being merged into the 6.2 branch. Discussion ---------- [Messenger] Make Redis messages countable | Q | A | ------------- | --- | Branch? | 6.1 | Bug fix? | no | New feature? | no | Deprecations? | no | Tickets | | License | MIT | Doc PR | Each Messenger Transport can return the number of messages present in the queue. For example, when we use the `messenger:failed:retry` command, Messenger displays the number of messages to retry. Actually, this count is not displayed when using `RedisTransport`. This PR adds this functionality. Commits ------- e1a896f [Messenger] Make Redis messages countable
2 parents 86f78aa + e1a896f commit 2949e9c

File tree

4 files changed

+86
-2
lines changed

4 files changed

+86
-2
lines changed

src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,27 @@ public function testGetAfterReject()
339339
$redis->del('messenger-rejectthenget');
340340
}
341341

342+
public function testItCountMessages()
343+
{
344+
$this->assertSame(0, $this->connection->getMessageCount());
345+
346+
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
347+
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
348+
$this->connection->add('{"message": "Hi"}', ['type' => DummyMessage::class]);
349+
350+
$this->assertSame(3, $this->connection->getMessageCount());
351+
352+
$message = $this->connection->get();
353+
$this->connection->ack($message['id']);
354+
355+
$this->assertSame(2, $this->connection->getMessageCount());
356+
357+
$message = $this->connection->get();
358+
$this->connection->reject($message['id']);
359+
360+
$this->assertSame(1, $this->connection->getMessageCount());
361+
}
362+
342363
private function getConnectionGroup(Connection $connection): string
343364
{
344365
$property = (new \ReflectionClass(Connection::class))->getProperty('group');

src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,51 @@ public function cleanup(): void
521521
}
522522
}
523523

524+
public function getMessageCount(): int
525+
{
526+
$groups = $this->connection->xinfo('GROUPS', $this->stream) ?: [];
527+
528+
$lastDeliveredId = null;
529+
foreach ($groups as $group) {
530+
if ($group['name'] !== $this->group) {
531+
continue;
532+
}
533+
534+
// Use "lag" key provided by Redis 7.x. See https://redis.io/commands/xinfo-groups/#consumer-group-lag.
535+
if (isset($group['lag'])) {
536+
return $group['lag'];
537+
}
538+
539+
if (!isset($group['last-delivered-id'])) {
540+
return 0;
541+
}
542+
543+
$lastDeliveredId = $group['last-delivered-id'];
544+
break;
545+
}
546+
547+
if (null === $lastDeliveredId) {
548+
return 0;
549+
}
550+
551+
// Iterate through the stream. See https://redis.io/commands/xrange/#iterating-a-stream.
552+
$useExclusiveRangeInterval = version_compare(phpversion('redis'), '6.2.0', '>=');
553+
$total = 0;
554+
do {
555+
if (!$range = $this->connection->xRange($this->stream, $lastDeliveredId, '+', 100)) {
556+
return $total;
557+
}
558+
559+
$total += \count($range);
560+
561+
if ($useExclusiveRangeInterval) {
562+
$lastDeliveredId = preg_replace_callback('#\d+$#', static fn(array $matches) => (int) $matches[0] + 1, array_key_last($range));
563+
} else {
564+
$lastDeliveredId = '('.array_key_last($range);
565+
}
566+
} while (true);
567+
}
568+
524569
private function rawCommand(string $command, ...$arguments): mixed
525570
{
526571
try {

src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisReceiver.php

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1718
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1819
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1920
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -22,7 +23,7 @@
2223
* @author Alexander Schranz <[email protected]>
2324
* @author Antoine Bluchet <[email protected]>
2425
*/
25-
class RedisReceiver implements ReceiverInterface
26+
class RedisReceiver implements ReceiverInterface, MessageCountAwareInterface
2627
{
2728
private Connection $connection;
2829
private SerializerInterface $serializer;
@@ -84,6 +85,14 @@ public function reject(Envelope $envelope): void
8485
$this->connection->reject($this->findRedisReceivedStamp($envelope)->getId());
8586
}
8687

88+
/**
89+
* {@inheritdoc}
90+
*/
91+
public function getMessageCount(): int
92+
{
93+
return $this->connection->getMessageCount();
94+
}
95+
8796
private function findRedisReceivedStamp(Envelope $envelope): RedisReceivedStamp
8897
{
8998
/** @var RedisReceivedStamp|null $redisReceivedStamp */

src/Symfony/Component/Messenger/Bridge/Redis/Transport/RedisTransport.php

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Bridge\Redis\Transport;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1516
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1617
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
1718
use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
@@ -21,7 +22,7 @@
2122
* @author Alexander Schranz <[email protected]>
2223
* @author Antoine Bluchet <[email protected]>
2324
*/
24-
class RedisTransport implements TransportInterface, SetupableTransportInterface
25+
class RedisTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
2526
{
2627
private SerializerInterface $serializer;
2728
private Connection $connection;
@@ -74,6 +75,14 @@ public function setup(): void
7475
$this->connection->setup();
7576
}
7677

78+
/**
79+
* {@inheritdoc}
80+
*/
81+
public function getMessageCount(): int
82+
{
83+
return $this->getReceiver()->getMessageCount();
84+
}
85+
7786
private function getReceiver(): RedisReceiver
7887
{
7988
return $this->receiver ??= new RedisReceiver($this->connection, $this->serializer);

0 commit comments

Comments
 (0)