Skip to content

Allow to specify that the implementer of ConsumerInterface will handle ACKs #519

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RabbitMq/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
} else if ($processFlag === ConsumerInterface::MSG_REJECT) {
// Reject and drop
$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false);
} else {
} else if ($processFlag !== ConsumerInterface::MSG_ACK_SENT) {
// Remove message from queue only if callback return not false
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
Expand Down
4 changes: 4 additions & 0 deletions RabbitMq/ConsumerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ interface ConsumerInterface
*/
const MSG_REJECT = -1;

/**
* Flag for consumers that wants to handle ACKs on their own
*/
const MSG_ACK_SENT = -2;

/**
* @param AMQPMessage $msg The message
Expand Down
33 changes: 20 additions & 13 deletions Tests/RabbitMq/ConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ protected function prepareAMQPChannel()
*
* @dataProvider processMessageProvider
*/
public function testProcessMessage($processFlag, $expectedMethod, $expectedRequeue = null)
public function testProcessMessage($processFlag, $expectedMethod = null, $expectedRequeue = null)
{
$amqpConnection = $this->prepareAMQPConnection();
$amqpChannel = $this->prepareAMQPChannel();
Expand All @@ -52,18 +52,24 @@ public function testProcessMessage($processFlag, $expectedMethod, $expectedReque
$amqpMessage->delivery_info['channel'] = $amqpChannel;
$amqpMessage->delivery_info['delivery_tag'] = 0;

$amqpChannel->expects($this->any())
->method('basic_reject')
->will($this->returnCallback(function($delivery_tag, $requeue) use ($expectedMethod, $expectedRequeue) {
\PHPUnit_Framework_Assert::assertSame($expectedMethod, 'basic_reject'); // Check if this function should be called.
\PHPUnit_Framework_Assert::assertSame($requeue, $expectedRequeue); // Check if the message should be requeued.
}));

$amqpChannel->expects($this->any())
->method('basic_ack')
->will($this->returnCallback(function($delivery_tag) use ($expectedMethod) {
\PHPUnit_Framework_Assert::assertSame($expectedMethod, 'basic_ack'); // Check if this function should be called.
}));
if ($expectedMethod) {
$amqpChannel->expects($this->any())
->method('basic_reject')
->will($this->returnCallback(function ($delivery_tag, $requeue) use ($expectedMethod, $expectedRequeue) {
\PHPUnit_Framework_Assert::assertSame($expectedMethod, 'basic_reject'); // Check if this function should be called.
\PHPUnit_Framework_Assert::assertSame($requeue, $expectedRequeue); // Check if the message should be requeued.
}));

$amqpChannel->expects($this->any())
->method('basic_ack')
->will($this->returnCallback(function ($delivery_tag) use ($expectedMethod) {
\PHPUnit_Framework_Assert::assertSame($expectedMethod, 'basic_ack'); // Check if this function should be called.
}));
} else {
$amqpChannel->expects($this->never())->method('basic_reject');
$amqpChannel->expects($this->never())->method('basic_ack');
$amqpChannel->expects($this->never())->method('basic_nack');
}
$eventDispatcher = $this->getMockBuilder('Symfony\Component\EventDispatcher\EventDispatcherInterface')
->getMock();
$consumer->setEventDispatcher($eventDispatcher);
Expand All @@ -87,6 +93,7 @@ public function processMessageProvider()
array(ConsumerInterface::MSG_ACK, 'basic_ack'), // Remove message from queue only if callback return not false
array(ConsumerInterface::MSG_REJECT_REQUEUE, 'basic_reject', true), // Reject and requeue message to RabbitMQ
array(ConsumerInterface::MSG_REJECT, 'basic_reject', false), // Reject and drop
array(ConsumerInterface::MSG_ACK_SENT), // ack not sent by the consumer but should be sent by the implementer of ConsumerInterface
);
}

Expand Down