[Feature] Bulk Consumer #413

Merged
merged 23 commits into from
Jun 12, 2017

devrck
Contributor

@devrck devrck commented Dec 7, 2016

#403 implementation

  • This new type of consumer fetches x messages from the queue and keeps them in an unacknowledged state
  • It calls the @execute() method for each of them
  • When the batch is complete, or its timeout wait has elapsed, it fires the @batchExecute() method
  • If @batchExecute() completes without an exception, the messages are acknowledged; otherwise they are re-queued to be handled by another worker

[ Obs ]: A message can get stuck on the queue forever because of how @batchExecute() behaves when it throws an exception. I am open to suggestions on how to fix this.

Configuration used in app/config/config.yml

# RabbitMQ Configuration
old_sound_rabbit_mq:
    connections:
        default:
            host:     '%rabbit_mq_conn_default_host%'
            port:     '%rabbit_mq_conn_default_port%'
            user:     '%rabbit_mq_conn_default_user%'
            password: '%rabbit_mq_conn_default_password%'
            vhost:    '%rabbit_mq_conn_default_vhost%'
            lazy:     '%rabbit_mq_conn_default_lazy%'
    batch_consumers:
        devck_basic:
            connection:       default
            exchange_options: {name: '%rabbit_mq_exchange_devck%', type: fanout}
            queue_options:    {name: '%rabbit_mq_queue_devck%'}
            callback:         devck.basic
            qos_options:      {prefetch_size: 0, prefetch_count: 2, global: false}
            timeout_wait:     5
            auto_setup_fabric: false
            idle_timeout_exit_code: -2

Callback class

<?php

namespace AppBundle\Service;

use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class DevckBasicConsumer implements BatchConsumerInterface
{
    public function batchExecute()
    {
        echo 'Doing batch execution', PHP_EOL;
    }

    /**
     * @inheritDoc
     */
    public function execute(AMQPMessage $msg)
    {
        echo $msg->body, PHP_EOL;

        return true;
    }
}

@skafandri
Collaborator

How do I access the messages in batchExecute()?

interface BatchConsumerInterface extends ConsumerInterface
{
    public function batchExecute();
}
Collaborator

new line :)

{
$this->getChannel()->wait(null, false, $timeout);
}
}
Collaborator

new line :)

{
return 'old_sound_rabbit_mq.%s_batch';
}
}
Collaborator

new line :)

@devrck
Contributor Author

devrck commented Dec 8, 2016

Well, my premise is that you do the work in execute() and then, in batchExecute(), you perform an action that is very costly.

Take, for example, a batch process that inserts into a database using Doctrine. You do the work in execute() (persisting and modifying objects), and then in batchExecute() you do a flush.

After you asked how to access the messages, I thought you should have access to them. I can provide that in batchExecute() by changing its signature to:

/**
 * @param   AMQPMessage[]  $messages
 *
 * @return   void
 */
public function batchExecute(array $messages);

Q: @skafandri what do you think?

PS: I will fix the CR 😄 as soon as possible.

@skafandri
Collaborator

I am not sure I understand the bulk idea in this context well. What I understood is:

  • Get a batch of messages
  • Do some work
  • Return true to ack them all or false to nack them all
  • It should be impossible (as good as we can do) to ack/nack only some messages

@devrck
Contributor Author

devrck commented Dec 9, 2016

Yes you've got it.

The problem with this is that if your message generates an exception, it will be stuck on your queue forever unless you have logic to get it off. I can implement a mechanism to detect that a message has been passed along for some time, but I don't know whether I should do this or leave it to the developer.

What's your opinion?

@skafandri
Collaborator

I think of it as similar to an SQL transaction:

start transaction;
insert into order_line(`id`, `product_id`) values (1, 10);
insert into order_line(`id`, `product_id`) values (1, 11);
insert into order_line(`id`, `product_id`) values (1, NULL);
commit;

As long as product_id is not nullable, all the inserts will fail.
If the bulk operation is not atomic then I wouldn't really need this feature. All I would do is:

public function execute(AMQPMessage $msg)
{
    $result = $this->processMessage($msg);

    if ($this->reachedBulkCondition()) {
        $this->bulkAction();
        $this->resetBulkCondition();
    }

    return $result;
}

Where reachedBulkCondition can check for the number of processed messages or an elapsed time or both.
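
As a rough illustration of such a condition, here is a minimal sketch that combines a message count with an elapsed time. The `BulkCondition` class and its method names are hypothetical and not part of the bundle; the current time is passed in explicitly (a consumer would pass `microtime(true)`) so the behaviour is easy to check.

```php
<?php

/**
 * Hypothetical helper (not part of the bundle): decides when buffered work
 * should be flushed, based on a message count, an elapsed time, or both.
 */
class BulkCondition
{
    private $maxMessages;
    private $maxSeconds;
    private $count = 0;
    private $startedAt;

    public function __construct($maxMessages, $maxSeconds, $now)
    {
        $this->maxMessages = $maxMessages;
        $this->maxSeconds = $maxSeconds;
        $this->startedAt = $now;
    }

    /** Record one processed message. */
    public function recordMessage()
    {
        $this->count++;
    }

    /** True once either the count or the elapsed-time threshold is reached. */
    public function isReached($now)
    {
        return $this->count >= $this->maxMessages
            || ($now - $this->startedAt) >= $this->maxSeconds;
    }

    /** Start a new window after the bulk action has run. */
    public function reset($now)
    {
        $this->count = 0;
        $this->startedAt = $now;
    }
}
```

With this sketch, execute() would call recordMessage() after processing each message, run the bulk action when isReached() returns true, and then call reset().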

Now back to your implementation, if batchExecute throws an exception, I think ALL the messages should be nacked. If one of those messages is causing the exception it will effectively get stuck in the queue. But this is the developer's job to handle this situation. The same behavior is present with the single consumer as well.

In other words, I expect batchExecute to behave exactly like execute. The only difference being the unit of work, AMQPMessage for execute and AMQPMessage[] for batchExecute.

I wouldn't mind creating an AMQPBulkMessage which will contain a collection of AMQPMessages to make it clear to the user that he is dealing (nacking/acking) with a collection of messages.

@devrck
Contributor Author

devrck commented Dec 9, 2016

Hmm, now that I think about it, my solution is a workaround for batchExecute(array $messages), and the existence of execute(AMQPMessage $msg) doesn't seem to be of any use. I will give this another shot and pass all the messages to batchExecute(array $messages).

Bogdan Rancichi added 5 commits December 10, 2016 15:36
…ause it does the work in batches, could be added later on maybe, but with other meaning (how many batches not how many messages)
… messages and it can return for each message the response ot ack/nack/requeue/reject or 1 single response applied for all.
…consumer just has an array of messages instead of a single one
@devrck
Contributor Author

devrck commented Dec 10, 2016

Fixed CR & implemented another approach to the bulk consumer.

@devrck
Contributor Author

devrck commented Dec 16, 2016

Hi guys, did anybody get a chance to look over this?

@devrck
Contributor Author

devrck commented Dec 16, 2016

A consumer can look like this.

It basically has the same structure as before, but now I pass an array of messages to batchExecute (as @skafandri 👍 pointed out, it should work like execute(AMQPMessage $msg) and be atomic).

In the first example you just return a single bool value, which is applied to every message in the batch:

namespace AppBundle\Service;

use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class DevckBasicConsumer implements BatchConsumerInterface
{
    /**
     * @inheritDoc
     */
    public function batchExecute(array $messages)
    {
        echo sprintf('Doing batch execution%s', PHP_EOL);
        foreach ($messages as $message) {
            $this->executeSomeLogicPerMessage($message);
        }

        return true;
    }
}

The second example gives you more control over what to do with each individual message, but you have to keep track of the messages, assign a value to each delivery_tag, and return that result to the batch consumer:

namespace AppBundle\Service;

use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class DevckBasicConsumer implements BatchConsumerInterface
{
    /**
     * @inheritDoc
     */
    public function batchExecute(array $messages)
    {
        echo sprintf('Doing batch execution%s', PHP_EOL);
        $result = [];
        /** @var AMQPMessage $message */
        foreach ($messages as $message) {
            $result[(int)$message->delivery_info['delivery_tag']] = $this->executeSomeLogicPerMessage($message);
        }

        return $result;
    }
}
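
To make the two return styles concrete, the sketch below shows one way a batch consumer could expand the return value of batchExecute() into a decision per delivery tag. The function `resolveBatchResult` and the 'ack'/'nack' strings are hypothetical placeholders rather than the bundle's actual code or constants, and treating a missing tag as a failure is an assumption made for this example.

```php
<?php

/**
 * Hypothetical helper: expands the value returned by batchExecute() into one
 * decision per delivery tag. The 'ack'/'nack' strings are placeholders, not
 * the bundle's real constants.
 *
 * @param bool|array $result       single flag, or map of delivery_tag => flag
 * @param int[]      $deliveryTags tags of every message in the batch
 *
 * @return array map of delivery_tag => 'ack'|'nack'
 */
function resolveBatchResult($result, array $deliveryTags)
{
    $decisions = array();

    foreach ($deliveryTags as $tag) {
        if (is_array($result)) {
            // Assumption: a tag missing from the map is treated as a failure.
            $flag = isset($result[$tag]) ? $result[$tag] : false;
        } else {
            // A single value applies to the whole batch.
            $flag = $result;
        }

        $decisions[$tag] = ($flag === true) ? 'ack' : 'nack';
    }

    return $decisions;
}
```

Returning true therefore acknowledges the whole batch, while an array lets individual messages be rejected without affecting the others.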

@skafandri adding another class, AMQPBulkMessages, is the best choice IMHO for this batch consumer, but from what I've investigated (maybe I need more time) you still receive one message and then the next.

@skafandri
Collaborator

Looks good to me. I think we should document some edge cases so users won't get confused.

What happens if the batch size is smaller than the prefetch count?
What happens if there are fewer messages available in the queue than the batch size?
We should also mention that a batch is not fixed. With a batch of 3, if I nack [m1, m2, m3] there is no guarantee (most likely not) that another consumer will receive the same batch; it may receive [m1, m2, m5].

@devrck
Contributor Author

devrck commented Dec 22, 2016

First of all, this is the configuration for a batch consumer (it is the same as above):

# RabbitMQ Configuration
old_sound_rabbit_mq:
    connections:
        default:
            host:     '%rabbit_mq_conn_default_host%'
            port:     '%rabbit_mq_conn_default_port%'
            user:     '%rabbit_mq_conn_default_user%'
            password: '%rabbit_mq_conn_default_password%'
            vhost:    '%rabbit_mq_conn_default_vhost%'
            lazy:     '%rabbit_mq_conn_default_lazy%'
    batch_consumers:
        devck_basic:
            connection:       default
            exchange_options: {name: '%rabbit_mq_exchange_devck%', type: fanout}
            queue_options:    {name: '%rabbit_mq_queue_devck%'}
            callback:         devck.basic
            qos_options:      {prefetch_size: 0, prefetch_count: 2, global: false}
            timeout_wait:     5
            auto_setup_fabric: false
            idle_timeout_exit_code: -2

Getting back to your questions:

Q: What happens if the batch size is smaller than the prefetch count?
A: In my implementation the prefetch_count is the batch size, and I think I have to restrict the user to a value greater than or equal to 2 (I will check and push changes if I haven't yet).

Q: What happens if there are fewer messages available in the queue than the batch size?
A: It will take all the messages available and, once timeout_wait has passed with no more messages received, deliver them to the batchExecute() method. timeout_wait is different from idle_timeout: timeout_wait acts like a buffer in case something else comes along.

Q: We should also mention that a batch is not fixed. With a batch of 3, if I nack [m1, m2, m3] there is no guarantee (most likely not) that another consumer will receive the same batch; it may receive [m1, m2, m5].
A: I agree, there is no guarantee that you will receive the same batch, because dispatching is handled by the RabbitMQ server.
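
The interplay between the batch size and timeout_wait can be illustrated with a small simulation that does not touch RabbitMQ at all. The function `groupIntoBatches` is hypothetical; it assumes a batch is flushed either when it is full or when no new message arrives within timeout_wait seconds of the previous one, which is a simplified reading of the answers above rather than the consumer's actual code.

```php
<?php

/**
 * Hypothetical simulation, independent of RabbitMQ: groups message arrival
 * times (in seconds, ascending) into batches. A batch is flushed when it holds
 * $batchSize messages, or when no new message arrives within $timeoutWait
 * seconds of the previous one.
 *
 * @return array[] list of batches, each a list of arrival times
 */
function groupIntoBatches(array $arrivals, $batchSize, $timeoutWait)
{
    $batches = array();
    $current = array();

    foreach ($arrivals as $time) {
        // A gap longer than timeout_wait flushes whatever was buffered.
        if ($current && ($time - end($current)) > $timeoutWait) {
            $batches[] = $current;
            $current = array();
        }

        $current[] = $time;

        // A full batch is flushed immediately.
        if (count($current) === $batchSize) {
            $batches[] = $current;
            $current = array();
        }
    }

    // Messages left over are flushed once timeout_wait expires.
    if ($current) {
        $batches[] = $current;
    }

    return $batches;
}
```

For example, with a batch size of 2 and a timeout_wait of 5, arrivals at seconds 0, 1, 2, 10, 11 and 30 produce two full batches and two partial ones, so a callback has to cope with batches smaller than the configured size.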

Additional situations:

Prefetch with large messages:

    • Queue: 1 message of 3 KB, N messages of 1 KB
    • QoS: prefetch_size: 3 KB, prefetch_count: 3
    • How does the system behave in this scenario? (It should receive the first message of 3 KB and send it to batchExecute() straight away, because that is the maximum, rather than waiting timeout_wait for other messages.)

    • Queue: 1 message of 3 KB, N messages of 1 KB
    • QoS: prefetch_size: 2 KB, prefetch_count: 3
    • In this scenario I don't see a solution: that first message will get stuck on the queue forever, correct?

@skafandri
Collaborator

Good job. Let's add this to README.md so we can review it better and make sure it is accurate, helpful, and fits well with the current documentation.

@Nightbr

Nightbr commented Feb 21, 2017

Any news? It is a great feature! Awesome work! We really need it!

@devrck
Contributor Author

devrck commented Feb 21, 2017

@Nightbr I will get back to it to complete the documentation and revise the code.

@christianmeller
Contributor

Thank you for this awesome work. This is exactly what we were looking for.
Could you estimate how long it should take to finish this feature? We really need it as well ;-)

@devrck
Contributor Author

devrck commented Jun 6, 2017

@skafandri after a long time I've added the README part.

README.md Outdated
}
}

```
Collaborator

Should be: "```php"

Contributor Author

fixed.

// you ack all messages got in batch
return true;
}
}
Collaborator

Missing close tag: "```"

Contributor Author

fixed.


$this->maybeStopConsumer();

if (!is_null($this->getMemoryLimit()) && $this->isRamAlmostOverloaded()) {
Collaborator

null !==

Contributor Author

fixed.

$this->consumed++;
$this->maybeStopConsumer();

if (!is_null($this->getMemoryLimit()) && $this->isRamAlmostOverloaded()) {
Collaborator

null !==

Contributor Author

fixed.

/**
* @var int
*/
protected $consumed = 0;
Collaborator

If this is not expected to be extended please mark all variables as private.

Contributor Author

fixed.

  public function getConfigTreeBuilder()
  {
      $tree = new TreeBuilder();

-     $rootNode = $tree->root('old_sound_rabbit_mq');
+     $rootNode = $tree->root($this->name);
Collaborator

Why? We don't plan to change the configuration name.

Contributor Author

Make use of already defined alias.

$this->consumer = $this->getContainer()
->get(sprintf($this->getConsumerService(), $input->getArgument('name')));

if (!is_null($input->getOption('memory-limit')) && ctype_digit((string) $input->getOption('memory-limit')) && $input->getOption('memory-limit') > 0) {
Collaborator

null !==


public function restartConsumer()
{
// TODO: Implement restarting of consumer
Collaborator

This should be removed or added before merging.

Contributor Author

fixed.

protected $consumer;

/**
* @return void
Collaborator

Such phpdoc is useless IMO.

Contributor Author

fixed.

@devrck
Contributor Author

devrck commented Jun 7, 2017

Waiting review 😄

$consumer['qos_options']['global']
))
;

Contributor

The method call for setIdleTimeoutExitCode (key: idle_timeout_exit_code) is missing.

Contributor Author

Fixed.

$this->batchConsume();
}

$timeoutWanted = ($isConsuming) ? $this->getTimeoutWait() : $this->getIdleTimeout();
Collaborator

( & ) are useless here.

Contributor Author

Fixed.


public function batchConsume()
{
if ($this->batchCounter == 0) {
Collaborator

0 === $this->batchCounter

Contributor Author

Implemented.


if ($this->forceStop) {
$this->stopConsuming();
} else {
Collaborator

This whole } else { & return; is useless.

Contributor Author

Fixed.


pcntl_signal(SIGTERM, array(&$this, 'stopConsumer'));
pcntl_signal(SIGINT, array(&$this, 'stopConsumer'));
// pcntl_signal(SIGHUP, array(&$this, 'restartConsumer'));
Collaborator

This looks unused.

)
));
throw $e;
} finally {
Collaborator

We cannot use finally as we support PHP 5.3+

Contributor Author

Sure, fixed.
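
For context, the `finally` keyword only exists from PHP 5.5 onwards, so code that must also run on earlier versions has to repeat the cleanup on both the failure path and the success path. The sketch below shows that pattern with a hypothetical `runWithCleanup` helper; it is an illustration of the general technique and not the change that was actually pushed to this PR.

```php
<?php

/**
 * Hypothetical helper: runs $work and always runs $cleanup afterwards,
 * without the `finally` keyword (which needs PHP 5.5 or later).
 */
function runWithCleanup($work, $cleanup)
{
    try {
        $result = call_user_func($work);
    } catch (\Exception $e) {
        // Failure path: clean up, then rethrow the original exception.
        call_user_func($cleanup);
        throw $e;
    }

    // Success path: same cleanup, then return the result.
    call_user_func($cleanup);

    return $result;
}
```

The cost of this approach is the duplicated cleanup call, which is why it is usually wrapped in a helper or a private method.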

@devrck
Contributor Author

devrck commented Jun 12, 2017

@stloyd what about this PR? What changes do I have to make?

@stloyd stloyd merged commit 564e87c into php-amqplib:master Jun 12, 2017
@stloyd
Collaborator

stloyd commented Jun 12, 2017

Thank you @devrck !
