@@ -64,14 +64,17 @@ protected function getNextAvailableJobAndReserve($queue)
64
64
$ job = $ this ->database ->getCollection ($ this ->table )->findOneAndUpdate (
65
65
[
66
66
'queue ' => $ this ->getQueue ($ queue ),
67
- 'reserved ' => 0 ,
67
+ 'reserved ' => [ ' $ne ' => 1 ] ,
68
68
'available_at ' => ['$lte ' => Carbon::now ()->getTimestamp ()],
69
69
],
70
70
[
71
71
'$set ' => [
72
72
'reserved ' => 1 ,
73
73
'reserved_at ' => Carbon::now ()->getTimestamp (),
74
74
],
75
+ '$inc ' => [
76
+ 'attempts ' => 1 ,
77
+ ],
75
78
],
76
79
[
77
80
'returnDocument ' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER ,
@@ -98,20 +101,12 @@ protected function releaseJobsThatHaveBeenReservedTooLong($queue)
98
101
99
102
$ reserved = $ this ->database ->collection ($ this ->table )
100
103
->where ('queue ' , $ this ->getQueue ($ queue ))
101
- ->where (function ($ query ) use ($ expiration , $ now ) {
102
- // Check for available jobs
103
- $ query ->where (function ($ query ) use ($ now ) {
104
- $ query ->whereNull ('reserved_at ' );
105
- $ query ->where ('available_at ' , '<= ' , $ now );
106
- });
107
-
108
- // Check for jobs that are reserved but have expired
109
- $ query ->orWhere ('reserved_at ' , '<= ' , $ expiration );
110
- })->get ();
104
+ ->whereNotNull ('reserved_at ' );
105
+ ->where ('reserved_at ' , '<= ' , $ expiration );
106
+ ->get ();
111
107
112
108
foreach ($ reserved as $ job ) {
113
- $ attempts = $ job ['attempts ' ] + 1 ;
114
- $ this ->releaseJob ($ job ['_id ' ], $ attempts );
109
+ $ this ->releaseJob ($ job ['_id ' ], $ job ['attempts ' ]);
115
110
}
116
111
}
117
112
0 commit comments