Skip to content

Commit bb67c2a

Browse files
committed
Fixes mongodb#743 for version 3.x
1 parent fd915e0 commit bb67c2a

File tree

1 file changed

+64
-17
lines changed

1 file changed

+64
-17
lines changed

src/Jenssegers/Mongodb/Queue/MongoQueue.php

Lines changed: 64 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,31 +2,78 @@
22

33
use Carbon\Carbon;
44
use Illuminate\Queue\DatabaseQueue;
5+
use Illuminate\Queue\Jobs\DatabaseJob;
6+
use MongoDB\Operation\FindOneAndUpdate;
7+
use DB;
58

69
class MongoQueue extends DatabaseQueue
710
{
811
/**
9-
* Get the next available job for the queue.
12+
* Pop the next job off of the queue.
13+
*
14+
* @param string $queue
15+
*
16+
* @return \Illuminate\Contracts\Queue\Job|null
17+
*/
18+
public function pop($queue = null)
19+
{
20+
$queue = $this->getQueue($queue);
21+
22+
if (!is_null($this->expire))
23+
{
24+
$this->releaseJobsThatHaveBeenReservedTooLong($queue);
25+
}
26+
27+
if ($job = $this->getNextAvailableJobAndReserve($queue))
28+
{
29+
return new DatabaseJob(
30+
$this->container, $this, $job, $queue
31+
);
32+
}
33+
}
34+
35+
/**
36+
* Get the next available job for the queue and mark it as reserved.
37+
*
38+
* When using multiple daemon queue listeners to process jobs there
39+
* is a possibility that multiple processes can end up reading the
40+
* same record before one has flagged it as reserved.
41+
*
42+
* This race condition can result in random jobs being run more then
43+
* once. To solve this we use findOneAndUpdate to lock the next jobs
44+
* record while flagging it as reserved at the same time.
45+
*
46+
* @param string|null $queue
1047
*
11-
* @param string|null $queue
1248
* @return \StdClass|null
1349
*/
14-
protected function getNextAvailableJob($queue)
50+
protected function getNextAvailableJobAndReserve($queue)
1551
{
16-
$job = $this->database->table($this->table)
17-
->lockForUpdate()
18-
->where('queue', $this->getQueue($queue))
19-
->where('reserved', 0)
20-
->where('available_at', '<=', $this->getTime())
21-
->orderBy('id', 'asc')
22-
->first();
52+
$job = DB::getCollection($this->table)->findOneAndUpdate(
53+
[
54+
'queue' => $this->getQueue($queue),
55+
'reserved' => 0,
56+
'available_at' => ['$lte' => $this->getTime()],
2357

24-
if ($job) {
25-
$job = (object) $job;
58+
],
59+
[
60+
'$set' => [
61+
'reserved' => 1,
62+
'reserved_at' => $this->getTime(),
63+
],
64+
],
65+
[
66+
'returnNewDocument ' => true,
67+
'sort' => ['available_at' => 1],
68+
]
69+
);
70+
71+
if ($job)
72+
{
2673
$job->id = $job->_id;
2774
}
2875

29-
return $job ?: null;
76+
return $job;
3077
}
3178

3279
/**
@@ -40,16 +87,16 @@ protected function releaseJobsThatHaveBeenReservedTooLong($queue)
4087
$expired = Carbon::now()->subSeconds($this->expire)->getTimestamp();
4188

4289
$reserved = $this->database->collection($this->table)
43-
->where('queue', $this->getQueue($queue))
44-
->where('reserved', 1)
45-
->where('reserved_at', '<=', $expired)->get();
90+
->where('queue', $this->getQueue($queue))
91+
->where('reserved', 1)
92+
->where('reserved_at', '<=', $expired)->get();
4693

4794
foreach ($reserved as $job) {
4895
$attempts = $job['attempts'] + 1;
4996
$this->releaseJob($job['_id'], $attempts);
5097
}
5198
}
52-
99+
53100
/**
54101
* Release the given job ID from reservation.
55102
*

0 commit comments

Comments
 (0)