Skip to content

[Messenger] Add BatchHandlerInterface and BatchHandlerTrait mentions #17915

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions messenger.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2201,6 +2201,71 @@ That's it! You can now consume each transport:
If a handler does *not* have ``from_transport`` config, it will be executed
on *every* transport that the message is received from.

Process Messages by Batches
~~~~~~~~~~~~~~~~~~~~~~~~~~~

You can declare "special" handlers which will process messages by batch.
By doing so, the handler will wait for a certain amount of messages to be
pending before processing them. The declaration of a batch handler is done
by implementing
:class:`Symfony\\Component\\Messenger\\Handler\\BatchHandlerInterface`. The
:class:`Symfony\\Component\\Messenger\\Handler\\BatchHandlerTrait` is also
provided in order to ease the declaration of these special handlers::

use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;

class MyBatchHandler implements BatchHandlerInterface
{
use BatchHandlerTrait;

public function __invoke(MyMessage $message, Acknowledger $ack = null)
{
return $this->handle($message, $ack);
}

private function process(array $jobs): void
{
foreach ($jobs as [$message, $ack]) {
try {
// Compute $result from $message...

// Acknowledge the processing of the message
$ack->ack($result);
} catch (\Throwable $e) {
$ack->nack($e);
}
}
}

// Optionally, you can redefine the `shouldFlush()` method
// of the trait to define your own batch size
private function shouldFlush(): bool
{
return 100 <= \count($this->jobs);
}
}

.. note::

When the ``$ack`` argument of ``__invoke()`` is ``null``, the message is
expected to be handled synchronously. Otherwise, ``__invoke()`` is
expected to return the number of pending messages. The
:class:`Symfony\\Component\\Messenger\\Handler\\BatchHandlerTrait` handles
this for you.

.. note::

By default, pending batches are flushed when the worker is idle as well
as when it is stopped.

.. versionadded:: 5.4

:class:`Symfony\\Component\\Messenger\\Handler\\BatchHandlerInterface` and
:class:`Symfony\\Component\\Messenger\\Handler\\BatchHandlerTrait` were
introduced in Symfony 5.4.

Extending Messenger
-------------------

Expand Down