@@ -80,7 +80,7 @@ class Connection
80
80
private $ queuesOptions ;
81
81
private $ amqpFactory ;
82
82
private $ autoSetupExchange ;
83
- private $ autoSetup ;
83
+ private $ autoSetupDelayExchange ;
84
84
85
85
/**
86
86
* @var \AMQPChannel|null
@@ -114,7 +114,7 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
114
114
'queue_name_pattern ' => 'delay_%exchange_name%_%routing_key%_%delay% ' ,
115
115
],
116
116
], $ connectionOptions );
117
- $ this ->autoSetupExchange = $ this ->autoSetup = $ connectionOptions ['auto_setup ' ] ?? true ;
117
+ $ this ->autoSetupExchange = $ this ->autoSetupDelayExchange = $ connectionOptions ['auto_setup ' ] ?? true ;
118
118
$ this ->exchangeOptions = $ exchangeOptions ;
119
119
$ this ->queuesOptions = $ queuesOptions ;
120
120
$ this ->amqpFactory = $ amqpFactory ?? new AmqpFactory ();
@@ -288,16 +288,16 @@ public function publish(string $body, array $headers = [], int $delayInMs = 0, A
288
288
{
289
289
$ this ->clearWhenDisconnected ();
290
290
291
+ if ($ this ->autoSetupExchange ) {
292
+ $ this ->setupExchangeAndQueues (); // also setup normal exchange for delayed messages so delay queue can DLX messages to it
293
+ }
294
+
291
295
if (0 !== $ delayInMs ) {
292
296
$ this ->publishWithDelay ($ body , $ headers , $ delayInMs , $ amqpStamp );
293
297
294
298
return ;
295
299
}
296
300
297
- if ($ this ->autoSetupExchange ) {
298
- $ this ->setupExchangeAndQueues ();
299
- }
300
-
301
301
$ this ->publishOnExchange (
302
302
$ this ->exchange (),
303
303
$ body ,
@@ -356,8 +356,8 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
356
356
357
357
private function setupDelay (int $ delay , ?string $ routingKey )
358
358
{
359
- if ($ this ->autoSetup ) {
360
- $ this ->setup (); // setup delay exchange and normal exchange for delay queue to DLX messages to
359
+ if ($ this ->autoSetupDelayExchange ) {
360
+ $ this ->setupDelayExchange ();
361
361
}
362
362
363
363
$ queue = $ this ->createDelayQueue ($ delay , $ routingKey );
@@ -450,11 +450,8 @@ public function nack(\AMQPEnvelope $message, string $queueName, int $flags = \AM
450
450
451
451
public function setup (): void
452
452
{
453
- if ($ this ->autoSetupExchange ) {
454
- $ this ->setupExchangeAndQueues ();
455
- }
456
- $ this ->getDelayExchange ()->declareExchange ();
457
- $ this ->autoSetup = false ;
453
+ $ this ->setupExchangeAndQueues ();
454
+ $ this ->setupDelayExchange ();
458
455
}
459
456
460
457
private function setupExchangeAndQueues (): void
@@ -470,6 +467,12 @@ private function setupExchangeAndQueues(): void
470
467
$ this ->autoSetupExchange = false ;
471
468
}
472
469
470
+ private function setupDelayExchange (): void
471
+ {
472
+ $ this ->getDelayExchange ()->declareExchange ();
473
+ $ this ->autoSetupDelayExchange = false ;
474
+ }
475
+
473
476
/**
474
477
* @return string[]
475
478
*/
0 commit comments