Skip to content

Commit e97daad

Browse files
authored
Merge pull request #12031 from rabbitmq/mergify/bp/v4.0.x/pr-11937
QQ: introduce a default delivery limit (backport #11937)
2 parents f664099 + cbe8461 commit e97daad

File tree

5 files changed

+49
-11
lines changed

5 files changed

+49
-11
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -850,10 +850,9 @@ overview(#?STATE{consumers = Cons,
850850
#{}
851851
end,
852852
MsgsRet = lqueue:len(Returns),
853-
854-
#{len := _MsgsLen,
855-
num_hi := MsgsHi,
853+
#{num_hi := MsgsHi,
856854
num_lo := MsgsLo} = rabbit_fifo_q:overview(Messages),
855+
857856
Overview = #{type => ?STATE,
858857
config => Conf,
859858
num_consumers => map_size(Cons),

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@
103103
-define(RA_SYSTEM, quorum_queues).
104104
-define(RA_WAL_NAME, ra_log_wal).
105105

106+
-define(DEFAULT_DELIVERY_LIMIT, 20).
107+
106108
-define(INFO(Str, Args),
107109
rabbit_log:info("[~s:~s/~b] " Str,
108110
[?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY | Args])).
@@ -320,7 +322,14 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
320322
OverflowBin = args_policy_lookup(<<"overflow">>, fun policyHasPrecedence/2, Q),
321323
Overflow = overflow(OverflowBin, drop_head, QName),
322324
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
323-
DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q),
325+
DeliveryLimit = case args_policy_lookup(<<"delivery-limit">>, fun min/2, Q) of
326+
undefined ->
327+
rabbit_log:info("~ts: delivery_limit not set, defaulting to ~b",
328+
[rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]),
329+
?DEFAULT_DELIVERY_LIMIT;
330+
DL ->
331+
DL
332+
end,
324333
Expires = args_policy_lookup(<<"expires">>, fun min/2, Q),
325334
MsgTTL = args_policy_lookup(<<"message-ttl">>, fun min/2, Q),
326335
#{name => Name,
@@ -508,11 +517,12 @@ spawn_notify_decorators(QName, Fun, Args) ->
508517
catch notify_decorators(QName, Fun, Args).
509518

510519
handle_tick(QName,
511-
#{config := #{name := Name},
520+
#{config := #{name := Name} = Cfg,
512521
num_active_consumers := NumConsumers,
513522
num_checked_out := NumCheckedOut,
514523
num_ready_messages := NumReadyMsgs,
515524
num_messages := NumMessages,
525+
num_enqueuers := NumEnqueuers,
516526
enqueue_message_bytes := EnqueueBytes,
517527
checkout_message_bytes := CheckoutBytes,
518528
num_discarded := NumDiscarded,
@@ -559,6 +569,7 @@ handle_tick(QName,
559569
MsgBytesDiscarded = DiscardBytes + DiscardCheckoutBytes,
560570
MsgBytes = EnqueueBytes + CheckoutBytes + MsgBytesDiscarded,
561571
Infos = [{consumers, NumConsumers},
572+
{publishers, NumEnqueuers},
562573
{consumer_capacity, Util},
563574
{consumer_utilisation, Util},
564575
{messages, NumMessages},
@@ -573,7 +584,14 @@ handle_tick(QName,
573584
{message_bytes_dlx, MsgBytesDiscarded},
574585
{single_active_consumer_tag, SacTag},
575586
{single_active_consumer_pid, SacPid},
576-
{leader, node()}
587+
{leader, node()},
588+
{delivery_limit, case maps:get(delivery_limit, Cfg,
589+
undefined) of
590+
undefined ->
591+
unlimited;
592+
Limit ->
593+
Limit
594+
end}
577595
| Infos0],
578596
rabbit_core_metrics:queue_stats(QName, Infos),
579597
ok = repair_leader_record(Q, Self),

deps/rabbitmq_management/priv/www/js/global.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,9 @@ var HELP = {
256256
'queue-dead-lettered':
257257
'Applies to messages dead-lettered with dead-letter-strategy <code>at-least-once</code>.',
258258

259+
'queue-delivery-limit':
260+
'The number of times a message can be returned to this queue before it is dead-lettered (if configured) or dropped.',
261+
259262
'queue-message-body-bytes':
260263
'<p>The sum total of the sizes of the message bodies in this queue. This only counts message bodies; it does not include message properties (including headers) or metadata used by the queue.</p><p>Note that "in memory" and "persistent" are not mutually exclusive; persistent messages can be in memory as well as on disc, and transient messages can be paged out if memory is tight. Non-durable queues will consider all messages to be transient.</p><p>If a message is routed to multiple queues on publication, its body will be stored only once (in memory and on disk) and shared between queues. The value shown here does not take account of this effect.</p>',
261264

deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,17 +92,29 @@
9292
<td><%= fmt_string(queue.consumer_details.length) %></td>
9393
</tr>
9494
<% } %>
95-
<% if (!is_stream(queue)) { %>
95+
<% if (is_classic(queue)) { %>
9696
<tr>
9797
<th>Consumer capacity <span class="help" id="queue-consumer-capacity"></th>
9898
<td><%= fmt_percent(queue.consumer_capacity) %></td>
9999
</tr>
100100
<% } %>
101+
<% if(queue.hasOwnProperty('publishers')) { %>
102+
<tr>
103+
<th>Publishers</th>
104+
<td><%= fmt_string(queue.publishers) %></td>
105+
</tr>
106+
<% } %>
101107
<% if (is_quorum(queue)) { %>
102108
<tr>
103109
<th>Open files</th>
104110
<td><%= fmt_table_short(queue.open_files) %></td>
105111
</tr>
112+
<% if (queue.hasOwnProperty('delivery_limit')) { %>
113+
<tr>
114+
<th>Delivery limit <span class="help" id="queue-delivery-limit"></th>
115+
<td><%= fmt_string(queue.delivery_limit) %></td>
116+
</tr>
117+
<% } %>
106118
<% } %>
107119
<% if (is_stream(queue)) { %>
108120
<tr>
@@ -187,20 +199,22 @@
187199
<td class="r">
188200
<%= fmt_bytes(queue.message_bytes_unacknowledged) %>
189201
</td>
190-
<td class="r">
191-
<%= fmt_bytes(queue.message_bytes_ram) %>
192-
</td>
193202
<% } %>
194203
<% if (is_quorum(queue)) { %>
195204
<td class="r">
196205
</td>
197206
<td class="r">
198207
</td>
208+
<td class="r">
209+
</td>
199210
<td class="r">
200211
<%= fmt_bytes(queue.message_bytes_dlx) %>
201212
</td>
202213
<% } %>
203214
<% if (is_classic(queue)) { %>
215+
<td class="r">
216+
<%= fmt_bytes(queue.message_bytes_ram) %>
217+
</td>
204218
<td class="r">
205219
<%= fmt_bytes(queue.message_bytes_persistent) %>
206220
</td>

release-notes/4.0.0.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ See Compatibility Notes below to learn about **breaking or potentially breaking
3333
* RabbitMQ 3.13 `rabbitmq.conf` setting `rabbitmq_amqp1_0.default_vhost` is unsupported in RabbitMQ 4.0.
3434
Instead `default_vhost` will be used to determine the default vhost an AMQP 1.0 client connects to(i.e. when the AMQP 1.0 client does not define the vhost in the `hostname` field of the `open` frame)
3535
* RabbitMQ Shovels will be able connect to a RabbitMQ 4.0 node via AMQP 1.0 only when the Shovel runs on a RabbitMQ node >= `3.13.7`
36+
* Quorum queues will now always set a default `delivery-limit` of 20 which can be increased or decreased by policies and queue arguments but cannot be unset. Some applications or configurations may need to be updated to handle this.
3637

3738
## Erlang/OTP Compatibility Notes
3839

@@ -83,8 +84,11 @@ periods of time (no more than a few hours).
8384

8485
### Recommended Post-upgrade Procedures
8586

86-
TBD
87+
Set a low priority dead lettering policy for all quorum queues to dead letter to a stream or similar
88+
so that messages that reach the new default delivery limit of 20 aren't lost completely
89+
when no dead lettering policy is in place.
8790

91+
TBD
8892

8993
## Changes Worth Mentioning
9094

0 commit comments

Comments
 (0)