@@ -134,7 +134,7 @@ protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
134
134
$ this ->consumed ++;
135
135
$ this ->maybeStopConsumer ();
136
136
if (!$ isRejectedOrReQueued ) {
137
- $ this ->addDeliveryTag ($ msg );
137
+ $ this ->addMessage ($ msg );
138
138
}
139
139
140
140
if (!is_null ($ this ->getMemoryLimit ()) && $ this ->isRamAlmostOverloaded ()) {
@@ -150,12 +150,12 @@ protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
150
150
private function resetBatch ($ hasExceptions = false )
151
151
{
152
152
if ($ hasExceptions ) {
153
- array_map (function (AMQPMessage $ msg ) {
154
- $ msg -> delivery_info ['channel ' ]->basic_reject ($ msg -> delivery_info [ ' delivery_tag ' ], true );
153
+ array_map (function ($ message ) {
154
+ $ message ['channel ' ]->basic_reject ($ message [ ' tag ' ], true );
155
155
}, $ this ->messages );
156
156
} else {
157
- array_map (function (AMQPMessage $ msg ) {
158
- $ msg -> delivery_info ['channel ' ]->basic_ack ($ msg -> delivery_info [ ' delivery_tag ' ]);
157
+ array_map (function ($ message ) {
158
+ $ message ['channel ' ]->basic_ack ($ message [ ' tag ' ]);
159
159
}, $ this ->messages );
160
160
}
161
161
@@ -168,9 +168,12 @@ private function resetBatch($hasExceptions = false)
168
168
*
169
169
* @return void
170
170
*/
171
- private function addDeliveryTag (AMQPMessage $ message )
171
+ private function addMessage (AMQPMessage $ message )
172
172
{
173
- $ this ->messages [$ this ->batchCounter ++] = $ message ;
173
+ $ this ->messages [$ this ->batchCounter ++] = array (
174
+ 'channel ' => $ message ->delivery_info ['channel ' ],
175
+ 'tag ' => $ message ->delivery_info ['delivery_tag ' ],
176
+ );
174
177
}
175
178
176
179
/**
0 commit comments