Skip to content

Commit 63d047d

Browse files
authored
Merge pull request #665 from Eloar/664_fix
fixes #664: message handling on StopConsumerException fix
2 parents f02ca21 + 21fae9a commit 63d047d

File tree

2 files changed

+10
-7
lines changed

2 files changed

+10
-7
lines changed

RabbitMq/BatchConsumer.php

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,13 +154,14 @@ private function batchConsume()
154154
)
155155
));
156156
} catch (Exception\StopConsumerException $e) {
157-
$this->logger->info('Consumer requested restart', array(
157+
$this->logger->info('Consumer requested stop', array(
158158
'amqp' => array(
159159
'queue' => $this->queueOptions['name'],
160160
'message' => $this->messages,
161161
'stacktrace' => $e->getTraceAsString()
162162
)
163163
));
164+
$this->handleProcessMessages($e->getHandleCode());
164165
$this->resetBatch();
165166
$this->stopConsuming();
166167
} catch (\Exception $e) {
@@ -213,12 +214,14 @@ private function handleProcessFlag($deliveryTag, $processFlag)
213214
if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
214215
// Reject and requeue message to RabbitMQ
215216
$this->getMessageChannel($deliveryTag)->basic_reject($deliveryTag, true);
216-
} else if ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
217+
} elseif ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
217218
// NACK and requeue message to RabbitMQ
218219
$this->getMessageChannel($deliveryTag)->basic_nack($deliveryTag, false, true);
219-
} else if ($processFlag === ConsumerInterface::MSG_REJECT) {
220+
} elseif ($processFlag === ConsumerInterface::MSG_REJECT) {
220221
// Reject and drop
221222
$this->getMessageChannel($deliveryTag)->basic_reject($deliveryTag, false);
223+
} elseif ($processFlag == ConsumerInterface::MSG_ACK_SENT) {
224+
// do nothing, ACK should be already sent
222225
} else {
223226
// Remove message from queue only if callback return not false
224227
$this->getMessageChannel($deliveryTag)->basic_ack($deliveryTag);

RabbitMq/Consumer.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ protected function processMessageQueueCallback(AMQPMessage $msg, $queueName, $ca
158158
)
159159
));
160160
} catch (Exception\StopConsumerException $e) {
161-
$this->logger->info('Consumer requested restart', array(
161+
$this->logger->info('Consumer requested stop', array(
162162
'amqp' => array(
163163
'queue' => $queueName,
164164
'message' => $msg,
@@ -198,13 +198,13 @@ protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
198198
if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
199199
// Reject and requeue message to RabbitMQ
200200
$msg->reject();
201-
} else if ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
201+
} elseif ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
202202
// NACK and requeue message to RabbitMQ
203203
$msg->nack(true);
204-
} else if ($processFlag === ConsumerInterface::MSG_REJECT) {
204+
} elseif ($processFlag === ConsumerInterface::MSG_REJECT) {
205205
// Reject and drop
206206
$msg->reject(false);
207-
} else if ($processFlag !== ConsumerInterface::MSG_ACK_SENT) {
207+
} elseif ($processFlag !== ConsumerInterface::MSG_ACK_SENT) {
208208
// Remove message from queue only if callback return not false
209209
$msg->ack();
210210
}

0 commit comments

Comments
 (0)