Skip to content

Bugfixes #1803

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
consumer_mapping = ConsumerMapping} = State0) ->
case QueueStates of
#{Name := QState0} ->
QName = rabbit_quorum_queue:queue_name(QState0),
case rabbit_quorum_queue:handle_event(Evt, QState0) of
{{delivery, CTag, Msgs}, QState1} ->
AckRequired = case maps:find(CTag, ConsumerMapping) of
Expand All @@ -670,7 +671,6 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
true ->
QState1
end,
QName = rabbit_quorum_queue:queue_name(QState2),
State = lists:foldl(
fun({MsgId, {MsgHeader, Msg}}, Acc) ->
IsDelivered = maps:is_key(delivery_count, MsgHeader),
Expand Down Expand Up @@ -702,10 +702,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
%% TODO: this should use dtree:take/3
{MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed),
State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}),
case maps:find(Name, QNames) of
{ok, QName} -> erase_queue_stats(QName);
error -> ok
end,
erase_queue_stats(QName),
noreply_coalesce(
State3#ch{queue_states = maps:remove(Name, QueueStates),
queue_names = maps:remove(Name, QNames)})
Expand Down
108 changes: 68 additions & 40 deletions src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
make_discard/2,
make_credit/4,
make_purge/0,
make_update_state/1
make_update_config/1
]).

-type raw_msg() :: term().
Expand Down Expand Up @@ -131,7 +131,7 @@
delivery_count :: non_neg_integer(),
drain :: boolean()}).
-record(purge, {}).
-record(update_state, {config :: config()}).
-record(update_config, {config :: config()}).



Expand All @@ -143,7 +143,7 @@
#discard{} |
#credit{} |
#purge{} |
#update_state{}.
#update_config{}.

-type command() :: protocol() | ra_machine:builtin_command().
%% all the command types suppored by ra fifo
Expand Down Expand Up @@ -260,10 +260,10 @@
-spec init(config()) -> state().
init(#{name := Name,
queue_resource := Resource} = Conf) ->
update_state(Conf, #state{name = Name,
update_config(Conf, #state{name = Name,
queue_resource = Resource}).

update_state(Conf, State) ->
update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
Expand Down Expand Up @@ -435,18 +435,24 @@ apply(_, {down, ConsumerPid, noconnection},
Effects0, #state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
Node = node(ConsumerPid),
% mark all consumers and enqueuers as suspect
% and monitor the node
{Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C,
{Co, St0}) when node(P) =:= Node ->
St = return_all(St0, Checked0),
{maps:put(K, C#consumer{suspected_down = true,
checked_out = #{}},
Co),
St};
(K, C, {Co, St}) ->
{maps:put(K, C, Co), St}
end, {#{}, State0}, Cons0),
% mark all consumers and enqueuers as suspected down
% and monitor the node so that we can find out the final state of the
% process at some later point
{Cons, State} = maps:fold(
fun({_, P} = K,
#consumer{checked_out = Checked0} = C,
{Co, St0}) when node(P) =:= Node ->
St = return_all(St0, Checked0),
%% TODO: need to increment credit here
%% with the size of the Checked map
Credit = increase_credit(C, maps:size(Checked0)),
{maps:put(K, C#consumer{suspected_down = true,
credit = Credit,
checked_out = #{}}, Co),
St};
(K, C, {Co, St}) ->
{maps:put(K, C, Co), St}
end, {#{}, State0}, Cons0),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{suspected_down = true};
(_, E) -> E
Expand Down Expand Up @@ -515,8 +521,8 @@ apply(_, {nodeup, Node}, Effects0,
service_queue = SQ}, Monitors ++ Effects);
apply(_, {nodedown, _Node}, Effects, State) ->
{State, Effects, ok};
apply(_, #update_state{config = Conf}, Effects, State) ->
{update_state(Conf, State), Effects, ok}.
apply(_, #update_config{config = Conf}, Effects, State) ->
{update_config(Conf, State), Effects, ok}.

-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Cons,
Expand Down Expand Up @@ -587,7 +593,9 @@ overview(#state{consumers = Cons,
get_checked_out(Cid, From, To, #state{consumers = Consumers}) ->
case Consumers of
#{Cid := #consumer{checked_out = Checked}} ->
[{K, snd(snd(maps:get(K, Checked)))} || K <- lists:seq(From, To)];
[{K, snd(snd(maps:get(K, Checked)))}
|| K <- lists:seq(From, To),
maps:is_key(K, Checked)];
_ ->
[]
end.
Expand Down Expand Up @@ -769,16 +777,10 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
snd(T) ->
element(2, T).

return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked,
return(ConsumerId, MsgNumMsgs, Con0, Checked,
Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) ->
Con = case Life of
auto ->
Num = length(MsgNumMsgs),
Con0#consumer{checked_out = Checked,
credit = increase_credit(Con0, Num)};
once ->
Con0#consumer{checked_out = Checked}
end,
Con = Con0#consumer{checked_out = Checked,
credit = increase_credit(Con0, length(MsgNumMsgs))},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) ->
Expand Down Expand Up @@ -900,12 +902,15 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
State0#state{messages = maps:put(MsgNum, Msg, Messages),
returns = lqueue:in(MsgNum, Returns)}).

return_all(State, Checked) ->
maps:fold(fun (_, '$prefix_msg', S) ->
return_one(0, '$prefix_msg', S);
(_, {MsgNum, Msg}, S) ->
return_one(MsgNum, Msg, S)
end, State, Checked).
return_all(State, Checked0) ->
%% need to sort the list so that we return messages in the order
%% they were checked out
Checked = lists:sort(maps:to_list(Checked0)),
lists:foldl(fun ({_, '$prefix_msg'}, S) ->
return_one(0, '$prefix_msg', S);
({_, {MsgNum, Msg}}, S) ->
return_one(MsgNum, Msg, S)
end, State, Checked).

checkout(State, Effects) ->
checkout0(checkout_one(State), Effects, #{}).
Expand Down Expand Up @@ -1170,9 +1175,9 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) ->
-spec make_purge() -> protocol().
make_purge() -> #purge{}.

-spec make_update_state(config()) -> protocol().
make_update_state(Config) ->
#update_state{config = Config}.
-spec make_update_config(config()) -> protocol().
make_update_config(Config) ->
#update_config{config = Config}.

add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) ->
Bytes = message_size(Msg),
Expand Down Expand Up @@ -1502,11 +1507,14 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() ->
Node = node(Pid),
{State0, Effects0} = enq(1, 1, second, test_init(test)),
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0),
{State1, Effects1} = check(Cid, 2, State0),
{State1, Effects1} = check_auto(Cid, 2, State0),
#consumer{credit = 0} = maps:get(Cid, State1#state.consumers),
?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1),
% monitor both enqueuer and consumer
% because we received a noconnection we now need to monitor the node
{State2a, _Effects2a, _} = apply(meta(3), {down, Pid, noconnection}, [], State1),
#consumer{credit = 1} = maps:get(Cid, State2a#state.consumers),
%% validate consumer has credit
{State2, Effects2, _} = apply(meta(3), {down, Self, noconnection}, [], State2a),
?ASSERT_EFF({monitor, node, _}, Effects2),
?assertNoEffect({demonitor, process, _}, Effects2),
Expand Down Expand Up @@ -1865,6 +1873,26 @@ purge_with_checkout_test() ->
?assertEqual(0, maps:size(Checked)),
ok.

down_returns_checked_out_in_order_test() ->
S0 = test_init(?FUNCTION_NAME),
%% enqueue 100
S1 = lists:foldl(fun (Num, FS0) ->
{FS, _} = enq(Num, Num, Num, FS0),
FS
end, S0, lists:seq(1, 100)),
?assertEqual(100, maps:size(S1#state.messages)),
Cid = {<<"cid">>, self()},
{S2, _} = check(Cid, 101, 1000, S1),
#consumer{checked_out = Checked} = maps:get(Cid, S2#state.consumers),
?assertEqual(100, maps:size(Checked)),
%% simulate down
{S, _, _} = apply(meta(102), {down, self(), noproc}, [], S2),
Returns = lqueue:to_list(S#state.returns),
?assertEqual(100, length(Returns)),
%% validate returns are in order
?assertEqual(lists:sort(Returns), Returns),
ok.

meta(Idx) ->
#{index => Idx, term => 1}.

Expand Down Expand Up @@ -1900,7 +1928,7 @@ check_auto(Cid, Idx, State) ->
check(Cid, Idx, Num, State) ->
strip_reply(
apply(meta(Idx),
make_checkout(Cid, {once, Num, simple_prefetch}, #{}),
make_checkout(Cid, {auto, Num, simple_prefetch}, #{}),
[], State)).

settle(Cid, Idx, MsgId, State) ->
Expand Down
33 changes: 23 additions & 10 deletions src/rabbit_fifo_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@
{rabbit_fifo:consumer_tag(), non_neg_integer()}}.
-type actions() :: [action()].

-type cluster_name() :: rabbit_types:r(queue).

-record(consumer, {last_msg_id :: seq() | -1,
delivery_count = 0 :: non_neg_integer()}).

-record(state, {cluster_name :: ra_cluster_name(),
-record(state, {cluster_name :: cluster_name(),
servers = [] :: [ra_server_id()],
leader :: maybe(ra_server_id()),
next_seq = 0 :: seq(),
Expand Down Expand Up @@ -88,7 +90,7 @@
%% @param ClusterName the id of the cluster to interact with
%% @param Servers The known servers of the queue. If the current leader is known
%% ensure the leader node is at the head of the list.
-spec init(ra_cluster_name(), [ra_server_id()]) -> state().
-spec init(cluster_name(), [ra_server_id()]) -> state().
init(ClusterName, Servers) ->
init(ClusterName, Servers, ?SOFT_LIMIT).

Expand All @@ -98,15 +100,15 @@ init(ClusterName, Servers) ->
%% @param Servers The known servers of the queue. If the current leader is known
%% ensure the leader node is at the head of the list.
%% @param MaxPending size defining the max number of pending commands.
-spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer()) -> state().
-spec init(cluster_name(), [ra_server_id()], non_neg_integer()) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
#state{cluster_name = ClusterName,
servers = Servers,
soft_limit = SoftLimit,
timeout = Timeout}.

-spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok),
-spec init(cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok),
fun(() -> ok)) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
Expand Down Expand Up @@ -397,12 +399,12 @@ purge(Node) ->
end.

%% @doc returns the cluster name
-spec cluster_name(state()) -> ra_cluster_name().
-spec cluster_name(state()) -> cluster_name().
cluster_name(#state{cluster_name = ClusterName}) ->
ClusterName.

update_machine_state(Node, Conf) ->
case ra:process_command(Node, rabbit_fifo:make_update_state(Conf)) of
case ra:process_command(Node, rabbit_fifo:make_update_config(Conf)) of
{ok, ok, _} ->
ok;
Err ->
Expand Down Expand Up @@ -620,11 +622,18 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
CDels0)}};
#consumer{last_msg_id = Prev} = C
when FstId > Prev+1 ->
NumMissing = FstId - Prev + 1,
%% there may actually be fewer missing messages returned than expected
%% This can happen when a node the channel is on gets disconnected
%% from the node the leader is on and then reconnected afterwards.
%% When the node is disconnected the leader will return all checked
%% out messages to the main queue to ensure they don't get stuck in
%% case the node never comes back.
Missing = get_missing_deliveries(Leader, Prev+1, FstId-1, Tag),
Del = {delivery, Tag, Missing ++ IdMsgs},
{Del, State0#state{consumer_deliveries =
update_consumer(Tag, LastId,
length(IdMsgs) + length(Missing),
length(IdMsgs) + NumMissing,
C, CDels0)}};
#consumer{last_msg_id = Prev}
when FstId =< Prev ->
Expand Down Expand Up @@ -714,7 +723,11 @@ resend_command(Node, Correlation, Command,
ok = ra:pipeline_command(Node, Command, Seq),
State#state{pending = Pending#{Seq => {Correlation, Command}}}.

add_command(_Cid, _Tag, [], Acc) ->
add_command(_, _, [], Acc) ->
Acc;
add_command(Cid, Tag, MsgIds, Acc) ->
[{Tag, MsgIds, Cid} | Acc].
add_command(Cid, settle, MsgIds, Acc) ->
[rabbit_fifo:make_settle(Cid, MsgIds) | Acc];
add_command(Cid, return, MsgIds, Acc) ->
[rabbit_fifo:make_settle(Cid, MsgIds) | Acc];
add_command(Cid, discard, MsgIds, Acc) ->
[rabbit_fifo:make_settle(Cid, MsgIds) | Acc].
2 changes: 1 addition & 1 deletion src/rabbit_quorum_memory_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
%%
-module(rabbit_quorum_memory_manager).

-include("rabbit.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").

-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
Expand Down
Loading