Skip to content

Commit 51a0eba

Browse files
committed
[11.x] Fix integrity constraint violation on failed_jobs_uuid_unique (#53230)
1 parent 5a9886c commit 51a0eba

File tree

8 files changed

+135
-43
lines changed

8 files changed

+135
-43
lines changed

src/Illuminate/Queue/Console/WorkCommand.php

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,16 @@
1313
use Illuminate\Queue\WorkerOptions;
1414
use Illuminate\Support\Carbon;
1515
use Illuminate\Support\InteractsWithTime;
16+
use SplObserver;
17+
use SplSubject;
1618
use Symfony\Component\Console\Attribute\AsCommand;
1719
use Symfony\Component\Console\Terminal;
1820
use Throwable;
1921

2022
use function Termwind\terminal;
2123

2224
#[AsCommand(name: 'queue:work')]
23-
class WorkCommand extends Command
25+
class WorkCommand extends Command implements SplObserver
2426
{
2527
use InteractsWithTime;
2628

@@ -126,6 +128,27 @@ public function handle()
126128
);
127129
}
128130

131+
/**
132+
* Receive update from subjects.
133+
*
134+
* @param SplSubject $subject
135+
* @return void
136+
*/
137+
public function update(SplSubject $subject): void
138+
{
139+
if ($subject instanceof JobProcessing) {
140+
$this->writeOutput($subject->job, 'starting');
141+
} elseif ($subject instanceof JobProcessed) {
142+
$this->writeOutput($subject->job, 'success');
143+
} elseif ($subject instanceof JobReleasedAfterException) {
144+
$this->writeOutput($subject->job, 'released_after_exception');
145+
} elseif ($subject instanceof JobFailed) {
146+
$this->writeOutput($subject->job, 'failed', $subject->exception);
147+
148+
$this->logFailedJob($subject);
149+
}
150+
}
151+
129152
/**
130153
* Run the worker instance.
131154
*
@@ -136,6 +159,7 @@ public function handle()
136159
protected function runWorker($connection, $queue)
137160
{
138161
return $this->worker
162+
->setObserver($this)
139163
->setName($this->option('name'))
140164
->setCache($this->cache)
141165
->{$this->option('once') ? 'runNextJob' : 'daemon'}(
@@ -165,32 +189,6 @@ protected function gatherWorkerOptions()
165189
);
166190
}
167191

168-
/**
169-
* Listen for the queue events in order to update the console output.
170-
*
171-
* @return void
172-
*/
173-
protected function listenForEvents()
174-
{
175-
$this->laravel['events']->listen(JobProcessing::class, function ($event) {
176-
$this->writeOutput($event->job, 'starting');
177-
});
178-
179-
$this->laravel['events']->listen(JobProcessed::class, function ($event) {
180-
$this->writeOutput($event->job, 'success');
181-
});
182-
183-
$this->laravel['events']->listen(JobReleasedAfterException::class, function ($event) {
184-
$this->writeOutput($event->job, 'released_after_exception');
185-
});
186-
187-
$this->laravel['events']->listen(JobFailed::class, function ($event) {
188-
$this->writeOutput($event->job, 'failed', $event->exception);
189-
190-
$this->logFailedJob($event);
191-
});
192-
}
193-
194192
/**
195193
* Write the status output for the queue worker for JSON or TTY.
196194
*

src/Illuminate/Queue/Events/JobFailed.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Illuminate\Queue\Events;
44

5-
class JobFailed
5+
class JobFailed extends QueueEvent
66
{
77
/**
88
* The connection name.
@@ -38,5 +38,6 @@ public function __construct($connectionName, $job, $exception)
3838
$this->job = $job;
3939
$this->exception = $exception;
4040
$this->connectionName = $connectionName;
41+
parent::__construct();
4142
}
4243
}

src/Illuminate/Queue/Events/JobProcessed.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Illuminate\Queue\Events;
44

5-
class JobProcessed
5+
class JobProcessed extends QueueEvent
66
{
77
/**
88
* The connection name.
@@ -29,5 +29,6 @@ public function __construct($connectionName, $job)
2929
{
3030
$this->job = $job;
3131
$this->connectionName = $connectionName;
32+
parent::__construct();
3233
}
3334
}

src/Illuminate/Queue/Events/JobProcessing.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Illuminate\Queue\Events;
44

5-
class JobProcessing
5+
class JobProcessing extends QueueEvent
66
{
77
/**
88
* The connection name.
@@ -29,5 +29,6 @@ public function __construct($connectionName, $job)
2929
{
3030
$this->job = $job;
3131
$this->connectionName = $connectionName;
32+
parent::__construct();
3233
}
3334
}

src/Illuminate/Queue/Events/JobReleasedAfterException.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Illuminate\Queue\Events;
44

5-
class JobReleasedAfterException
5+
class JobReleasedAfterException extends QueueEvent
66
{
77
/**
88
* The connection name.
@@ -29,5 +29,6 @@ public function __construct($connectionName, $job)
2929
{
3030
$this->job = $job;
3131
$this->connectionName = $connectionName;
32+
parent::__construct();
3233
}
3334
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
namespace Illuminate\Queue\Events;
4+
5+
use SplObjectStorage;
6+
use SplObserver;
7+
use SplSubject;
8+
9+
abstract class QueueEvent implements SplSubject
10+
{
11+
protected SplObjectStorage $observers;
12+
13+
public function __construct()
14+
{
15+
$this->observers = new SplObjectStorage;
16+
}
17+
18+
public function attach(SplObserver $observer): void
19+
{
20+
$this->observers->attach($observer);
21+
}
22+
23+
public function detach(SplObserver $observer): void
24+
{
25+
$this->observers->detach($observer);
26+
}
27+
28+
public function notify(): void
29+
{
30+
foreach ($this->observers as $observer) {
31+
$observer->update($this);
32+
}
33+
}
34+
}

src/Illuminate/Queue/Jobs/Job.php

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Illuminate\Queue\ManuallyFailedException;
1010
use Illuminate\Queue\TimeoutExceededException;
1111
use Illuminate\Support\InteractsWithTime;
12+
use SplObserver;
1213
use Throwable;
1314

1415
abstract class Job
@@ -64,6 +65,13 @@ abstract class Job
6465
*/
6566
protected $queue;
6667

68+
/**
69+
* Event observer.
70+
*
71+
* @var SplObserver|null
72+
*/
73+
protected $observer;
74+
6775
/**
6876
* Get the job identifier.
6977
*
@@ -212,9 +220,12 @@ public function fail($e = null)
212220

213221
$this->failed($e);
214222
} finally {
215-
$this->resolve(Dispatcher::class)->dispatch(new JobFailed(
216-
$this->connectionName, $this, $e ?: new ManuallyFailedException
217-
));
223+
$event = new JobFailed($this->connectionName, $this, $e ?: new ManuallyFailedException);
224+
if ($this->observer) {
225+
$event->attach($this->observer);
226+
}
227+
228+
$this->resolve(Dispatcher::class)->dispatch($event);
218229
}
219230
}
220231

@@ -377,4 +388,15 @@ public function getContainer()
377388
{
378389
return $this->container;
379390
}
391+
392+
/**
393+
* Set the event observer.
394+
*
395+
* @param SplObserver $observer
396+
* @return void
397+
*/
398+
public function setObserver(SplObserver $observer)
399+
{
400+
$this->observer = $observer;
401+
}
380402
}

src/Illuminate/Queue/Worker.php

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Illuminate\Queue\Events\Looping;
1919
use Illuminate\Queue\Events\WorkerStopping;
2020
use Illuminate\Support\Carbon;
21+
use SplObserver;
2122
use Throwable;
2223

2324
class Worker
@@ -98,6 +99,13 @@ class Worker
9899
*/
99100
protected static $popCallbacks = [];
100101

102+
/**
103+
* Attach observers to this handler to subscribe to messages.
104+
*
105+
* @var SplObserver|null
106+
*/
107+
protected $observer;
108+
101109
/**
102110
* Create a new queue worker.
103111
*
@@ -488,9 +496,12 @@ protected function handleJobException($connectionName, $job, WorkerOptions $opti
488496
if (! $job->isDeleted() && ! $job->isReleased() && ! $job->hasFailed()) {
489497
$job->release($this->calculateBackoff($job, $options));
490498

491-
$this->events->dispatch(new JobReleasedAfterException(
492-
$connectionName, $job
493-
));
499+
$event = new JobReleasedAfterException($connectionName, $job);
500+
if ($this->observer) {
501+
$event->attach($this->observer);
502+
}
503+
504+
$this->events->dispatch($event);
494505
}
495506
}
496507

@@ -600,6 +611,10 @@ protected function markJobAsFailedIfItShouldFailOnTimeout($connectionName, $job,
600611
*/
601612
protected function failJob($job, Throwable $e)
602613
{
614+
if ($this->observer && method_exists($job, 'setObserver')) {
615+
$job->setObserver($this->observer);
616+
}
617+
603618
$job->fail($e);
604619
}
605620

@@ -656,9 +671,12 @@ protected function raiseAfterJobPopEvent($connectionName, $job)
656671
*/
657672
protected function raiseBeforeJobEvent($connectionName, $job)
658673
{
659-
$this->events->dispatch(new JobProcessing(
660-
$connectionName, $job
661-
));
674+
$jobProcessing = new JobProcessing($connectionName, $job);
675+
if ($this->observer) {
676+
$jobProcessing->attach($this->observer);
677+
}
678+
679+
$this->events->dispatch($jobProcessing);
662680
}
663681

664682
/**
@@ -670,9 +688,12 @@ protected function raiseBeforeJobEvent($connectionName, $job)
670688
*/
671689
protected function raiseAfterJobEvent($connectionName, $job)
672690
{
673-
$this->events->dispatch(new JobProcessed(
674-
$connectionName, $job
675-
));
691+
$event = new JobProcessed($connectionName, $job);
692+
if ($this->observer) {
693+
$event->attach($this->observer);
694+
}
695+
696+
$this->events->dispatch($event);
676697
}
677698

678699
/**
@@ -880,4 +901,17 @@ public function setManager(QueueManager $manager)
880901
{
881902
$this->manager = $manager;
882903
}
904+
905+
/**
906+
* Attach an observer.
907+
*
908+
* @param SplObserver $observer
909+
* @return $this
910+
*/
911+
public function setObserver(SplObserver $observer)
912+
{
913+
$this->observer = $observer;
914+
915+
return $this;
916+
}
883917
}

0 commit comments

Comments
 (0)