Skip to content

Commit 4100e60

Browse files
committed
feature #28190 [Messenger] Add a --bus option to the messenger:consume-messages command (chalasr, sroze)
This PR was merged into the 4.2-dev branch. Discussion ---------- [Messenger] Add a --bus option to the messenger:consume-messages command | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | n/a | License | MIT | Doc PR | todo Making it compatible with the multi-bus feature. Commits ------- e3f1eecbc1 Bus argument is a required option when multiple buses are defined 539cb62ffe [Messenger] Add a --bus option to the messenger:consume-messages command
2 parents d7e6edf + bf24784 commit 4100e60

File tree

5 files changed

+58
-19
lines changed

5 files changed

+58
-19
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,5 @@ CHANGELOG
66

77
* `ValidationMiddleware::handle()` and `SendMessageMiddleware::handle()` now require an `Envelope` object
88
* `EnvelopeItemInterface` doesn't extend `Serializable` anymore
9+
* [BC BREAK] The `ConsumeMessagesCommand` class now takes an instance of `Psr\Container\ContainerInterface`
10+
as first constructor argument

Command/ConsumeMessagesCommand.php

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
use Symfony\Component\Console\Input\InputOption;
2121
use Symfony\Component\Console\Output\OutputInterface;
2222
use Symfony\Component\Console\Style\SymfonyStyle;
23-
use Symfony\Component\Messenger\MessageBusInterface;
2423
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMemoryUsageIsExceededReceiver;
2524
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenMessageCountIsExceededReceiver;
2625
use Symfony\Component\Messenger\Transport\Enhancers\StopWhenTimeLimitIsReachedReceiver;
@@ -35,17 +34,19 @@ class ConsumeMessagesCommand extends Command
3534
{
3635
protected static $defaultName = 'messenger:consume-messages';
3736

38-
private $bus;
37+
private $busLocator;
3938
private $receiverLocator;
4039
private $logger;
4140
private $receiverNames;
41+
private $busNames;
4242

43-
public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = array())
43+
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = array(), array $busNames = array())
4444
{
45-
$this->bus = $bus;
45+
$this->busLocator = $busLocator;
4646
$this->receiverLocator = $receiverLocator;
4747
$this->logger = $logger;
4848
$this->receiverNames = $receiverNames;
49+
$this->busNames = $busNames;
4950

5051
parent::__construct();
5152
}
@@ -56,13 +57,15 @@ public function __construct(MessageBusInterface $bus, ContainerInterface $receiv
5657
protected function configure(): void
5758
{
5859
$defaultReceiverName = 1 === \count($this->receiverNames) ? current($this->receiverNames) : null;
60+
$defaultBusName = 1 === \count($this->busNames) ? current($this->busNames) : null;
5961

6062
$this
6163
->setDefinition(array(
6264
new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
6365
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
6466
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
6567
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
68+
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched', $defaultBusName),
6669
))
6770
->setDescription('Consumes messages')
6871
->setHelp(<<<'EOF'
@@ -91,18 +94,35 @@ protected function configure(): void
9194
*/
9295
protected function interact(InputInterface $input, OutputInterface $output)
9396
{
94-
if (!$this->receiverNames || $this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
95-
return;
97+
$style = new SymfonyStyle($input, $output);
98+
99+
if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
100+
if (null === $receiverName) {
101+
$style->block('Missing receiver argument.', null, 'error', ' ', true);
102+
$input->setArgument('receiver', $style->choice('Select one of the available receivers', $this->receiverNames));
103+
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
104+
$style->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
105+
if ($style->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
106+
$input->setArgument('receiver', $alternatives[0]);
107+
}
108+
}
96109
}
97110

98-
$style = new SymfonyStyle($input, $output);
99-
if (null === $receiverName) {
100-
$style->block('Missing receiver argument.', null, 'error', ' ', true);
101-
$input->setArgument('receiver', $style->choice('Select one of the available receivers', $this->receiverNames));
102-
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
103-
$style->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
104-
if ($style->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
105-
$input->setArgument('receiver', $alternatives[0]);
111+
$busName = $input->getOption('bus');
112+
if ($this->busNames && !$this->busLocator->has($busName)) {
113+
if (null === $busName) {
114+
$style->block('Missing bus argument.', null, 'error', ' ', true);
115+
$input->setOption('bus', $style->choice('Select one of the available buses', $this->busNames));
116+
} elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) {
117+
$style->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);
118+
119+
if (1 === \count($alternatives)) {
120+
if ($style->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
121+
$input->setOption('bus', $alternatives[0]);
122+
}
123+
} else {
124+
$input->setOption('bus', $style->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
125+
}
106126
}
107127
}
108128
}
@@ -116,7 +136,12 @@ protected function execute(InputInterface $input, OutputInterface $output): void
116136
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
117137
}
118138

139+
if (!$this->busLocator->has($busName = $input->getOption('bus'))) {
140+
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
141+
}
142+
119143
$receiver = $this->receiverLocator->get($receiverName);
144+
$bus = $this->busLocator->get($busName);
120145

121146
if ($limit = $input->getOption('limit')) {
122147
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
@@ -130,7 +155,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
130155
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
131156
}
132157

133-
$worker = new Worker($receiver, $this->bus);
158+
$worker = new Worker($receiver, $bus);
134159
$worker->run();
135160
}
136161

DependencyInjection/MessengerPass.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,17 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
207207
}
208208
$container->getDefinition('console.command.messenger_debug')->replaceArgument(0, $debugCommandMapping);
209209
}
210+
211+
if ($container->hasDefinition('console.command.messenger_consume_messages')) {
212+
$buses = array();
213+
foreach ($busIds as $busId) {
214+
$buses[$busId] = new Reference($busId);
215+
}
216+
$container
217+
->getDefinition('console.command.messenger_consume_messages')
218+
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
219+
->replaceArgument(4, $busIds);
220+
}
210221
}
211222

212223
private function guessHandledClasses(\ReflectionClass $handlerClass, string $serviceId): iterable

Tests/Command/ConsumeMessagesCommandTest.php

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,20 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\DependencyInjection\ServiceLocator;
1616
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
17-
use Symfony\Component\Messenger\MessageBus;
1817

1918
class ConsumeMessagesCommandTest extends TestCase
2019
{
2120
public function testConfigurationWithDefaultReceiver()
2221
{
23-
$command = new ConsumeMessagesCommand($this->createMock(MessageBus::class), $this->createMock(ServiceLocator::class), null, array('amqp'));
22+
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, array('amqp'));
2423
$inputArgument = $command->getDefinition()->getArgument('receiver');
2524
$this->assertFalse($inputArgument->isRequired());
2625
$this->assertSame('amqp', $inputArgument->getDefault());
2726
}
2827

2928
public function testConfigurationWithoutDefaultReceiver()
3029
{
31-
$command = new ConsumeMessagesCommand($this->createMock(MessageBus::class), $this->createMock(ServiceLocator::class), null, array('amqp', 'dummy'));
30+
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, array('amqp', 'dummy'));
3231
$inputArgument = $command->getDefinition()->getArgument('receiver');
3332
$this->assertTrue($inputArgument->isRequired());
3433
$this->assertNull($inputArgument->getDefault());

Tests/DependencyInjection/MessengerPassTest.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,11 @@ public function testItRegistersMultipleReceiversAndSetsTheReceiverNamesOnTheComm
241241
{
242242
$container = $this->getContainerBuilder();
243243
$container->register('console.command.messenger_consume_messages', ConsumeMessagesCommand::class)->setArguments(array(
244-
new Reference('message_bus'),
244+
null,
245245
new Reference('messenger.receiver_locator'),
246246
null,
247247
null,
248+
null,
248249
));
249250

250251
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', array('alias' => 'amqp'));
@@ -253,6 +254,7 @@ public function testItRegistersMultipleReceiversAndSetsTheReceiverNamesOnTheComm
253254
(new MessengerPass())->process($container);
254255

255256
$this->assertSame(array('amqp', 'dummy'), $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3));
257+
$this->assertSame(array('message_bus'), $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4));
256258
}
257259

258260
public function testItRegistersSenders()

0 commit comments

Comments
 (0)