Skip to content

Commit e8fb108

Browse files
authored
Merge pull request #1803 from rabbitmq/vanlightly-bugs
Bugfixes
2 parents 6b73cad + 78eedc2 commit e8fb108

File tree

5 files changed

+134
-83
lines changed

5 files changed

+134
-83
lines changed

src/rabbit_channel.erl

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
654654
consumer_mapping = ConsumerMapping} = State0) ->
655655
case QueueStates of
656656
#{Name := QState0} ->
657+
QName = rabbit_quorum_queue:queue_name(QState0),
657658
case rabbit_quorum_queue:handle_event(Evt, QState0) of
658659
{{delivery, CTag, Msgs}, QState1} ->
659660
AckRequired = case maps:find(CTag, ConsumerMapping) of
@@ -670,7 +671,6 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
670671
true ->
671672
QState1
672673
end,
673-
QName = rabbit_quorum_queue:queue_name(QState2),
674674
State = lists:foldl(
675675
fun({MsgId, {MsgHeader, Msg}}, Acc) ->
676676
IsDelivered = maps:is_key(delivery_count, MsgHeader),
@@ -702,10 +702,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
702702
%% TODO: this should use dtree:take/3
703703
{MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed),
704704
State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}),
705-
case maps:find(Name, QNames) of
706-
{ok, QName} -> erase_queue_stats(QName);
707-
error -> ok
708-
end,
705+
erase_queue_stats(QName),
709706
noreply_coalesce(
710707
State3#ch{queue_states = maps:remove(Name, QueueStates),
711708
queue_names = maps:remove(Name, QNames)})

src/rabbit_fifo.erl

Lines changed: 68 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
make_discard/2,
5757
make_credit/4,
5858
make_purge/0,
59-
make_update_state/1
59+
make_update_config/1
6060
]).
6161

6262
-type raw_msg() :: term().
@@ -131,7 +131,7 @@
131131
delivery_count :: non_neg_integer(),
132132
drain :: boolean()}).
133133
-record(purge, {}).
134-
-record(update_state, {config :: config()}).
134+
-record(update_config, {config :: config()}).
135135

136136

137137

@@ -143,7 +143,7 @@
143143
#discard{} |
144144
#credit{} |
145145
#purge{} |
146-
#update_state{}.
146+
#update_config{}.
147147

148148
-type command() :: protocol() | ra_machine:builtin_command().
149149
%% all the command types suppored by ra fifo
@@ -260,10 +260,10 @@
260260
-spec init(config()) -> state().
261261
init(#{name := Name,
262262
queue_resource := Resource} = Conf) ->
263-
update_state(Conf, #state{name = Name,
263+
update_config(Conf, #state{name = Name,
264264
queue_resource = Resource}).
265265

266-
update_state(Conf, State) ->
266+
update_config(Conf, State) ->
267267
DLH = maps:get(dead_letter_handler, Conf, undefined),
268268
BLH = maps:get(become_leader_handler, Conf, undefined),
269269
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
@@ -435,18 +435,24 @@ apply(_, {down, ConsumerPid, noconnection},
435435
Effects0, #state{consumers = Cons0,
436436
enqueuers = Enqs0} = State0) ->
437437
Node = node(ConsumerPid),
438-
% mark all consumers and enqueuers as suspect
439-
% and monitor the node
440-
{Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C,
441-
{Co, St0}) when node(P) =:= Node ->
442-
St = return_all(St0, Checked0),
443-
{maps:put(K, C#consumer{suspected_down = true,
444-
checked_out = #{}},
445-
Co),
446-
St};
447-
(K, C, {Co, St}) ->
448-
{maps:put(K, C, Co), St}
449-
end, {#{}, State0}, Cons0),
438+
% mark all consumers and enqueuers as suspected down
439+
% and monitor the node so that we can find out the final state of the
440+
% process at some later point
441+
{Cons, State} = maps:fold(
442+
fun({_, P} = K,
443+
#consumer{checked_out = Checked0} = C,
444+
{Co, St0}) when node(P) =:= Node ->
445+
St = return_all(St0, Checked0),
446+
%% TODO: need to increment credit here
447+
%% with the size of the Checked map
448+
Credit = increase_credit(C, maps:size(Checked0)),
449+
{maps:put(K, C#consumer{suspected_down = true,
450+
credit = Credit,
451+
checked_out = #{}}, Co),
452+
St};
453+
(K, C, {Co, St}) ->
454+
{maps:put(K, C, Co), St}
455+
end, {#{}, State0}, Cons0),
450456
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
451457
E#enqueuer{suspected_down = true};
452458
(_, E) -> E
@@ -515,8 +521,8 @@ apply(_, {nodeup, Node}, Effects0,
515521
service_queue = SQ}, Monitors ++ Effects);
516522
apply(_, {nodedown, _Node}, Effects, State) ->
517523
{State, Effects, ok};
518-
apply(_, #update_state{config = Conf}, Effects, State) ->
519-
{update_state(Conf, State), Effects, ok}.
524+
apply(_, #update_config{config = Conf}, Effects, State) ->
525+
{update_config(Conf, State), Effects, ok}.
520526

521527
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
522528
state_enter(leader, #state{consumers = Cons,
@@ -587,7 +593,9 @@ overview(#state{consumers = Cons,
587593
get_checked_out(Cid, From, To, #state{consumers = Consumers}) ->
588594
case Consumers of
589595
#{Cid := #consumer{checked_out = Checked}} ->
590-
[{K, snd(snd(maps:get(K, Checked)))} || K <- lists:seq(From, To)];
596+
[{K, snd(snd(maps:get(K, Checked)))}
597+
|| K <- lists:seq(From, To),
598+
maps:is_key(K, Checked)];
591599
_ ->
592600
[]
593601
end.
@@ -769,16 +777,10 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
769777
snd(T) ->
770778
element(2, T).
771779

772-
return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked,
780+
return(ConsumerId, MsgNumMsgs, Con0, Checked,
773781
Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) ->
774-
Con = case Life of
775-
auto ->
776-
Num = length(MsgNumMsgs),
777-
Con0#consumer{checked_out = Checked,
778-
credit = increase_credit(Con0, Num)};
779-
once ->
780-
Con0#consumer{checked_out = Checked}
781-
end,
782+
Con = Con0#consumer{checked_out = Checked,
783+
credit = increase_credit(Con0, length(MsgNumMsgs))},
782784
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
783785
SQ0, Effects0),
784786
State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) ->
@@ -900,12 +902,15 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
900902
State0#state{messages = maps:put(MsgNum, Msg, Messages),
901903
returns = lqueue:in(MsgNum, Returns)}).
902904

903-
return_all(State, Checked) ->
904-
maps:fold(fun (_, '$prefix_msg', S) ->
905-
return_one(0, '$prefix_msg', S);
906-
(_, {MsgNum, Msg}, S) ->
907-
return_one(MsgNum, Msg, S)
908-
end, State, Checked).
905+
return_all(State, Checked0) ->
906+
%% need to sort the list so that we return messages in the order
907+
%% they were checked out
908+
Checked = lists:sort(maps:to_list(Checked0)),
909+
lists:foldl(fun ({_, '$prefix_msg'}, S) ->
910+
return_one(0, '$prefix_msg', S);
911+
({_, {MsgNum, Msg}}, S) ->
912+
return_one(MsgNum, Msg, S)
913+
end, State, Checked).
909914

910915
checkout(State, Effects) ->
911916
checkout0(checkout_one(State), Effects, #{}).
@@ -1170,9 +1175,9 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) ->
11701175
-spec make_purge() -> protocol().
11711176
make_purge() -> #purge{}.
11721177

1173-
-spec make_update_state(config()) -> protocol().
1174-
make_update_state(Config) ->
1175-
#update_state{config = Config}.
1178+
-spec make_update_config(config()) -> protocol().
1179+
make_update_config(Config) ->
1180+
#update_config{config = Config}.
11761181

11771182
add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) ->
11781183
Bytes = message_size(Msg),
@@ -1502,11 +1507,14 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() ->
15021507
Node = node(Pid),
15031508
{State0, Effects0} = enq(1, 1, second, test_init(test)),
15041509
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0),
1505-
{State1, Effects1} = check(Cid, 2, State0),
1510+
{State1, Effects1} = check_auto(Cid, 2, State0),
1511+
#consumer{credit = 0} = maps:get(Cid, State1#state.consumers),
15061512
?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1),
15071513
% monitor both enqueuer and consumer
15081514
% because we received a noconnection we now need to monitor the node
15091515
{State2a, _Effects2a, _} = apply(meta(3), {down, Pid, noconnection}, [], State1),
1516+
#consumer{credit = 1} = maps:get(Cid, State2a#state.consumers),
1517+
%% validate consumer has credit
15101518
{State2, Effects2, _} = apply(meta(3), {down, Self, noconnection}, [], State2a),
15111519
?ASSERT_EFF({monitor, node, _}, Effects2),
15121520
?assertNoEffect({demonitor, process, _}, Effects2),
@@ -1865,6 +1873,26 @@ purge_with_checkout_test() ->
18651873
?assertEqual(0, maps:size(Checked)),
18661874
ok.
18671875

1876+
down_returns_checked_out_in_order_test() ->
1877+
S0 = test_init(?FUNCTION_NAME),
1878+
%% enqueue 100
1879+
S1 = lists:foldl(fun (Num, FS0) ->
1880+
{FS, _} = enq(Num, Num, Num, FS0),
1881+
FS
1882+
end, S0, lists:seq(1, 100)),
1883+
?assertEqual(100, maps:size(S1#state.messages)),
1884+
Cid = {<<"cid">>, self()},
1885+
{S2, _} = check(Cid, 101, 1000, S1),
1886+
#consumer{checked_out = Checked} = maps:get(Cid, S2#state.consumers),
1887+
?assertEqual(100, maps:size(Checked)),
1888+
%% simulate down
1889+
{S, _, _} = apply(meta(102), {down, self(), noproc}, [], S2),
1890+
Returns = lqueue:to_list(S#state.returns),
1891+
?assertEqual(100, length(Returns)),
1892+
%% validate returns are in order
1893+
?assertEqual(lists:sort(Returns), Returns),
1894+
ok.
1895+
18681896
meta(Idx) ->
18691897
#{index => Idx, term => 1}.
18701898

@@ -1900,7 +1928,7 @@ check_auto(Cid, Idx, State) ->
19001928
check(Cid, Idx, Num, State) ->
19011929
strip_reply(
19021930
apply(meta(Idx),
1903-
make_checkout(Cid, {once, Num, simple_prefetch}, #{}),
1931+
make_checkout(Cid, {auto, Num, simple_prefetch}, #{}),
19041932
[], State)).
19051933

19061934
settle(Cid, Idx, MsgId, State) ->

src/rabbit_fifo_client.erl

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,12 @@
5252
{rabbit_fifo:consumer_tag(), non_neg_integer()}}.
5353
-type actions() :: [action()].
5454

55+
-type cluster_name() :: rabbit_types:r(queue).
56+
5557
-record(consumer, {last_msg_id :: seq() | -1,
5658
delivery_count = 0 :: non_neg_integer()}).
5759

58-
-record(state, {cluster_name :: ra_cluster_name(),
60+
-record(state, {cluster_name :: cluster_name(),
5961
servers = [] :: [ra_server_id()],
6062
leader :: maybe(ra_server_id()),
6163
next_seq = 0 :: seq(),
@@ -88,7 +90,7 @@
8890
%% @param ClusterName the id of the cluster to interact with
8991
%% @param Servers The known servers of the queue. If the current leader is known
9092
%% ensure the leader node is at the head of the list.
91-
-spec init(ra_cluster_name(), [ra_server_id()]) -> state().
93+
-spec init(cluster_name(), [ra_server_id()]) -> state().
9294
init(ClusterName, Servers) ->
9395
init(ClusterName, Servers, ?SOFT_LIMIT).
9496

@@ -98,15 +100,15 @@ init(ClusterName, Servers) ->
98100
%% @param Servers The known servers of the queue. If the current leader is known
99101
%% ensure the leader node is at the head of the list.
100102
%% @param MaxPending size defining the max number of pending commands.
101-
-spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer()) -> state().
103+
-spec init(cluster_name(), [ra_server_id()], non_neg_integer()) -> state().
102104
init(ClusterName = #resource{}, Servers, SoftLimit) ->
103105
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
104106
#state{cluster_name = ClusterName,
105107
servers = Servers,
106108
soft_limit = SoftLimit,
107109
timeout = Timeout}.
108110

109-
-spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok),
111+
-spec init(cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok),
110112
fun(() -> ok)) -> state().
111113
init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
112114
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
@@ -397,12 +399,12 @@ purge(Node) ->
397399
end.
398400

399401
%% @doc returns the cluster name
400-
-spec cluster_name(state()) -> ra_cluster_name().
402+
-spec cluster_name(state()) -> cluster_name().
401403
cluster_name(#state{cluster_name = ClusterName}) ->
402404
ClusterName.
403405

404406
update_machine_state(Node, Conf) ->
405-
case ra:process_command(Node, rabbit_fifo:make_update_state(Conf)) of
407+
case ra:process_command(Node, rabbit_fifo:make_update_config(Conf)) of
406408
{ok, ok, _} ->
407409
ok;
408410
Err ->
@@ -620,11 +622,18 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
620622
CDels0)}};
621623
#consumer{last_msg_id = Prev} = C
622624
when FstId > Prev+1 ->
625+
NumMissing = FstId - Prev + 1,
626+
%% there may actually be fewer missing messages returned than expected
627+
%% This can happen when a node the channel is on gets disconnected
628+
%% from the node the leader is on and then reconnected afterwards.
629+
%% When the node is disconnected the leader will return all checked
630+
%% out messages to the main queue to ensure they don't get stuck in
631+
%% case the node never comes back.
623632
Missing = get_missing_deliveries(Leader, Prev+1, FstId-1, Tag),
624633
Del = {delivery, Tag, Missing ++ IdMsgs},
625634
{Del, State0#state{consumer_deliveries =
626635
update_consumer(Tag, LastId,
627-
length(IdMsgs) + length(Missing),
636+
length(IdMsgs) + NumMissing,
628637
C, CDels0)}};
629638
#consumer{last_msg_id = Prev}
630639
when FstId =< Prev ->
@@ -714,7 +723,11 @@ resend_command(Node, Correlation, Command,
714723
ok = ra:pipeline_command(Node, Command, Seq),
715724
State#state{pending = Pending#{Seq => {Correlation, Command}}}.
716725

717-
add_command(_Cid, _Tag, [], Acc) ->
726+
add_command(_, _, [], Acc) ->
718727
Acc;
719-
add_command(Cid, Tag, MsgIds, Acc) ->
720-
[{Tag, MsgIds, Cid} | Acc].
728+
add_command(Cid, settle, MsgIds, Acc) ->
729+
[rabbit_fifo:make_settle(Cid, MsgIds) | Acc];
730+
add_command(Cid, return, MsgIds, Acc) ->
731+
[rabbit_fifo:make_settle(Cid, MsgIds) | Acc];
732+
add_command(Cid, discard, MsgIds, Acc) ->
733+
[rabbit_fifo:make_settle(Cid, MsgIds) | Acc].

src/rabbit_quorum_memory_manager.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
%%
1616
-module(rabbit_quorum_memory_manager).
1717

18-
-include("rabbit.hrl").
18+
-include_lib("rabbit_common/include/rabbit.hrl").
1919

2020
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
2121
terminate/2, code_change/3]).

0 commit comments

Comments
 (0)