Skip to content

Monitor stream local member in stream queue #4217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1481,7 +1481,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
[rabbit_misc:rs(QueueName)]);
{error, no_local_stream_replica_available} ->
rabbit_misc:protocol_error(
resource_error, "~s does not not have a running local replica",
resource_error, "~s does not have a running local replica",
[rabbit_misc:rs(QueueName)])
end;
{ok, _} ->
Expand Down
234 changes: 207 additions & 27 deletions deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(rabbit_stream_coordinator).
Expand All @@ -23,7 +23,8 @@
-export([recover/0,
add_replica/2,
delete_replica/2,
register_listener/1]).
register_listener/1,
register_local_member_listener/1]).

-export([new_stream/2,
delete_stream/2]).
Expand All @@ -39,6 +40,10 @@
-export([log_overview/1]).
-export([replay/1]).

%% for testing and debugging
-export([eval_listeners/3,
state/0]).

-rabbit_boot_step({?MODULE,
[{description, "Restart stream coordinator"},
{mfa, {?MODULE, recover, []}},
Expand Down Expand Up @@ -68,8 +73,9 @@
{delete_replica, stream_id(), #{node := node()}} |
{policy_changed, stream_id(), #{queue := amqqueue:amqqueue()}} |
{register_listener, #{pid := pid(),
node := node(),
stream_id := stream_id(),
queue_ref := queue_ref()}} |
type := leader | local_member}} |
{action_failed, stream_id(), #{index := ra:index(),
node := node(),
epoch := osiris:epoch(),
Expand Down Expand Up @@ -165,6 +171,15 @@ policy_changed(Q) when ?is_amqqueue(Q) ->
StreamId = maps:get(name, amqqueue:get_type_state(Q)),
process_command({policy_changed, StreamId, #{queue => Q}}).

%% for debugging
state() ->
case ra:local_query({?MODULE, node()}, fun(State) -> State end) of
{ok, {_, Res}, _} ->
Res;
Any ->
Any
end.

local_pid(StreamId) when is_list(StreamId) ->
MFA = {?MODULE, query_local_pid, [StreamId, node()]},
case ra:local_query({?MODULE, node()}, MFA) of
Expand Down Expand Up @@ -248,6 +263,16 @@ register_listener(Q) when ?is_amqqueue(Q)->
#{pid => self(),
stream_id => StreamId}}).

-spec register_local_member_listener(amqqueue:amqqueue()) ->
{error, term()} | {ok, ok | stream_not_found, atom() | {atom(), atom()}}.
register_local_member_listener(Q) when ?is_amqqueue(Q) ->
#{name := StreamId} = amqqueue:get_type_state(Q),
process_command({register_listener,
#{pid => self(),
node => node(self()),
stream_id => StreamId,
type => local_member}}).

process_command(Cmd) ->
Servers = ensure_coordinator_started(),
process_command(Servers, Cmd).
Expand Down Expand Up @@ -313,7 +338,7 @@ all_coord_members() ->
Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
[{?MODULE, Node} || Node <- [node() | Nodes]].

version() -> 1.
version() -> 2.

which_module(_) ->
?MODULE.
Expand All @@ -323,7 +348,8 @@ init(_Conf) ->

-spec apply(map(), command(), state()) ->
{state(), term(), ra_machine:effects()}.
apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd,
apply(#{index := _Idx, machine_version := MachineVersion} = Meta0,
{_CmdTag, StreamId, #{}} = Cmd,
#?MODULE{streams = Streams0,
monitors = Monitors0} = State0) ->
Stream0 = maps:get(StreamId, Streams0, undefined),
Expand All @@ -341,10 +367,10 @@ apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd,
case Stream1 of
undefined ->
return(Meta, State0#?MODULE{streams = maps:remove(StreamId, Streams0)},
Reply, inform_listeners_eol(Stream0));
Reply, inform_listeners_eol(MachineVersion, Stream0));
_ ->
{Stream2, Effects0} = evaluate_stream(Meta, Stream1, []),
{Stream3, Effects1} = eval_listeners(Stream2, Effects0),
{Stream3, Effects1} = eval_listeners(MachineVersion, Stream2, Effects0),
{Stream, Effects2} = eval_retention(Meta, Stream3, Effects1),
{Monitors, Effects} = ensure_monitors(Stream, Monitors0, Effects2),
return(Meta,
Expand All @@ -354,10 +380,10 @@ apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd,
Reply ->
return(Meta, State0, Reply, [])
end;
apply(Meta, {down, Pid, Reason} = Cmd,
apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
#?MODULE{streams = Streams0,
listeners = Listeners0,
monitors = Monitors0} = State) ->
monitors = Monitors0,
listeners = StateListeners0} = State) ->

Effects0 = case Reason of
noconnection ->
Expand All @@ -366,10 +392,10 @@ apply(Meta, {down, Pid, Reason} = Cmd,
[]
end,
case maps:take(Pid, Monitors0) of
{{StreamId, listener}, Monitors} ->
Listeners = case maps:take(StreamId, Listeners0) of
{{StreamId, listener}, Monitors} when MachineVersion < 2 ->
Listeners = case maps:take(StreamId, StateListeners0) of
error ->
Listeners0;
StateListeners0;
{Pids0, Listeners1} ->
case maps:remove(Pid, Pids0) of
Pids when map_size(Pids) == 0 ->
Expand All @@ -380,6 +406,23 @@ apply(Meta, {down, Pid, Reason} = Cmd,
end,
return(Meta, State#?MODULE{listeners = Listeners,
monitors = Monitors}, ok, Effects0);
{{PidStreams, listener}, Monitors} when MachineVersion >= 2 ->
Streams = maps:fold(
fun(StreamId, _, Acc) ->
case Acc of
#{StreamId := Stream = #stream{listeners = Listeners0}} ->
Listeners = maps:fold(fun({P, _} = K, _, A) when P == Pid ->
maps:remove(K, A);
(K, V, A) ->
A#{K => V}
end, #{}, Listeners0),
Acc#{StreamId => Stream#stream{listeners = Listeners}};
_ ->
Acc
end
end, Streams0, PidStreams),
return(Meta, State#?MODULE{streams = Streams,
monitors = Monitors}, ok, Effects0);
{{StreamId, member}, Monitors1} ->
case Streams0 of
#{StreamId := Stream0} ->
Expand All @@ -398,19 +441,53 @@ apply(Meta, {down, Pid, Reason} = Cmd,
error ->
return(Meta, State, ok, Effects0)
end;
apply(Meta, {register_listener, #{pid := Pid,
stream_id := StreamId}},
apply(#{machine_version := MachineVersion} = Meta,
{register_listener, #{pid := Pid,
stream_id := StreamId} = Args},
#?MODULE{streams = Streams,
monitors = Monitors0} = State0) ->
case Streams of
#{StreamId := #stream{listeners = Listeners0} = Stream0} ->
monitors = Monitors0} = State0) when MachineVersion =< 1 ->
Type = maps:get(type, Args, leader),
case {Streams, Type} of
{#{StreamId := #stream{listeners = Listeners0} = Stream0}, leader} ->
Stream1 = Stream0#stream{listeners = maps:put(Pid, undefined, Listeners0)},
{Stream, Effects} = eval_listeners(Stream1, []),
{Stream, Effects} = eval_listeners(MachineVersion, Stream1, []),
Monitors = maps:put(Pid, {StreamId, listener}, Monitors0),
return(Meta,
State0#?MODULE{streams = maps:put(StreamId, Stream, Streams),
monitors = Monitors}, ok,
[{monitor, process, Pid} | Effects]);
{#{StreamId := _Stream}, local_member} ->
%% registering a local member listener does not change the state in v1
return(Meta, State0, ok, []);
_ ->
return(Meta, State0, stream_not_found, [])
end;

apply(#{machine_version := MachineVersion} = Meta,
{register_listener, #{pid := Pid,
stream_id := StreamId} = Args},
#?MODULE{streams = Streams,
monitors = Monitors0} = State0) when MachineVersion >= 2 ->
Node = maps:get(node, Args, node(Pid)),
Type = maps:get(type, Args, leader),

case Streams of
#{StreamId := #stream{listeners = Listeners0} = Stream0} ->
{LKey, LValue} =
case Type of
leader ->
{{Pid, leader}, undefined};
local_member ->
{{Pid, member}, {Node, undefined}}
end,
Stream1 = Stream0#stream{listeners = maps:put(LKey, LValue, Listeners0)},
{Stream, Effects} = eval_listeners(MachineVersion, Stream1, []),
{PidStreams, listener} = maps:get(Pid, Monitors0, {#{}, listener}),
Monitors = maps:put(Pid, {PidStreams#{StreamId => ok}, listener}, Monitors0),
return(Meta,
State0#?MODULE{streams = maps:put(StreamId, Stream, Streams),
monitors = Monitors}, ok,
[{monitor, process, Pid} | Effects]);
_ ->
return(Meta, State0, stream_not_found, [])
end;
Expand Down Expand Up @@ -438,7 +515,43 @@ apply(Meta, {nodeup, Node} = Cmd,
end, {Streams0, Effects0}, Streams0),
return(Meta, State#?MODULE{monitors = Monitors,
streams = Streams}, ok, Effects);
apply(Meta, {machine_version, _From, _To}, State) ->
apply(Meta, {machine_version, From = 1, To = 2}, State = #?MODULE{streams = Streams0,
monitors = Monitors0}) ->
rabbit_log:info("Stream coordinator machine version changes from ~p to ~p, updating state.",
[From, To]),
%% conversion from old state to new state
%% additional operation: the stream listeners are never collected in the previous version
%% so we'll emit monitors for all listener PIDs
%% this way we'll get the DOWN event for dead listener PIDs and
%% we'll clean the stream listeners the in the DOWN event callback

%% transform the listeners of each stream and accumulate listener PIDs
{Streams1, Listeners} =
maps:fold(fun(S, #stream{listeners = L0} = S0, {StreamAcc, GlobalListAcc}) ->
{L1, GlobalListAcc1} = maps:fold(
fun(ListPid, LeaderPid, {LAcc, GLAcc}) ->
{LAcc#{{ListPid, leader} => LeaderPid},
GLAcc#{ListPid => S}}
end, {#{}, GlobalListAcc}, L0),
{StreamAcc#{S => S0#stream{listeners = L1}}, GlobalListAcc1}
end, {#{}, #{}}, Streams0),
%% accumulate monitors for the map and create the effects to emit the monitors
{ExtraMonitors, Effects} = maps:fold(fun(P, StreamId, {MAcc, EAcc}) ->
{MAcc#{P => {StreamId, listener}},
[{monitor, process, P} | EAcc]}
end, {#{}, []}, Listeners),
Monitors1 = maps:merge(Monitors0, ExtraMonitors),
Monitors2 = maps:fold(fun(P, {StreamId, listener}, Acc) ->
Acc#{P => {#{StreamId => ok}, listener}};
(P, V, Acc) ->
Acc#{P => V}
end, #{}, Monitors1),
return(Meta, State#?MODULE{streams = Streams1,
monitors = Monitors2,
listeners = undefined}, ok, Effects);
apply(Meta, {machine_version, From, To}, State) ->
rabbit_log:info("Stream coordinator machine version changes from ~p to ~p, no state changes required.",
[From, To]),
return(Meta, State, ok, []);
apply(Meta, UnkCmd, State) ->
rabbit_log:debug("~s: unknown command ~W",
Expand Down Expand Up @@ -1240,21 +1353,35 @@ update_stream0(#{system_time := _Ts},
update_stream0(_Meta, _Cmd, undefined) ->
undefined.

inform_listeners_eol(#stream{target = deleted,
inform_listeners_eol(MachineVersion, #stream{target = deleted,
listeners = Listeners,
queue_ref = QRef
}) ->
}) when MachineVersion =< 1 ->
lists:map(fun(Pid) ->
{send_msg, Pid,
{queue_event, QRef, eol},
cast}
end, maps:keys(Listeners));
inform_listeners_eol(_) ->
inform_listeners_eol(MachineVersion,
#stream{target = deleted,
listeners = Listeners,
queue_ref = QRef}) when MachineVersion >= 2 ->
LPidsMap = maps:fold(fun({P, _}, _V, Acc) ->
Acc#{P => ok}
end, #{}, Listeners),
lists:map(fun(Pid) ->
{send_msg,
Pid,
{queue_event, QRef, eol},
cast}
end, maps:keys(LPidsMap));
inform_listeners_eol(_, _) ->
[].

eval_listeners(#stream{listeners = Listeners0,
queue_ref = QRef,
members = Members} = Stream, Effects0) ->
eval_listeners(MachineVersion, #stream{listeners = Listeners0,
queue_ref = QRef,
members = Members} = Stream, Effects0)
when MachineVersion =< 1 ->
case find_leader(Members) of
{#member{state = {running, _, LeaderPid}}, _} ->
%% a leader is running, check all listeners to see if any of them
Expand All @@ -1273,7 +1400,60 @@ eval_listeners(#stream{listeners = Listeners0,
{Stream#stream{listeners = Listeners}, Effects};
_ ->
{Stream, Effects0}
end.
end;
eval_listeners(MachineVersion, #stream{listeners = Listeners0,
queue_ref = QRef,
members = Members} = Stream0, Effects0) when MachineVersion >= 2 ->
%% Iterating over stream listeners.
%% Returning the new map of listeners and the effects (notification of changes)
{Listeners1, Effects1} =
maps:fold(fun({P, leader}, ListLPid0, {Lsts0, Effs0}) ->
%% iterating over member to find the leader
{ListLPid1, Effs1} =
maps:fold(fun(_N, #member{state = {running, _, LeaderPid},
role = {writer, _},
target = T}, A)
when ListLPid0 == LeaderPid, T /= deleted ->
%% it's the leader, same PID, nothing to do
A;
(_N, #member{state = {running, _, LeaderPid},
role = {writer, _},
target = T}, {_, Efs})
when T /= deleted ->
%% it's the leader, not same PID, assign the new leader, add effect
{LeaderPid, [{send_msg, P,
{queue_event, QRef,
{stream_leader_change, LeaderPid}},
cast} | Efs]};
(_N, _M, Acc) ->
%% it's not the leader, nothing to do
Acc
end, {ListLPid0, Effs0}, Members),
{Lsts0#{{P, leader} => ListLPid1}, Effs1};
({P, member}, {ListNode, ListMPid0}, {Lsts0, Effs0}) ->
%% listening to a member on a given node
%% iterating over the members to find the member on this node
{ListMPid1, Effs1} =
maps:fold(fun(MNode, #member{state = {running, _, MemberPid}, target = T}, Acc)
when ListMPid0 == MemberPid, ListNode == MNode, T /= deleted ->
%% it's the local member of this listener
%% it has not changed, nothing to do
Acc;
(MNode, #member{state = {running, _, MemberPid}, target = T}, {_, Efs})
when ListNode == MNode, T /= deleted ->
%% it's the local member of this listener
%% the PID is not the same, updating it in the listener, add effect
{MemberPid, [{send_msg, P,
{queue_event, QRef,
{stream_local_member_change, MemberPid}},
cast} | Efs]};
(_N, _M, Acc) ->
%% not a replica, nothing to do
Acc
end, {ListMPid0, Effs0}, Members),
{Lsts0#{{P, member} => {ListNode, ListMPid1}}, Effs1}
end, {Listeners0, Effects0}, Listeners0),
{Stream0#stream{listeners = Listeners1}, Effects1}.

eval_retention(#{index := Idx} = Meta,
#stream{conf = #{retention := Ret} = Conf,
Expand Down
Loading