Skip to content

Commit 3bf7c82

Browse files
weaverryansroze
andcommitted
Adding global retry support, events & more to messenger transport
Co-authored-by: Samuel ROZE <[email protected]>
1 parent 69bf877 commit 3bf7c82

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1553
-229
lines changed

CHANGELOG.md

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,34 @@ CHANGELOG
33

44
4.3.0
55
-----
6-
6+
7+
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
8+
`ack()` and `reject()`.
9+
* [BC BREAK] Error handling was moved from the receivers into
10+
`Worker`. Implementations of `ReceiverInterface::handle()`
11+
should now allow all exceptions to be thrown, except for transport
12+
exceptions. They should also not retry (e.g. if there's a queue,
13+
remove from the queue) if there is a problem decoding the message.
14+
* [BC BREAK] `RejectMessageExceptionInterface` was removed and replaced
15+
by `Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException`,
16+
which has the same behavior: a message will not be retried
17+
* The default command name for `ConsumeMessagesCommand` was
18+
changed from `messenger:consume-messages` to `messenger:consume`
19+
* `ConsumeMessagesCommand` has two new optional constructor arguments
20+
* `Worker` has 4 new option constructor arguments.
21+
* The `Worker` class now handles calling `pcntl_signal_dispatch()` the
22+
receiver no longer needs to call this.
23+
* The `AmqpSender` will now retry messages using a dead-letter exchange
24+
and delayed queues, instead of retrying via `nack()`
25+
* Senders now receive the `Envelope` with the `SentStamp` on it. Previously,
26+
the `Envelope` was passed to the sender and *then* the `SentStamp`
27+
was added.
28+
* `SerializerInterface` implementations should now throw a
29+
`Symfony\Component\Messenger\Exception\MessageDecodingFailedException`
30+
if `decode()` fails for any reason.
31+
* [BC BREAK] The default `Serializer` will now throw a
32+
`MessageDecodingFailedException` if `decode()` fails, instead
33+
of the underlying exceptions from the Serializer component.
734
* Added `PhpSerializer` which uses PHP's native `serialize()` and
835
`unserialize()` to serialize messages to a transport
936
* [BC BREAK] If no serializer were passed, the default serializer

Command/ConsumeMessagesCommand.php

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use Symfony\Component\Console\Input\InputOption;
2121
use Symfony\Component\Console\Output\OutputInterface;
2222
use Symfony\Component\Console\Style\SymfonyStyle;
23+
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
2324
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
2425
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
2526
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
@@ -32,21 +33,25 @@
3233
*/
3334
class ConsumeMessagesCommand extends Command
3435
{
35-
protected static $defaultName = 'messenger:consume-messages';
36+
protected static $defaultName = 'messenger:consume';
3637

3738
private $busLocator;
3839
private $receiverLocator;
3940
private $logger;
4041
private $receiverNames;
4142
private $busNames;
43+
private $retryStrategyLocator;
44+
private $eventDispatcher;
4245

43-
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [])
46+
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
4447
{
4548
$this->busLocator = $busLocator;
4649
$this->receiverLocator = $receiverLocator;
4750
$this->logger = $logger;
4851
$this->receiverNames = $receiverNames;
4952
$this->busNames = $busNames;
53+
$this->retryStrategyLocator = $retryStrategyLocator;
54+
$this->eventDispatcher = $eventDispatcher;
5055

5156
parent::__construct();
5257
}
@@ -132,6 +137,12 @@ protected function interact(InputInterface $input, OutputInterface $output)
132137
*/
133138
protected function execute(InputInterface $input, OutputInterface $output): void
134139
{
140+
if (false !== strpos($input->getFirstArgument(), ':consume-')) {
141+
$message = 'The use of the "messenger:consume-messages" command is deprecated since version 4.3 and will be removed in 5.0. Use "messenger:consume" instead.';
142+
@trigger_error($message, E_USER_DEPRECATED);
143+
$output->writeln(sprintf('<comment>%s</comment>', $message));
144+
}
145+
135146
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
136147
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
137148
}
@@ -140,8 +151,13 @@ protected function execute(InputInterface $input, OutputInterface $output): void
140151
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
141152
}
142153

154+
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
155+
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
156+
}
157+
143158
$receiver = $this->receiverLocator->get($receiverName);
144159
$bus = $this->busLocator->get($busName);
160+
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
145161

146162
$stopsWhen = [];
147163
if ($limit = $input->getOption('limit')) {
@@ -174,7 +190,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
174190
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
175191
}
176192

177-
$worker = new Worker($receiver, $bus);
193+
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
178194
$worker->run();
179195
}
180196

Envelope.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,18 @@ public function with(StampInterface ...$stamps): self
5454
return $cloned;
5555
}
5656

57+
/**
58+
* @return Envelope a new Envelope instance without any stamps of the given class
59+
*/
60+
public function withoutAll(string $stampFqcn): self
61+
{
62+
$cloned = clone $this;
63+
64+
unset($cloned->stamps[$stampFqcn]);
65+
66+
return $cloned;
67+
}
68+
5769
public function last(string $stampFqcn): ?StampInterface
5870
{
5971
return isset($this->stamps[$stampFqcn]) ? end($this->stamps[$stampFqcn]) : null;

Event/AbstractWorkerMessageEvent.php

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Event;
13+
14+
use Symfony\Component\EventDispatcher\Event;
15+
use Symfony\Component\Messenger\Envelope;
16+
17+
/**
18+
* @experimental in 4.3
19+
*/
20+
abstract class AbstractWorkerMessageEvent extends Event
21+
{
22+
private $envelope;
23+
private $receiverName;
24+
25+
public function __construct(Envelope $envelope, string $receiverName)
26+
{
27+
$this->envelope = $envelope;
28+
$this->receiverName = $receiverName;
29+
}
30+
31+
public function getEnvelope(): Envelope
32+
{
33+
return $this->envelope;
34+
}
35+
36+
/**
37+
* Returns a unique identifier for transport receiver this message was received from.
38+
*/
39+
public function getReceiverName(): string
40+
{
41+
return $this->receiverName;
42+
}
43+
}

Event/WorkerMessageFailedEvent.php

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Event;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
16+
/**
17+
* Dispatched when a message was received from a transport and handling failed.
18+
*
19+
* The event name is the class name.
20+
*
21+
* @experimental in 4.3
22+
*/
23+
class WorkerMessageFailedEvent extends AbstractWorkerMessageEvent
24+
{
25+
private $throwable;
26+
private $willRetry;
27+
28+
public function __construct(Envelope $envelope, string $receiverName, \Throwable $error, bool $willRetry)
29+
{
30+
$this->throwable = $error;
31+
$this->willRetry = $willRetry;
32+
33+
parent::__construct($envelope, $receiverName);
34+
}
35+
36+
public function getThrowable(): \Throwable
37+
{
38+
return $this->throwable;
39+
}
40+
41+
public function willRetry(): bool
42+
{
43+
return $this->willRetry;
44+
}
45+
}

Event/WorkerMessageHandledEvent.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Event;
13+
14+
/**
15+
* Dispatched after a message was received from a transport and successfully handled.
16+
*
17+
* The event name is the class name.
18+
*
19+
* @experimental in 4.3
20+
*/
21+
class WorkerMessageHandledEvent extends AbstractWorkerMessageEvent
22+
{
23+
}

Event/WorkerMessageReceivedEvent.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Event;
13+
14+
/**
15+
* Dispatched when a message was received from a transport but before sent to the bus.
16+
*
17+
* The event name is the class name.
18+
*
19+
* @experimental in 4.3
20+
*/
21+
class WorkerMessageReceivedEvent extends AbstractWorkerMessageEvent
22+
{
23+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Exception;
13+
14+
/**
15+
* Thrown when a message cannot be decoded in a serializer.
16+
*
17+
* @experimental in 4.3
18+
*/
19+
class MessageDecodingFailedException extends \InvalidArgumentException implements ExceptionInterface
20+
{
21+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Exception;
13+
14+
/**
15+
* Thrown while handling a message to indicate that handling will continue to fail.
16+
*
17+
* If something goes wrong while handling a message that's received from a transport
18+
* and the message should not be retried, a handler can throw this exception.
19+
*
20+
* @author Frederic Bouchery <[email protected]>
21+
*
22+
* @experimental in 4.3
23+
*/
24+
class UnrecoverableMessageHandlingException extends RuntimeException
25+
{
26+
}

Middleware/SendMessageMiddleware.php

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Psr\Log\NullLogger;
1616
use Symfony\Component\Messenger\Envelope;
1717
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
18+
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
1819
use Symfony\Component\Messenger\Stamp\SentStamp;
1920
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
2021

@@ -54,9 +55,22 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
5455
// it's a received message, do not send it back
5556
$this->logger->info('Received message "{class}"', $context);
5657
} else {
58+
/** @var RedeliveryStamp|null $redeliveryStamp */
59+
$redeliveryStamp = $envelope->last(RedeliveryStamp::class);
60+
5761
foreach ($this->sendersLocator->getSenders($envelope, $handle) as $alias => $sender) {
62+
// on redelivery, only deliver to the given sender
63+
if (null !== $redeliveryStamp && !$redeliveryStamp->shouldRedeliverToSender(\get_class($sender), $alias)) {
64+
continue;
65+
}
66+
5867
$this->logger->info('Sending message "{class}" with "{sender}"', $context + ['sender' => \get_class($sender)]);
59-
$envelope = $sender->send($envelope)->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null));
68+
$envelope = $sender->send($envelope->with(new SentStamp(\get_class($sender), \is_string($alias) ? $alias : null)));
69+
}
70+
71+
// on a redelivery, never call local handlers
72+
if (null !== $redeliveryStamp) {
73+
$handle = false;
6074
}
6175
}
6276

0 commit comments

Comments
 (0)