Skip to content

Commit 4b19360

Browse files
authored
Merge pull request #3121 from rabbitmq/quorum-queues-v2
QQ: introduce new machine version (2)
2 parents c842a6c + 4a2b00a commit 4b19360

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+8569
-2030
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,9 @@ _APP_ENV = """[
144144
%% interval at which connection/channel tracking executes post operations
145145
{tracking_execution_timeout, 15000},
146146
{stream_messages_soft_limit, 256},
147-
{track_auth_attempt_source, false}
147+
{track_auth_attempt_source, false},
148+
{dead_letter_worker_consumer_prefetch, 32},
149+
{dead_letter_worker_publisher_confirm_timeout, 180000}
148150
]
149151
"""
150152

@@ -401,7 +403,7 @@ suites = [
401403
":quorum_queue_utils",
402404
],
403405
flaky = True,
404-
shard_count = 3,
406+
shard_count = 7,
405407
),
406408
rabbitmq_integration_suite(
407409
PACKAGE,
@@ -698,7 +700,7 @@ suites = [
698700
),
699701
rabbitmq_suite(
700702
name = "rabbit_fifo_prop_SUITE",
701-
size = "medium",
703+
size = "large",
702704
additional_beam = [
703705
":test_util",
704706
],
@@ -716,6 +718,37 @@ suites = [
716718
"@proper//:erlang_app",
717719
],
718720
),
721+
rabbitmq_suite(
722+
name = "rabbit_fifo_dlx_SUITE",
723+
size = "small",
724+
additional_hdrs = [
725+
"src/rabbit_fifo.hrl",
726+
"src/rabbit_fifo_dlx.hrl",
727+
],
728+
deps = [
729+
"//deps/rabbit_common:erlang_app",
730+
],
731+
),
732+
rabbitmq_integration_suite(
733+
PACKAGE,
734+
name = "rabbit_fifo_dlx_integration_SUITE",
735+
size = "medium",
736+
additional_beam = [
737+
":test_util",
738+
":quorum_queue_utils",
739+
":quorum_queue_SUITE_beam_files",
740+
],
741+
additional_hdrs = [
742+
"src/rabbit_fifo.hrl",
743+
"src/rabbit_fifo_dlx.hrl",
744+
],
745+
runtime_deps = [
746+
"@ra//:erlang_app",
747+
],
748+
deps = [
749+
"@proper//:erlang_app",
750+
],
751+
),
719752
rabbitmq_suite(
720753
name = "rabbit_fifo_SUITE",
721754
size = "medium",

deps/rabbit/Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,9 @@ define PROJECT_ENV
122122
%% interval at which connection/channel tracking executes post operations
123123
{tracking_execution_timeout, 15000},
124124
{stream_messages_soft_limit, 256},
125-
{track_auth_attempt_source, false}
125+
{track_auth_attempt_source, false},
126+
{dead_letter_worker_consumer_prefetch, 32},
127+
{dead_letter_worker_publisher_confirm_timeout, 180000}
126128
]
127129
endef
128130

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -778,6 +778,7 @@ declare_args() ->
778778
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
779779
{<<"x-dead-letter-exchange">>, fun check_dlxname_arg/2},
780780
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
781+
{<<"x-dead-letter-strategy">>, fun check_dlxstrategy_arg/2},
781782
{<<"x-max-length">>, fun check_non_neg_int_arg/2},
782783
{<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
783784
{<<"x-max-in-memory-length">>, fun check_non_neg_int_arg/2},
@@ -946,6 +947,22 @@ check_dlxrk_arg(Val, Args) when is_binary(Val) ->
946947
check_dlxrk_arg(_Val, _Args) ->
947948
{error, {unacceptable_type, "expected a string"}}.
948949

950+
-define(KNOWN_DLX_STRATEGIES, [<<"at-most-once">>, <<"at-least-once">>]).
951+
check_dlxstrategy_arg({longstr, Val}, _Args) ->
952+
case lists:member(Val, ?KNOWN_DLX_STRATEGIES) of
953+
true -> ok;
954+
false -> {error, invalid_dlx_strategy}
955+
end;
956+
check_dlxstrategy_arg({Type, _}, _Args) ->
957+
{error, {unacceptable_type, Type}};
958+
check_dlxstrategy_arg(Val, _Args) when is_binary(Val) ->
959+
case lists:member(Val, ?KNOWN_DLX_STRATEGIES) of
960+
true -> ok;
961+
false -> {error, invalid_dlx_strategy}
962+
end;
963+
check_dlxstrategy_arg(_Val, _Args) ->
964+
{error, invalid_dlx_strategy}.
965+
949966
-define(KNOWN_OVERFLOW_MODES, [<<"drop-head">>, <<"reject-publish">>, <<"reject-publish-dlx">>]).
950967
check_overflow({longstr, Val}, _Args) ->
951968
case lists:member(Val, ?KNOWN_OVERFLOW_MODES) of
@@ -1503,7 +1520,8 @@ get_queue_consumer_info(Q, ConsumerInfoKeys) ->
15031520
[lists:zip(ConsumerInfoKeys,
15041521
[amqqueue:get_name(Q), ChPid, CTag,
15051522
AckRequired, Prefetch, Active, ActivityStatus, Args]) ||
1506-
{ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _} <- consumers(Q)].
1523+
{ChPid, CTag, AckRequired, Prefetch, Active, ActivityStatus, Args, _}
1524+
<- consumers(Q)].
15071525

15081526
-spec stat(amqqueue:amqqueue()) ->
15091527
{'ok', non_neg_integer(), non_neg_integer()}.
@@ -1657,8 +1675,8 @@ credit(Q, CTag, Credit, Drain, QStates) ->
16571675
{'ok', non_neg_integer(), qmsg(), rabbit_queue_type:state()} |
16581676
{'empty', rabbit_queue_type:state()} |
16591677
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
1660-
basic_get(Q, NoAck, LimiterPid, CTag, QStates0) ->
1661-
rabbit_queue_type:dequeue(Q, NoAck, LimiterPid, CTag, QStates0).
1678+
basic_get(Q, NoAck, LimiterPid, CTag, QStates) ->
1679+
rabbit_queue_type:dequeue(Q, NoAck, LimiterPid, CTag, QStates).
16621680

16631681

16641682
-spec basic_consume(amqqueue:amqqueue(), boolean(), pid(), pid(), boolean(),
@@ -1670,7 +1688,7 @@ basic_get(Q, NoAck, LimiterPid, CTag, QStates0) ->
16701688
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
16711689
basic_consume(Q, NoAck, ChPid, LimiterPid,
16721690
LimiterActive, ConsumerPrefetchCount, ConsumerTag,
1673-
ExclusiveConsume, Args, OkMsg, ActingUser, Contexts) ->
1691+
ExclusiveConsume, Args, OkMsg, ActingUser, QStates) ->
16741692

16751693
QName = amqqueue:get_name(Q),
16761694
%% first phase argument validation
@@ -1686,7 +1704,7 @@ basic_consume(Q, NoAck, ChPid, LimiterPid,
16861704
args => Args,
16871705
ok_msg => OkMsg,
16881706
acting_user => ActingUser},
1689-
rabbit_queue_type:consume(Q, Spec, Contexts).
1707+
rabbit_queue_type:consume(Q, Spec, QStates).
16901708

16911709
-spec basic_cancel(amqqueue:amqqueue(), rabbit_types:ctag(), any(),
16921710
rabbit_types:username(),

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -728,10 +728,14 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
728728
with_dlx(
729729
DLX,
730730
fun (X) ->
731+
rabbit_global_counters:messages_dead_lettered(maxlen, rabbit_classic_queue,
732+
at_most_once, 1),
731733
QName = qname(State),
732734
rabbit_dead_letter:publish(Message, maxlen, X, RK, QName)
733735
end,
734-
fun () -> ok end),
736+
fun () -> rabbit_global_counters:messages_dead_lettered(maxlen, rabbit_classic_queue,
737+
disabled, 1)
738+
end),
735739
%% Drop publish and nack to publisher
736740
send_reject_publish(Delivery, Delivered, State);
737741
_ ->
@@ -763,6 +767,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
763767
{undelivered, State2 = #q{ttl = 0, dlx = undefined,
764768
backing_queue_state = BQS,
765769
msg_id_to_channel = MTC}} ->
770+
rabbit_global_counters:messages_dead_lettered(expired, rabbit_classic_queue,
771+
disabled, 1),
766772
{BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC, amqqueue:get_name(Q)),
767773
State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1};
768774
{undelivered, State2 = #q{backing_queue_state = BQS}} ->
@@ -804,6 +810,9 @@ maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
804810
State#q.dlx,
805811
fun (X) -> dead_letter_maxlen_msg(X, State) end,
806812
fun () ->
813+
rabbit_global_counters:messages_dead_lettered(maxlen,
814+
rabbit_classic_queue,
815+
disabled, 1),
807816
{_, BQS1} = BQ:drop(false, BQS),
808817
State#q{backing_queue_state = BQS1}
809818
end));
@@ -1012,11 +1021,18 @@ drop_expired_msgs(State) ->
10121021
drop_expired_msgs(Now, State = #q{backing_queue_state = BQS,
10131022
backing_queue = BQ }) ->
10141023
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
1024+
ExpirePredIncrement = fun(Properties) ->
1025+
ExpirePred(Properties) andalso
1026+
rabbit_global_counters:messages_dead_lettered(expired,
1027+
rabbit_classic_queue,
1028+
disabled,
1029+
1) =:= ok
1030+
end,
10151031
{Props, State1} =
10161032
with_dlx(
10171033
State#q.dlx,
10181034
fun (X) -> dead_letter_expired_msgs(ExpirePred, X, State) end,
1019-
fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS),
1035+
fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePredIncrement, BQS),
10201036
{Next, State#q{backing_queue_state = BQS1}} end),
10211037
ensure_ttl_timer(case Props of
10221038
undefined -> undefined;
@@ -1058,6 +1074,8 @@ dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
10581074
QName = qname(State),
10591075
{Res, Acks1, BQS1} =
10601076
Fun(fun (Msg, AckTag, Acks) ->
1077+
rabbit_global_counters:messages_dead_lettered(Reason, rabbit_classic_queue,
1078+
at_most_once, 1),
10611079
rabbit_dead_letter:publish(Msg, Reason, X, RK, QName),
10621080
[AckTag | Acks]
10631081
end, [], BQS),
@@ -1575,7 +1593,9 @@ handle_cast({reject, false, AckTags, ChPid}, State) ->
15751593
dead_letter_rejected_msgs(
15761594
AckTags, X, State1)
15771595
end) end,
1578-
fun () -> ack(AckTags, ChPid, State) end));
1596+
fun () -> rabbit_global_counters:messages_dead_lettered(rejected, rabbit_classic_queue,
1597+
disabled, length(AckTags)),
1598+
ack(AckTags, ChPid, State) end));
15791599

15801600
handle_cast({delete_exclusive, ConnPid}, State) ->
15811601
log_delete_exclusive(ConnPid, State),

deps/rabbit/src/rabbit_channel.erl

Lines changed: 10 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2165,22 +2165,18 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
21652165
confirm = Confirm,
21662166
msg_seq_no = MsgSeqNo},
21672167
_RoutedToQueueNames = [QName]}, State0 = #ch{queue_states = QueueStates0}) -> %% optimisation when there is one queue
2168-
AllNames = case rabbit_amqqueue:lookup(QName) of
2169-
{ok, Q0} ->
2170-
case amqqueue:get_options(Q0) of
2171-
#{extra_bcc := BCC} -> [QName, rabbit_misc:r(QName#resource.virtual_host, queue, BCC)];
2172-
_ -> [QName]
2173-
end;
2174-
_ -> []
2175-
end,
2176-
Qs = rabbit_amqqueue:lookup(AllNames),
2168+
{QueueNames, Qs} = case rabbit_amqqueue:lookup(QName) of
2169+
{ok, Q} ->
2170+
{[QName], [Q]};
2171+
_ -> {[], []}
2172+
end,
21772173
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
21782174
{ok, QueueStates, Actions} ->
21792175
rabbit_global_counters:messages_routed(amqp091, erlang:min(1, length(Qs))),
21802176
%% NB: the order here is important since basic.returns must be
21812177
%% sent before confirms.
21822178
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
2183-
State1 = process_routing_confirm(Confirm, AllNames, MsgSeqNo, XName, State0),
2179+
State1 = process_routing_confirm(Confirm, QueueNames, MsgSeqNo, XName, State0),
21842180
%% Actions must be processed after registering confirms as actions may
21852181
%% contain rejections of publishes
21862182
State = handle_queue_actions(Actions, State1#ch{queue_states = QueueStates}),
@@ -2208,21 +2204,15 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
22082204
confirm = Confirm,
22092205
msg_seq_no = MsgSeqNo},
22102206
RoutedToQueueNames}, State0 = #ch{queue_states = QueueStates0}) ->
2211-
Qs0 = rabbit_amqqueue:lookup(RoutedToQueueNames),
2212-
AllQueueNames = lists:map(fun amqqueue:get_name/1, Qs0),
2213-
AllExtraBCCs = infer_extra_bcc(Qs0),
2214-
%% Collect implicit BCC targets these queues may have
2215-
Qs = case AllExtraBCCs of
2216-
[] -> Qs0;
2217-
ExtraNames -> Qs0 ++ rabbit_amqqueue:lookup(ExtraNames)
2218-
end,
2207+
Qs = rabbit_amqqueue:lookup(RoutedToQueueNames),
2208+
QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
22192209
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
22202210
{ok, QueueStates, Actions} ->
22212211
rabbit_global_counters:messages_routed(amqp091, length(Qs)),
22222212
%% NB: the order here is important since basic.returns must be
22232213
%% sent before confirms.
22242214
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
2225-
State1 = process_routing_confirm(Confirm, AllQueueNames,
2215+
State1 = process_routing_confirm(Confirm, QueueNames,
22262216
MsgSeqNo, XName, State0),
22272217
%% Actions must be processed after registering confirms as actions may
22282218
%% contain rejections of publishes
@@ -2231,7 +2221,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
22312221
fine ->
22322222
?INCR_STATS(exchange_stats, XName, 1, publish),
22332223
[?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish)
2234-
|| QName <- AllQueueNames];
2224+
|| QName <- QueueNames];
22352225
_ ->
22362226
ok
22372227
end,
@@ -2243,28 +2233,6 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
22432233
[rabbit_misc:rs(Resource)])
22442234
end.
22452235

2246-
-spec infer_extra_bcc([amqqueue:amqqueue()]) -> [rabbit_amqqueue:name()].
2247-
infer_extra_bcc([]) ->
2248-
[];
2249-
infer_extra_bcc([Q]) ->
2250-
case amqqueue:get_options(Q) of
2251-
#{extra_bcc := BCC} ->
2252-
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
2253-
[rabbit_misc:r(VHost, queue, BCC)];
2254-
_ ->
2255-
[]
2256-
end;
2257-
infer_extra_bcc(Qs) ->
2258-
lists:foldl(fun(Q, Acc) ->
2259-
case amqqueue:get_options(Q) of
2260-
#{extra_bcc := BCC} ->
2261-
#resource{virtual_host = VHost} = amqqueue:get_name(Q),
2262-
[rabbit_misc:r(VHost, queue, BCC) | Acc];
2263-
_ ->
2264-
Acc
2265-
end
2266-
end, [], Qs).
2267-
22682236
process_routing_mandatory(_Mandatory = true,
22692237
_RoutedToQs = [],
22702238
Msg, State) ->

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -445,8 +445,10 @@ recover_durable_queues(QueuesAndRecoveryTerms) ->
445445

446446
capabilities() ->
447447
#{unsupported_policies => [ %% Stream policies
448-
<<"max-age">>, <<"stream-max-segment-size-bytes">>,
449-
<<"queue-leader-locator">>, <<"initial-cluster-size">>],
448+
<<"max-age">>, <<"stream-max-segment-size-bytes">>,
449+
<<"queue-leader-locator">>, <<"initial-cluster-size">>,
450+
%% Quorum policies
451+
<<"dead-letter-strategy">>],
450452
queue_arguments => [<<"x-expires">>, <<"x-message-ttl">>, <<"x-dead-letter-exchange">>,
451453
<<"x-dead-letter-routing-key">>, <<"x-max-length">>,
452454
<<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,

deps/rabbit/src/rabbit_dead_letter.erl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,17 @@
77

88
-module(rabbit_dead_letter).
99

10-
-export([publish/5]).
10+
-export([publish/5,
11+
make_msg/5,
12+
detect_cycles/3]).
1113

1214
-include_lib("rabbit_common/include/rabbit.hrl").
1315
-include_lib("rabbit_common/include/rabbit_framing.hrl").
1416

1517
%%----------------------------------------------------------------------------
1618

1719
-type reason() :: 'expired' | 'rejected' | 'maxlen' | delivery_limit.
20+
-export_type([reason/0]).
1821

1922
%%----------------------------------------------------------------------------
2023

@@ -39,7 +42,7 @@ make_msg(Msg = #basic_message{content = Content,
3942
undefined -> {RoutingKeys, fun (H) -> H end};
4043
_ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
4144
end,
42-
ReasonBin = list_to_binary(atom_to_list(Reason)),
45+
ReasonBin = atom_to_binary(Reason),
4346
TimeSec = os:system_time(seconds),
4447
PerMsgTTL = per_msg_ttl_header(Content#content.properties),
4548
HeadersFun2 =

deps/rabbit/src/rabbit_disk_monitor.erl

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -246,11 +246,8 @@ get_disk_free(Dir, {win32, _}) ->
246246
rabbit_log:warning("Expected the mnesia directory absolute "
247247
"path to start with a drive letter like "
248248
"'C:'. The path is: '~p'", [Dir]),
249-
case win32_get_disk_free_dir(Dir) of
250-
{ok, Free} ->
251-
Free;
252-
_ -> exit(could_not_determine_disk_free)
253-
end;
249+
{ok, Free} = win32_get_disk_free_dir(Dir),
250+
Free;
254251
DriveLetter ->
255252
case catch win32_get_disk_free_pwsh(DriveLetter) of
256253
{ok, Free1} -> Free1;

0 commit comments

Comments
 (0)