File tree Expand file tree Collapse file tree 4 files changed +27
-0
lines changed Expand file tree Collapse file tree 4 files changed +27
-0
lines changed Original file line number Diff line number Diff line change @@ -272,6 +272,7 @@ protected function addBatchConsumers(ArrayNodeDefinition $node)
272
272
->scalarNode ('idle_timeout ' )->end ()
273
273
->scalarNode ('timeout_wait ' )->defaultValue (3 )->end ()
274
274
->scalarNode ('idle_timeout_exit_code ' )->end ()
275
+ ->scalarNode ('keep_alive ' )->defaultFalse ()->end ()
275
276
->scalarNode ('auto_setup_fabric ' )->defaultTrue ()->end ()
276
277
->arrayNode ('qos_options ' )
277
278
->children ()
Original file line number Diff line number Diff line change @@ -418,6 +418,10 @@ protected function loadBatchConsumers()
418
418
$ definition ->addMethodCall ('disableAutoSetupFabric ' );
419
419
}
420
420
421
+ if ($ consumer ['keep_alive ' ]) {
422
+ $ definition ->addMethodCall ('keepAlive ' );
423
+ }
424
+
421
425
$ this ->injectConnection ($ definition , $ consumer ['connection ' ]);
422
426
if ($ this ->collectorEnabled ) {
423
427
$ this ->injectLoggedChannel ($ definition , $ key , $ consumer ['connection ' ]);
Original file line number Diff line number Diff line change @@ -810,8 +810,11 @@ batch_consumers:
810
810
timeout_wait: 5
811
811
auto_setup_fabric: false
812
812
idle_timeout_exit_code: -2
813
+ keep_alive: false
813
814
` ` `
814
815
816
+ *Note*: If the `keep_alive` option is set to `true`, `idle_timeout_exit_code` will be ignored and the consumer process continues.
817
+
815
818
You can implement a batch consumer that will acknowledge all messages in one return or you can have control on what message to acknoledge.
816
819
817
820
` ` ` php
Original file line number Diff line number Diff line change @@ -24,6 +24,11 @@ final class BatchConsumer extends BaseAmqp implements DequeuerInterface
24
24
*/
25
25
private $ idleTimeout = 0 ;
26
26
27
+ /**
28
+ * @var bool
29
+ */
30
+ private $ keepAlive = false ;
31
+
27
32
/**
28
33
* @var int
29
34
*/
@@ -84,6 +89,8 @@ public function consume()
84
89
} catch (AMQPTimeoutException $ e ) {
85
90
if (!$ this ->isEmptyBatch ()) {
86
91
$ this ->batchConsume ();
92
+ } elseif ($ this ->keepAlive === true ) {
93
+ continue ;
87
94
} elseif (null !== $ this ->getIdleTimeoutExitCode ()) {
88
95
return $ this ->getIdleTimeoutExitCode ();
89
96
} else {
@@ -399,6 +406,18 @@ public function setIdleTimeoutExitCode($idleTimeoutExitCode)
399
406
return $ this ;
400
407
}
401
408
409
+ /**
410
+ * keepAlive
411
+ *
412
+ * @return $this
413
+ */
414
+ public function keepAlive ()
415
+ {
416
+ $ this ->keepAlive = true ;
417
+
418
+ return $ this ;
419
+ }
420
+
402
421
/**
403
422
* Purge the queue
404
423
*/
You can’t perform that action at this time.
0 commit comments