Skip to content

refs #664: message handling on StopConsumerException fix #665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions RabbitMq/BatchConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,14 @@ private function batchConsume()
)
));
} catch (Exception\StopConsumerException $e) {
$this->logger->info('Consumer requested restart', array(
$this->logger->info('Consumer requested stop', array(
'amqp' => array(
'queue' => $this->queueOptions['name'],
'message' => $this->messages,
'stacktrace' => $e->getTraceAsString()
)
));
$this->handleProcessMessages($e->getHandleCode());
$this->resetBatch();
$this->stopConsuming();
} catch (\Exception $e) {
Expand Down Expand Up @@ -213,12 +214,14 @@ private function handleProcessFlag($deliveryTag, $processFlag)
if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
// Reject and requeue message to RabbitMQ
$this->getMessageChannel($deliveryTag)->basic_reject($deliveryTag, true);
} else if ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
} elseif ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
// NACK and requeue message to RabbitMQ
$this->getMessageChannel($deliveryTag)->basic_nack($deliveryTag, false, true);
} else if ($processFlag === ConsumerInterface::MSG_REJECT) {
} elseif ($processFlag === ConsumerInterface::MSG_REJECT) {
// Reject and drop
$this->getMessageChannel($deliveryTag)->basic_reject($deliveryTag, false);
} elseif ($processFlag == ConsumerInterface::MSG_ACK_SENT) {
// do nothing, ACK should be already sent
} else {
// Remove message from queue only if callback return not false
$this->getMessageChannel($deliveryTag)->basic_ack($deliveryTag);
Expand Down
8 changes: 4 additions & 4 deletions RabbitMq/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ protected function processMessageQueueCallback(AMQPMessage $msg, $queueName, $ca
)
));
} catch (Exception\StopConsumerException $e) {
$this->logger->info('Consumer requested restart', array(
$this->logger->info('Consumer requested stop', array(
'amqp' => array(
'queue' => $queueName,
'message' => $msg,
Expand Down Expand Up @@ -198,13 +198,13 @@ protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
// Reject and requeue message to RabbitMQ
$msg->reject();
} else if ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
} elseif ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
// NACK and requeue message to RabbitMQ
$msg->nack(true);
} else if ($processFlag === ConsumerInterface::MSG_REJECT) {
} elseif ($processFlag === ConsumerInterface::MSG_REJECT) {
// Reject and drop
$msg->reject(false);
} else if ($processFlag !== ConsumerInterface::MSG_ACK_SENT) {
} elseif ($processFlag !== ConsumerInterface::MSG_ACK_SENT) {
// Remove message from queue only if callback return not false
$msg->ack();
}
Expand Down