Skip to content

Commit 5bdbd0f

Browse files
committed
do not use deprecated AMQPMessager->delivery_info
1 parent 3e304d9 commit 5bdbd0f

File tree

7 files changed

+31
-37
lines changed

7 files changed

+31
-37
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -922,7 +922,7 @@ class DevckBasicConsumer implements BatchConsumerInterface
922922
$result = [];
923923
/** @var AMQPMessage $message */
924924
foreach ($messages as $message) {
925-
$result[(int)$message->delivery_info['delivery_tag']] = $this->executeSomeLogicPerMessage($message);
925+
$result[$message->getDeliveryTag()] = $this->executeSomeLogicPerMessage($message);
926926
}
927927
928928
// you ack only some messages that have return true

RabbitMq/BatchConsumer.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ private function resetBatch()
300300
private function addMessage(AMQPMessage $message)
301301
{
302302
$this->batchCounter++;
303-
$this->messages[(int)$message->delivery_info['delivery_tag']] = $message;
303+
$this->messages[$message->getDeliveryTag()] = $message;
304304
}
305305

306306
/**
@@ -330,7 +330,7 @@ private function getMessageChannel($deliveryTag)
330330
throw new AMQPRuntimeException(sprintf('Unknown delivery_tag %d!', $deliveryTag));
331331
}
332332

333-
return $message->delivery_info['channel'];
333+
return $message->getChannel();
334334
}
335335

336336
/**

RabbitMq/Consumer.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -197,16 +197,16 @@ protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
197197
{
198198
if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
199199
// Reject and requeue message to RabbitMQ
200-
$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], true);
200+
$msg->reject();
201201
} else if ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
202202
// NACK and requeue message to RabbitMQ
203-
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false, true);
203+
$msg->nack(true);
204204
} else if ($processFlag === ConsumerInterface::MSG_REJECT) {
205205
// Reject and drop
206-
$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false);
206+
$msg->reject(false);
207207
} else if ($processFlag !== ConsumerInterface::MSG_ACK_SENT) {
208208
// Remove message from queue only if callback return not false
209-
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
209+
$msg->ack();
210210
}
211211

212212
$this->consumed++;

RabbitMq/RpcServer.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public function initServer($name)
1717
public function processMessage(AMQPMessage $msg)
1818
{
1919
try {
20-
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
20+
$msg->ack();
2121
$result = call_user_func($this->callback, $msg);
2222
$result = call_user_func($this->serializer, $result);
2323
$this->sendReply($result, $msg->get('reply_to'), $msg->get('correlation_id'));

Tests/RabbitMq/ConsumerTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ public function testProcessMessage($processFlag, $expectedMethod = null, $expect
5656

5757
// Create a default message
5858
$amqpMessage = new AMQPMessage('foo body');
59-
$amqpMessage->delivery_info['channel'] = $amqpChannel;
60-
$amqpMessage->delivery_info['delivery_tag'] = 0;
59+
$amqpMessage->setChannel($amqpChannel);
60+
$amqpMessage->setDeliveryTag(0);
6161

6262
if ($expectedMethod) {
6363
$amqpChannel->expects($this->any())

Tests/RabbitMq/MultipleConsumerTest.php

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,8 @@ public function testProcessMessage($processFlag, $expectedMethod, $expectedReque
6565

6666
$this->prepareAMQPChannelExpectations($expectedMethod, $expectedRequeue);
6767

68-
// Create a default message
69-
$amqpMessage = new AMQPMessage('foo body');
70-
$amqpMessage->delivery_info['channel'] = $this->amqpChannel;
71-
$amqpMessage->delivery_info['delivery_tag'] = 0;
72-
73-
$this->multipleConsumer->processQueueMessage('test-1', $amqpMessage);
74-
$this->multipleConsumer->processQueueMessage('test-2', $amqpMessage);
68+
$this->multipleConsumer->processQueueMessage('test-1', $this->createMessage());
69+
$this->multipleConsumer->processQueueMessage('test-2', $this->createMessage());
7570
}
7671

7772
/**
@@ -106,13 +101,8 @@ public function testQueuesProvider($processFlag, $expectedMethod, $expectedReque
106101

107102
$this->prepareAMQPChannelExpectations($expectedMethod, $expectedRequeue);
108103

109-
// Create a default message
110-
$amqpMessage = new AMQPMessage('foo body');
111-
$amqpMessage->delivery_info['channel'] = $this->amqpChannel;
112-
$amqpMessage->delivery_info['delivery_tag'] = 0;
113-
114-
$this->multipleConsumer->processQueueMessage('test-1', $amqpMessage);
115-
$this->multipleConsumer->processQueueMessage('test-2', $amqpMessage);
104+
$this->multipleConsumer->processQueueMessage('test-1', $this->createMessage());
105+
$this->multipleConsumer->processQueueMessage('test-2', $this->createMessage());
116106
}
117107

118108
/**
@@ -154,15 +144,10 @@ public function testQueuesProviderAndStaticQueuesTogether($processFlag, $expecte
154144

155145
$this->prepareAMQPChannelExpectations($expectedMethod, $expectedRequeue);
156146

157-
// Create a default message
158-
$amqpMessage = new AMQPMessage('foo body');
159-
$amqpMessage->delivery_info['channel'] = $this->amqpChannel;
160-
$amqpMessage->delivery_info['delivery_tag'] = 0;
161-
162-
$this->multipleConsumer->processQueueMessage('test-1', $amqpMessage);
163-
$this->multipleConsumer->processQueueMessage('test-2', $amqpMessage);
164-
$this->multipleConsumer->processQueueMessage('test-3', $amqpMessage);
165-
$this->multipleConsumer->processQueueMessage('test-4', $amqpMessage);
147+
$this->multipleConsumer->processQueueMessage('test-1', $this->createMessage());
148+
$this->multipleConsumer->processQueueMessage('test-2', $this->createMessage());
149+
$this->multipleConsumer->processQueueMessage('test-3', $this->createMessage());
150+
$this->multipleConsumer->processQueueMessage('test-4', $this->createMessage());
166151
}
167152

168153
public function processMessageProvider()
@@ -300,4 +285,13 @@ private function prepareCallback($processFlag)
300285
return $processFlag;
301286
};
302287
}
288+
289+
private function createMessage()
290+
{
291+
$amqpMessage = new AMQPMessage('foo body');
292+
$amqpMessage->setChannel($this->amqpChannel);
293+
$amqpMessage->setDeliveryTag(0);
294+
295+
return $amqpMessage;
296+
}
303297
}

Tests/RabbitMq/RpcServerTest.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ public function testProcessMessageWithCustomSerializer()
1818
$message = $this->getMockBuilder('\PhpAmqpLib\Message\AMQPMessage')
1919
->setMethods( array('get'))
2020
->getMock();
21-
$message->delivery_info = array(
22-
'channel' => $this->getMockBuilder('\PhpAmqpLib\Channel\AMQPChannel')
21+
$message->setChannel(
22+
$this->getMockBuilder('\PhpAmqpLib\Channel\AMQPChannel')
2323
->setMethods(array())->setConstructorArgs(array())
2424
->setMockClassName('')
2525
->disableOriginalConstructor()
26-
->getMock(),
27-
'delivery_tag' => null
26+
->getMock()
2827
);
28+
$message->setDeliveryTag(0);
2929
$server->setCallback(function() {
3030
return 'message';
3131
});

0 commit comments

Comments
 (0)