Skip to content

Commit 69bf877

Browse files
Nyholmogizanagi
andcommitted
Support for handling messages after current bus is finished
Co-authored-by: Maxime Steinhausser <[email protected]>
1 parent e216d9a commit 69bf877

File tree

4 files changed

+362
-0
lines changed

4 files changed

+362
-0
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Exception;
13+
14+
/**
15+
* When handling queued messages from {@link DispatchAfterCurrentBusMiddleware},
16+
* some handlers caused an exception. This exception contains all those handler exceptions.
17+
*
18+
* @author Tobias Nyholm <[email protected]>
19+
*/
20+
class DelayedMessageHandlingException extends \RuntimeException implements ExceptionInterface
21+
{
22+
private $exceptions;
23+
24+
public function __construct(array $exceptions)
25+
{
26+
$exceptionMessages = implode(", \n", array_map(
27+
function (\Throwable $e) {
28+
return \get_class($e).': '.$e->getMessage();
29+
},
30+
$exceptions
31+
));
32+
33+
if (1 === \count($exceptions)) {
34+
$message = sprintf("A delayed message handler threw an exception: \n\n%s", $exceptionMessages);
35+
} else {
36+
$message = sprintf("Some delayed message handlers threw an exception: \n\n%s", $exceptionMessages);
37+
}
38+
39+
$this->exceptions = $exceptions;
40+
41+
parent::__construct($message, 0, $exceptions[0]);
42+
}
43+
44+
public function getExceptions(): array
45+
{
46+
return $this->exceptions;
47+
}
48+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException;
16+
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
17+
18+
/**
19+
* Allow to configure messages to be handled after the current bus is finished.
20+
*
21+
* I.e, messages dispatched from a handler with a DispatchAfterCurrentBus stamp
22+
* will actually be handled once the current message being dispatched is fully
23+
* handled.
24+
*
25+
* For instance, using this middleware before the DoctrineTransactionMiddleware
26+
* means sub-dispatched messages with a DispatchAfterCurrentBus stamp would be
27+
* handled after the Doctrine transaction has been committed.
28+
*
29+
* @author Tobias Nyholm <[email protected]>
30+
*/
31+
class DispatchAfterCurrentBusMiddleware implements MiddlewareInterface
32+
{
33+
/**
34+
* @var QueuedEnvelope[] A queue of messages and next middleware
35+
*/
36+
private $queue = [];
37+
38+
/**
39+
* @var bool this property is used to signal if we are inside a the first/root call to
40+
* MessageBusInterface::dispatch() or if dispatch has been called inside a message handler
41+
*/
42+
private $isRootDispatchCallRunning = false;
43+
44+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
45+
{
46+
if (null !== $envelope->last(DispatchAfterCurrentBusStamp::class)) {
47+
if (!$this->isRootDispatchCallRunning) {
48+
throw new \LogicException(sprintf('You can only use a "%s" stamp in the context of a message handler.', DispatchAfterCurrentBusStamp::class));
49+
}
50+
$this->queue[] = new QueuedEnvelope($envelope, $stack);
51+
52+
return $envelope;
53+
}
54+
55+
if ($this->isRootDispatchCallRunning) {
56+
/*
57+
* A call to MessageBusInterface::dispatch() was made from inside the main bus handling,
58+
* but the message does not have the stamp. So, process it like normal.
59+
*/
60+
return $stack->next()->handle($envelope, $stack);
61+
}
62+
63+
// First time we get here, mark as inside a "root dispatch" call:
64+
$this->isRootDispatchCallRunning = true;
65+
try {
66+
// Execute the whole middleware stack & message handling for main dispatch:
67+
$returnedEnvelope = $stack->next()->handle($envelope, $stack);
68+
} catch (\Throwable $exception) {
69+
/*
70+
* Whenever an exception occurs while handling a message that has
71+
* queued other messages, we drop the queued ones.
72+
* This is intentional since the queued commands were likely dependent
73+
* on the preceding command.
74+
*/
75+
$this->queue = [];
76+
$this->isRootDispatchCallRunning = false;
77+
78+
throw $exception;
79+
}
80+
81+
// "Root dispatch" call is finished, dispatch stored messages.
82+
$exceptions = [];
83+
while (null !== $queueItem = array_shift($this->queue)) {
84+
try {
85+
// Execute the stored messages
86+
$queueItem->getStack()->next()->handle($queueItem->getEnvelope(), $queueItem->getStack());
87+
} catch (\Exception $exception) {
88+
// Gather all exceptions
89+
$exceptions[] = $exception;
90+
}
91+
}
92+
93+
$this->isRootDispatchCallRunning = false;
94+
if (\count($exceptions) > 0) {
95+
throw new DelayedMessageHandlingException($exceptions);
96+
}
97+
98+
return $returnedEnvelope;
99+
}
100+
}
101+
102+
/**
103+
* @internal
104+
*/
105+
final class QueuedEnvelope
106+
{
107+
/** @var Envelope */
108+
private $envelope;
109+
110+
/** @var StackInterface */
111+
private $stack;
112+
113+
public function __construct(Envelope $envelope, StackInterface $stack)
114+
{
115+
$this->envelope = $envelope;
116+
$this->stack = $stack;
117+
}
118+
119+
public function getEnvelope(): Envelope
120+
{
121+
return $this->envelope;
122+
}
123+
124+
public function getStack(): StackInterface
125+
{
126+
return $this->stack;
127+
}
128+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
declare(strict_types=1);
13+
14+
namespace Symfony\Component\Messenger\Stamp;
15+
16+
/**
17+
* Marker item to tell this message should be handled in after the current bus has finished.
18+
*
19+
* @see \Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware
20+
*
21+
* @author Tobias Nyholm <[email protected]>
22+
*/
23+
class DispatchAfterCurrentBusStamp implements StampInterface
24+
{
25+
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Middleware;
13+
14+
use PHPUnit\Framework\MockObject\MockObject;
15+
use PHPUnit\Framework\TestCase;
16+
use Symfony\Component\Messenger\Envelope;
17+
use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException;
18+
use Symfony\Component\Messenger\MessageBus;
19+
use Symfony\Component\Messenger\MessageBusInterface;
20+
use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware;
21+
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
22+
use Symfony\Component\Messenger\Middleware\StackInterface;
23+
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
24+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
25+
26+
class DispatchAfterCurrentBusMiddlewareTest extends TestCase
27+
{
28+
public function testEventsInNewTransactionAreHandledAfterMainMessage()
29+
{
30+
$message = new DummyMessage('Hello');
31+
32+
$firstEvent = new DummyEvent('First event');
33+
$secondEvent = new DummyEvent('Second event');
34+
$thirdEvent = new DummyEvent('Third event');
35+
36+
$middleware = new DispatchAfterCurrentBusMiddleware();
37+
$handlingMiddleware = $this->createMock(MiddlewareInterface::class);
38+
39+
$eventBus = new MessageBus([
40+
$middleware,
41+
$handlingMiddleware,
42+
]);
43+
44+
$messageBus = new MessageBus([
45+
$middleware,
46+
new DispatchingMiddleware($eventBus, [
47+
new Envelope($firstEvent, new DispatchAfterCurrentBusStamp()),
48+
new Envelope($secondEvent, new DispatchAfterCurrentBusStamp()),
49+
$thirdEvent, // Not in a new transaction
50+
]),
51+
$handlingMiddleware,
52+
]);
53+
54+
// Third event is dispatch within main dispatch, but before its handling:
55+
$this->expectHandledMessage($handlingMiddleware, 0, $thirdEvent);
56+
// Then expect main dispatched message to be handled first:
57+
$this->expectHandledMessage($handlingMiddleware, 1, $message);
58+
// Then, expect events in new transaction to be handled next, in dispatched order:
59+
$this->expectHandledMessage($handlingMiddleware, 2, $firstEvent);
60+
$this->expectHandledMessage($handlingMiddleware, 3, $secondEvent);
61+
62+
$messageBus->dispatch($message);
63+
}
64+
65+
public function testThrowingEventsHandlingWontStopExecution()
66+
{
67+
$message = new DummyMessage('Hello');
68+
69+
$firstEvent = new DummyEvent('First event');
70+
$secondEvent = new DummyEvent('Second event');
71+
72+
$middleware = new DispatchAfterCurrentBusMiddleware();
73+
$handlingMiddleware = $this->createMock(MiddlewareInterface::class);
74+
75+
$eventBus = new MessageBus([
76+
$middleware,
77+
$handlingMiddleware,
78+
]);
79+
80+
$messageBus = new MessageBus([
81+
$middleware,
82+
new DispatchingMiddleware($eventBus, [
83+
new Envelope($firstEvent, new DispatchAfterCurrentBusStamp()),
84+
new Envelope($secondEvent, new DispatchAfterCurrentBusStamp()),
85+
]),
86+
$handlingMiddleware,
87+
]);
88+
89+
// Expect main dispatched message to be handled first:
90+
$this->expectHandledMessage($handlingMiddleware, 0, $message);
91+
// Then, expect events in new transaction to be handled next, in dispatched order:
92+
$this->expectThrowingHandling($handlingMiddleware, 1, $firstEvent, new \RuntimeException('Some exception while handling first event'));
93+
// Next event is still handled despite the previous exception:
94+
$this->expectHandledMessage($handlingMiddleware, 2, $secondEvent);
95+
96+
$this->expectException(DelayedMessageHandlingException::class);
97+
$this->expectExceptionMessage('RuntimeException: Some exception while handling first event');
98+
99+
$messageBus->dispatch($message);
100+
}
101+
102+
/**
103+
* @param MiddlewareInterface|MockObject $handlingMiddleware
104+
*/
105+
private function expectHandledMessage(MiddlewareInterface $handlingMiddleware, int $at, $message): void
106+
{
107+
$handlingMiddleware->expects($this->at($at))->method('handle')->with($this->callback(function (Envelope $envelope) use ($message) {
108+
return $envelope->getMessage() === $message;
109+
}))->willReturnCallback(function ($envelope, StackInterface $stack) {
110+
return $stack->next()->handle($envelope, $stack);
111+
});
112+
}
113+
114+
/**
115+
* @param MiddlewareInterface|MockObject $handlingMiddleware
116+
*/
117+
private function expectThrowingHandling(MiddlewareInterface $handlingMiddleware, int $at, $message, \Throwable $throwable): void
118+
{
119+
$handlingMiddleware->expects($this->at($at))->method('handle')->with($this->callback(function (Envelope $envelope) use ($message) {
120+
return $envelope->getMessage() === $message;
121+
}))->willReturnCallback(function () use ($throwable) {
122+
throw $throwable;
123+
});
124+
}
125+
}
126+
127+
class DummyEvent
128+
{
129+
private $message;
130+
131+
public function __construct(string $message)
132+
{
133+
$this->message = $message;
134+
}
135+
136+
public function getMessage(): string
137+
{
138+
return $this->message;
139+
}
140+
}
141+
142+
class DispatchingMiddleware implements MiddlewareInterface
143+
{
144+
private $bus;
145+
private $messages;
146+
147+
public function __construct(MessageBusInterface $bus, array $messages)
148+
{
149+
$this->bus = $bus;
150+
$this->messages = $messages;
151+
}
152+
153+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
154+
{
155+
foreach ($this->messages as $event) {
156+
$this->bus->dispatch($event);
157+
}
158+
159+
return $stack->next()->handle($envelope, $stack);
160+
}
161+
}

0 commit comments

Comments
 (0)