Skip to content

Commit 858a1fe

Browse files
committed
Ensure multiple consumer follows same logic as normal consumer
1 parent 9255b76 commit 858a1fe

File tree

2 files changed

+11
-9
lines changed

2 files changed

+11
-9
lines changed

RabbitMq/Consumer.php

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,29 +71,29 @@ public function delete()
7171
$this->getChannel()->queue_delete($this->queueOptions['name'], true);
7272
}
7373

74-
public function processMessage(AMQPMessage $msg)
74+
protected function processMessageQueueCallback(AMQPMessage $msg, $queueName, $callback)
7575
{
7676
$this->dispatchEvent(BeforeProcessingMessageEvent::NAME,
7777
new BeforeProcessingMessageEvent($this, $msg)
7878
);
7979
try {
80-
$processFlag = call_user_func($this->callback, $msg);
80+
$processFlag = call_user_func($callback, $msg);
8181
$this->handleProcessMessage($msg, $processFlag);
8282
$this->dispatchEvent(
8383
AfterProcessingMessageEvent::NAME,
8484
new AfterProcessingMessageEvent($this, $msg)
8585
);
8686
$this->logger->debug('Queue message processed', array(
8787
'amqp' => array(
88-
'queue' => $this->queueOptions['name'],
88+
'queue' => $queueName,
8989
'message' => $msg,
9090
'return_code' => $processFlag
9191
)
9292
));
9393
} catch (Exception\StopConsumerException $e) {
9494
$this->logger->info('Consumer requested restart', array(
9595
'amqp' => array(
96-
'queue' => $this->queueOptions['name'],
96+
'queue' => $queueName,
9797
'message' => $msg,
9898
'stacktrace' => $e->getTraceAsString()
9999
)
@@ -102,7 +102,7 @@ public function processMessage(AMQPMessage $msg)
102102
} catch (\Exception $e) {
103103
$this->logger->error($e->getMessage(), array(
104104
'amqp' => array(
105-
'queue' => $this->queueOptions['name'],
105+
'queue' => $queueName,
106106
'message' => $msg,
107107
'stacktrace' => $e->getTraceAsString()
108108
)
@@ -111,14 +111,18 @@ public function processMessage(AMQPMessage $msg)
111111
} catch (\Error $e) {
112112
$this->logger->error($e->getMessage(), array(
113113
'amqp' => array(
114-
'queue' => $this->queueOptions['name'],
114+
'queue' => $queueName,
115115
'message' => $msg,
116116
'stacktrace' => $e->getTraceAsString()
117117
)
118118
));
119119
throw $e;
120120
}
121+
}
121122

123+
public function processMessage(AMQPMessage $msg)
124+
{
125+
$this->processMessageQueueCallback($msg, $this->queueOptions['name'], $this->callback);
122126
}
123127

124128
protected function handleProcessMessage(AMQPMessage $msg, $processFlag)

RabbitMq/MultipleConsumer.php

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,7 @@ public function processQueueMessage($queueName, AMQPMessage $msg)
8484
throw new QueueNotFoundException();
8585
}
8686

87-
$processFlag = call_user_func($this->queues[$queueName]['callback'], $msg);
88-
89-
$this->handleProcessMessage($msg, $processFlag);
87+
$this->processMessageQueueCallback($msg, $queueName, $this->queues[$queueName]['callback']);
9088
}
9189

9290
public function stopConsuming()

0 commit comments

Comments
 (0)