Skip to content

Commit a3d4286

Browse files
[11.x] Improve performance of Redis queue block_for when a worker has multiple queues to service (#52826)
* Improve performance of Redis queue block_for when a worker has multiple queues to service * Default block on popJobCallback * Fix style * Fix style again * fix: Encapsulate blocking determination in RedisQueue * fix: Style
1 parent 2599bc4 commit a3d4286

File tree

2 files changed

+25
-6
lines changed

2 files changed

+25
-6
lines changed

src/Illuminate/Queue/RedisQueue.php

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,15 @@ class RedisQueue extends Queue implements QueueContract, ClearableQueue
5454
*/
5555
protected $migrationBatchSize = -1;
5656

57+
/**
58+
* Indicates if a secondary queue had a job available between checks of the primary queue.
59+
*
60+
* Only applicable when monitoring multiple named queues with a single instance.
61+
*
62+
* @var bool
63+
*/
64+
protected $secondaryQueueHadJob = false;
65+
5766
/**
5867
* Create a new Redis queue instance.
5968
*
@@ -221,13 +230,23 @@ protected function createPayloadArray($job, $queue, $data = '')
221230
* @param string|null $queue
222231
* @return \Illuminate\Contracts\Queue\Job|null
223232
*/
224-
public function pop($queue = null)
233+
public function pop($queue = null, $index = 0)
225234
{
226235
$this->migrate($prefixed = $this->getQueue($queue));
227236

228-
[$job, $reserved] = $this->retrieveNextJob($prefixed);
237+
$block = ! $this->secondaryQueueHadJob && $index == 0;
238+
239+
[$job, $reserved] = $this->retrieveNextJob($prefixed, $block);
240+
241+
if ($index == 0) {
242+
$this->secondaryQueueHadJob = false;
243+
}
229244

230245
if ($reserved) {
246+
if ($index > 0) {
247+
$this->secondaryQueueHadJob = true;
248+
}
249+
231250
return new RedisJob(
232251
$this->container, $this, $job,
233252
$reserved, $this->connectionName, $queue ?: $this->default

src/Illuminate/Queue/Worker.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -346,8 +346,8 @@ public function runNextJob($connectionName, $queue, WorkerOptions $options)
346346
*/
347347
protected function getNextJob($connection, $queue)
348348
{
349-
$popJobCallback = function ($queue) use ($connection) {
350-
return $connection->pop($queue);
349+
$popJobCallback = function ($queue, $index = 0) use ($connection) {
350+
return $connection->pop($queue, $index);
351351
};
352352

353353
$this->raiseBeforeJobPopEvent($connection->getConnectionName());
@@ -360,8 +360,8 @@ protected function getNextJob($connection, $queue)
360360
);
361361
}
362362

363-
foreach (explode(',', $queue) as $queue) {
364-
if (! is_null($job = $popJobCallback($queue))) {
363+
foreach (explode(',', $queue) as $index => $queue) {
364+
if (! is_null($job = $popJobCallback($queue, $index))) {
365365
$this->raiseAfterJobPopEvent($connection->getConnectionName(), $job);
366366

367367
return $job;

0 commit comments

Comments
 (0)