Skip to content

Improve stream reader metrics cleanup (backport #3340) #3354

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Sep 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 59 additions & 51 deletions deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ process_command([], _Cmd) ->
process_command([Server | Servers], Cmd) ->
case ra:process_command(Server, Cmd, ?CMD_TIMEOUT) of
{timeout, _} ->
rabbit_log:warning("Coordinator timeout on server ~p when processing command ~p",
[Server, element(1, Cmd)]),
rabbit_log:warning("Coordinator timeout on server ~s when processing command ~W",
[Server, element(1, Cmd), 10]),
process_command(Servers, Cmd);
{error, noproc} ->
process_command(Servers, Cmd);
Expand Down Expand Up @@ -511,8 +511,8 @@ add_members(Members, [Node | Nodes]) ->
add_members(Members, Nodes)
end;
Error ->
rabbit_log:warning("Stream coordinator failed to start on node ~p : ~p",
[Node, Error]),
rabbit_log:warning("Stream coordinator failed to start on node ~s : ~W",
[Node, Error, 10]),
add_members(Members, Nodes)
end.

Expand All @@ -529,12 +529,11 @@ remove_members(Members, [Node | Nodes]) ->
-record(aux, {actions = #{} ::
#{pid() := {stream_id(), #{node := node(),
index := non_neg_integer(),
epoch := osiris:epoch()}}},
epoch := osiris:epoch()}}},
resizer :: undefined | pid()}).

init_aux(_Name) ->
#aux{}.
% {#{}, undefined}.

%% TODO ensure the dead writer is restarted as a replica at some point in time, increasing timeout?
handle_aux(leader, _, maybe_resize_coordinator_cluster,
Expand Down Expand Up @@ -647,8 +646,8 @@ phase_start_replica(StreamId, #{epoch := Epoch,
fun() ->
try osiris_replica:start(Node, Conf0) of
{ok, Pid} ->
rabbit_log:debug("~s: ~s: replica started on ~s in ~b pid ~w",
[?MODULE, StreamId, Node, Epoch, Pid]),
rabbit_log:info("~s: ~s: replica started on ~s in ~b pid ~w",
[?MODULE, StreamId, Node, Epoch, Pid]),
send_self_command({member_started, StreamId,
Args#{pid => Pid}});
{error, already_present} ->
Expand All @@ -663,14 +662,14 @@ phase_start_replica(StreamId, #{epoch := Epoch,
send_self_command({member_started, StreamId,
Args#{pid => Pid}});
{error, Reason} ->
rabbit_log:warning("~s: Error while starting replica for ~s on node ~s in ~b : ~W",
[?MODULE, maps:get(name, Conf0), Node, Epoch, Reason, 10]),
maybe_sleep(Reason),
rabbit_log:warning("~s: Error while starting replica for ~s : ~W",
[?MODULE, maps:get(name, Conf0), Reason, 10]),
send_action_failed(StreamId, starting, Args)
catch _:E ->
rabbit_log:warning("~s: Error while starting replica for ~s : ~p",
[?MODULE, maps:get(name, Conf0), E]),
maybe_sleep(E),
catch _:Error ->
rabbit_log:warning("~s: Error while starting replica for ~s on node ~s in ~b : ~W",
[?MODULE, maps:get(name, Conf0), Node, Epoch, Error, 10]),
maybe_sleep(Error),
send_action_failed(StreamId, starting, Args)
end
end.
Expand All @@ -691,8 +690,8 @@ phase_delete_member(StreamId, #{node := Node} = Arg, Conf) ->
_ ->
send_action_failed(StreamId, deleting, Arg)
catch _:E ->
rabbit_log:warning("~s: Error while deleting member for ~s : on node ~s ~p",
[?MODULE, StreamId, Node, E]),
rabbit_log:warning("~s: Error while deleting member for ~s : on node ~s ~W",
[?MODULE, StreamId, Node, E, 10]),
maybe_sleep(E),
send_action_failed(StreamId, deleting, Arg)
end
Expand All @@ -711,27 +710,27 @@ phase_stop_member(StreamId, #{node := Node,
[?MODULE, StreamId, Node, Epoch, Tail]),
send_self_command({member_stopped, StreamId, Arg});
Err ->
rabbit_log:warning("Stream coordinator failed to get tail
of member ~s ~w Error: ~w",
[StreamId, Node, Err]),
rabbit_log:warning("~s: failed to get tail of member ~s on ~s in ~b Error: ~w",
[?MODULE, StreamId, Node, Epoch, Err]),
maybe_sleep(Err),
send_action_failed(StreamId, stopping, Arg0)
catch _:Err ->
rabbit_log:warning("Stream coordinator failed to get
tail of member ~s ~w Error: ~w",
[StreamId, Node, Err]),
rabbit_log:warning("~s: failed to get tail of member ~s on ~s in ~b Error: ~w",
[?MODULE, StreamId, Node, Epoch, Err]),
maybe_sleep(Err),
send_action_failed(StreamId, stopping, Arg0)
end;
Err ->
rabbit_log:warning("Stream coordinator failed to stop
member ~s ~w Error: ~w",
[StreamId, Node, Err]),
send_action_failed(StreamId, stopping, Arg0)
catch _:Err ->
rabbit_log:warning("Stream coordinator failed to stop
member ~s ~w Error: ~w",
[StreamId, Node, Err]),
rabbit_log:warning("~s: failed to stop "
"member ~s ~w Error: ~w",
[?MODULE, StreamId, Node, Err]),
maybe_sleep(Err),
send_action_failed(StreamId, stopping, Arg0)
catch _:Err ->
rabbit_log:warning("~s: failed to stop member ~s ~w Error: ~w",
[?MODULE, StreamId, Node, Err]),
maybe_sleep(Err),
send_action_failed(StreamId, stopping, Arg0)
end
end.

Expand All @@ -741,19 +740,18 @@ phase_start_writer(StreamId, #{epoch := Epoch,
try osiris_writer:start(Conf) of
{ok, Pid} ->
Args = Args0#{epoch => Epoch, pid => Pid},
rabbit_log:warning("~s: started writer ~s on ~w in ~b",
[?MODULE, StreamId, Node, Epoch]),
rabbit_log:info("~s: started writer ~s on ~w in ~b",
[?MODULE, StreamId, Node, Epoch]),
send_self_command({member_started, StreamId, Args});
Err ->
%% no sleep for writer failures
rabbit_log:warning("~s: failed to start
writer ~s ~w Error: ~w",
[?MODULE, StreamId, Node, Err]),
%% no sleep for writer failures as we want to trigger a new
%% election asap
rabbit_log:warning("~s: failed to start writer ~s on ~s in ~b Error: ~w",
[?MODULE, StreamId, Node, Epoch, Err]),
send_action_failed(StreamId, starting, Args0)
catch _:Err ->
rabbit_log:warning("~s: failed to start
writer ~s ~w Error: ~w",
[?MODULE, StreamId, Node, Err]),
rabbit_log:warning("~s: failed to start writer ~s on ~s in ~b Error: ~w",
[?MODULE, StreamId, Node, Epoch, Err]),
send_action_failed(StreamId, starting, Args0)
end
end.
Expand All @@ -764,14 +762,13 @@ phase_update_retention(StreamId, #{pid := Pid,
try osiris:update_retention(Pid, Retention) of
ok ->
send_self_command({retention_updated, StreamId, Args});
{error, Err} ->
rabbit_log:warning("~s: failed to update
retention for ~s ~w Error: ~w",
[?MODULE, StreamId, node(Pid), Err]),
{error, Reason} = Err ->
rabbit_log:warning("~s: failed to update retention for ~s ~w Reason: ~w",
[?MODULE, StreamId, node(Pid), Reason]),
maybe_sleep(Err),
send_action_failed(StreamId, update_retention, Args)
catch _:Err ->
rabbit_log:warning("~s: failed to update
retention for ~s ~w Error: ~w",
rabbit_log:warning("~s: failed to update retention for ~s ~w Error: ~w",
[?MODULE, StreamId, node(Pid), Err]),
maybe_sleep(Err),
send_action_failed(StreamId, update_retention, Args)
Expand All @@ -782,6 +779,8 @@ get_replica_tail(Node, Conf) ->
case rpc:call(Node, ?MODULE, log_overview, [Conf]) of
{badrpc, nodedown} ->
{error, nodedown};
{error, _} = Err ->
Err;
{_Range, Offsets} ->
{ok, select_highest_offset(Offsets)}
end.
Expand All @@ -792,8 +791,13 @@ select_highest_offset(Offsets) ->
lists:last(Offsets).

log_overview(Config) ->
Dir = osiris_log:directory(Config),
osiris_log:overview(Dir).
case whereis(osiris_sup) of
undefined ->
{error, app_not_running};
_ ->
Dir = osiris_log:directory(Config),
osiris_log:overview(Dir)
end.


replay(L) when is_list(L) ->
Expand Down Expand Up @@ -864,7 +868,7 @@ filter_command(_Meta, {delete_replica, _, #{node := Node}}, #stream{id = StreamI
case maps:size(Members) =< 1 of
true ->
rabbit_log:warning(
"~s failed to delete replica on node ~p for stream ~s: refusing to delete the only replica",
"~s failed to delete replica on node ~s for stream ~s: refusing to delete the only replica",
[?MODULE, Node, StreamId]),
{error, last_stream_member};
false ->
Expand All @@ -879,8 +883,8 @@ update_stream(Meta, Cmd, Stream) ->
catch
_:E:Stacktrace ->
rabbit_log:warning(
"~s failed to update stream:~n~p~n~p",
[?MODULE, E, Stacktrace]),
"~s failed to update stream:~n~W~n~W",
[?MODULE, E, 10, Stacktrace, 10]),
Stream
end.

Expand Down Expand Up @@ -1506,9 +1510,13 @@ select_leader(Offsets) ->
Node.

maybe_sleep({{nodedown, _}, _}) ->
timer:sleep(5000);
timer:sleep(10000);
maybe_sleep({noproc, _}) ->
timer:sleep(5000);
maybe_sleep({error, nodedown}) ->
timer:sleep(5000);
maybe_sleep({error, _}) ->
timer:sleep(5000);
maybe_sleep(_) ->
ok.

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_stream_coordinator.hrl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

-define(STREAM_COORDINATOR_STARTUP, {stream_coordinator_startup, self()}).
-define(TICK_TIMEOUT, 1000).
-define(TICK_TIMEOUT, 30000).
-define(RESTART_TIMEOUT, 1000).
-define(PHASE_RETRY_TIMEOUT, 10000).
-define(CMD_TIMEOUT, 30000).
Expand Down
39 changes: 23 additions & 16 deletions deps/rabbit/test/rabbit_stream_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1440,29 +1440,36 @@ replica_recovery(Config) ->
publish_confirm(Ch1, Q, [<<"msg1">> || _ <- lists:seq(1, 100)]),
amqp_channel:close(Ch1),

CheckReplicaRecovered =
fun(DownNode) ->
rabbit_ct_helpers:await_condition(
fun () ->
timer:sleep(1000),
ct:pal("Wait for replica to recover..."),
try
{Conn, Ch2} = rabbit_ct_client_helpers:open_connection_and_channel(Config, DownNode),
qos(Ch2, 10, false),
subscribe(Ch2, Q, false, 0),
receive_batch(Ch2, 0, 99),
amqp_connection:close(Conn),
true
catch _:_ ->
false
end
end, 30000)
end,

[begin
[DownNode | _] = PNodes,
rabbit_control_helper:command(stop_app, DownNode),
rabbit_control_helper:command(start_app, DownNode),
timer:sleep(6000),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, DownNode),
qos(Ch2, 10, false),
subscribe(Ch2, Q, false, 0),
receive_batch(Ch2, 0, 99),
amqp_channel:close(Ch2)
end || PNodes <- permute(Nodes)],
CheckReplicaRecovered(DownNode)
end || [DownNode | _] <- permute(Nodes)],

[begin
[DownNode | _] = PNodes,
ok = rabbit_ct_broker_helpers:stop_node(Config, DownNode),
ok = rabbit_ct_broker_helpers:start_node(Config, DownNode),
timer:sleep(6000),
Ch2 = rabbit_ct_client_helpers:open_channel(Config, DownNode),
qos(Ch2, 10, false),
subscribe(Ch2, Q, false, 0),
receive_batch(Ch2, 0, 99),
amqp_channel:close(Ch2)
end || PNodes <- permute(Nodes)],
CheckReplicaRecovered(DownNode)
end || [DownNode | _] <- permute(Nodes)],
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).

leader_failover(Config) ->
Expand Down
Loading