Skip to content

Commit f6314d0

Browse files
committed
rabbit_peer_discovery: Retry RPC calls
[Why] In CI, we observe some timeouts in the Erlang distribution connections between the temporary hidden node and the nodes it queries. This affects peer discovery obviously. [How] We introduce some query retries to reduce the risk of an incomplete query. While here, we move the sorting of queried nodes from the `query_node_props2/3` last clause (executed in the temporary hidden node) to the function setting the temporary hidden node and asking for these queries. This way the debug messages from that sorting are logged by RabbitMQ out of the box.
1 parent 4d4985f commit f6314d0

File tree

1 file changed

+73
-36
lines changed

1 file changed

+73
-36
lines changed

deps/rabbit/src/rabbit_peer_discovery.erl

Lines changed: 73 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,12 @@ query_node_props(Nodes) when Nodes =/= [] ->
440440
[Peer],
441441
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
442442
try
443-
peer:call(Pid, ?MODULE, do_query_node_props, [Nodes, ThisNode], 180000)
443+
NodesAndProps1 = peer:call(
444+
Pid,
445+
?MODULE, do_query_node_props,
446+
[Nodes, ThisNode], 180000),
447+
NodesAndProps2 = sort_nodes_and_props(NodesAndProps1),
448+
NodesAndProps2
444449
after
445450
peer:stop(Pid)
446451
end;
@@ -563,25 +568,31 @@ maybe_add_tls_arguments(VMArgs) ->
563568
end,
564569
VMArgs2.
565570

566-
do_query_node_props(Nodes, ThisNode) when Nodes =/= [] ->
571+
do_query_node_props(Nodes, FromNode) when Nodes =/= [] ->
567572
%% Make sure all log messages are forwarded from this temporary hidden
568573
%% node to the upstream node, regardless of their level.
569574
_ = logger:set_primary_config(level, debug),
570575

571576
%% TODO: Replace with `rabbit_nodes:list_members/0' when the oldest
572577
%% supported version has it.
573-
MembersPerNode = erpc:multicall(Nodes, rabbit_nodes, all, []),
574-
query_node_props1(Nodes, MembersPerNode, [], ThisNode).
578+
MembersPerNode = [try
579+
{ok,
580+
erpc_call(Node, rabbit_nodes, all, [], FromNode)}
581+
catch
582+
Class:Reason ->
583+
{Class, Reason}
584+
end || Node <- Nodes],
585+
query_node_props1(Nodes, MembersPerNode, [], FromNode).
575586

576587
query_node_props1(
577588
[Node | Nodes], [{ok, Members} | MembersPerNode], NodesAndProps,
578-
ThisNode) ->
589+
FromNode) ->
579590
NodeAndProps = {Node, Members},
580591
NodesAndProps1 = [NodeAndProps | NodesAndProps],
581-
query_node_props1(Nodes, MembersPerNode, NodesAndProps1, ThisNode);
592+
query_node_props1(Nodes, MembersPerNode, NodesAndProps1, FromNode);
582593
query_node_props1(
583-
[Node | Nodes], [{error, _} = Error | MembersPerNode], NodesAndProps,
584-
ThisNode) ->
594+
[Node | Nodes], [{_, _} = Error | MembersPerNode], NodesAndProps,
595+
FromNode) ->
585596
%% We consider that an error means the remote node is unreachable or not
586597
%% ready. Therefore, we exclude it from the list of discovered nodes as we
587598
%% won't be able to join it anyway.
@@ -590,20 +601,21 @@ query_node_props1(
590601
"Peer discovery: node '~ts' excluded from the discovered nodes",
591602
[Node, Error, Node],
592603
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
593-
query_node_props1(Nodes, MembersPerNode, NodesAndProps, ThisNode);
594-
query_node_props1([], [], NodesAndProps, ThisNode) ->
604+
query_node_props1(Nodes, MembersPerNode, NodesAndProps, FromNode);
605+
query_node_props1([], [], NodesAndProps, FromNode) ->
595606
NodesAndProps1 = lists:reverse(NodesAndProps),
596-
query_node_props2(NodesAndProps1, [], ThisNode).
607+
query_node_props2(NodesAndProps1, [], FromNode).
597608

598-
query_node_props2([{Node, Members} | Rest], NodesAndProps, ThisNode) ->
609+
query_node_props2([{Node, Members} | Rest], NodesAndProps, FromNode) ->
599610
NodesAndProps2 = try
600-
erpc:call(
611+
erpc_call(
601612
Node, logger, debug,
602613
["Peer discovery: temporary hidden node '~ts' "
603614
"queries properties from node '~ts'",
604-
[node(), Node]]),
605-
StartTime = get_node_start_time(Node, microsecond),
606-
IsReady = is_node_db_ready(Node, ThisNode),
615+
[node(), Node]], FromNode),
616+
StartTime = get_node_start_time(
617+
Node, microsecond, FromNode),
618+
IsReady = is_node_db_ready(Node, FromNode),
607619
NodeAndProps = {Node, Members, StartTime, IsReady},
608620
NodesAndProps1 = [NodeAndProps | NodesAndProps],
609621
NodesAndProps1
@@ -623,17 +635,17 @@ query_node_props2([{Node, Members} | Rest], NodesAndProps, ThisNode) ->
623635
#{domain => ?RMQLOG_DOMAIN_PEER_DISC}),
624636
NodesAndProps
625637
end,
626-
query_node_props2(Rest, NodesAndProps2, ThisNode);
627-
query_node_props2([], NodesAndProps, _ThisNode) ->
638+
query_node_props2(Rest, NodesAndProps2, FromNode);
639+
query_node_props2([], NodesAndProps, _FromNode) ->
628640
NodesAndProps1 = lists:reverse(NodesAndProps),
629-
NodesAndProps2 = sort_nodes_and_props(NodesAndProps1),
630641
?assertEqual([], nodes()),
631-
?assert(length(NodesAndProps2) =< length(nodes(hidden))),
632-
NodesAndProps2.
642+
?assert(length(NodesAndProps1) =< length(nodes(hidden))),
643+
NodesAndProps1.
633644

634-
-spec get_node_start_time(Node, Unit) -> StartTime when
645+
-spec get_node_start_time(Node, Unit, FromNode) -> StartTime when
635646
Node :: node(),
636647
Unit :: erlang:time_unit(),
648+
FromNode :: node(),
637649
StartTime :: non_neg_integer().
638650
%% @doc Returns the start time of the given `Node' in `Unit'.
639651
%%
@@ -653,37 +665,62 @@ query_node_props2([], NodesAndProps, _ThisNode) ->
653665
%%
654666
%% @private
655667

656-
get_node_start_time(Node, Unit) ->
657-
NativeStartTime = erpc:call(Node, erlang, system_info, [start_time]),
658-
TimeOffset = erpc:call(Node, erlang, time_offset, []),
668+
get_node_start_time(Node, Unit, FromNode) ->
669+
NativeStartTime = erpc_call(
670+
Node, erlang, system_info, [start_time], FromNode),
671+
TimeOffset = erpc_call(Node, erlang, time_offset, [], FromNode),
659672
SystemStartTime = NativeStartTime + TimeOffset,
660-
StartTime = erpc:call(
673+
StartTime = erpc_call(
661674
Node, erlang, convert_time_unit,
662-
[SystemStartTime, native, Unit]),
675+
[SystemStartTime, native, Unit], FromNode),
663676
StartTime.
664677

665-
-spec is_node_db_ready(Node, ThisNode) -> IsReady when
678+
-spec is_node_db_ready(Node, FromNode) -> IsReady when
666679
Node :: node(),
667-
ThisNode :: node(),
680+
FromNode :: node(),
668681
IsReady :: boolean() | undefined.
669682
%% @doc Returns if the node's DB layer is ready or not.
670683
%%
671684
%% @private
672685

673-
is_node_db_ready(ThisNode, ThisNode) ->
674-
%% The current node is running peer discovery, thus way before we mark the
675-
%% DB layer as ready. Consider it ready in this case, otherwise if the
676-
%% current node is selected, it will loop forever waiting for itself to be
677-
%% ready.
686+
is_node_db_ready(FromNode, FromNode) ->
687+
%% The function is called for rhe current node running peer discovery, thus
688+
%% way before we mark the DB layer as ready. Consider it ready in this
689+
%% case, otherwise if the current node is selected, it will loop forever
690+
%% waiting for itself to be ready.
678691
true;
679-
is_node_db_ready(Node, _ThisNode) ->
692+
is_node_db_ready(Node, FromNode) ->
680693
try
681-
erpc:call(Node, rabbit_db, is_init_finished, [])
694+
erpc_call(Node, rabbit_db, is_init_finished, [], FromNode)
682695
catch
683696
_:{exception, undef, [{rabbit_db, is_init_finished, _, _} | _]} ->
684697
undefined
685698
end.
686699

700+
erpc_call(Node, Mod, Fun, Args, FromNode) ->
701+
erpc_call(Node, Mod, Fun, Args, FromNode, 10000).
702+
703+
erpc_call(Node, Mod, Fun, Args, FromNode, Timeout) when Timeout >= 0 ->
704+
try
705+
erpc:call(Node, Mod, Fun, Args)
706+
catch
707+
error:{erpc, _} = Reason:Stacktrace ->
708+
Peer = node(),
709+
_ = catch erpc:call(
710+
FromNode,
711+
logger, debug,
712+
["Peer discovery: temporary hidden node '~ts' "
713+
"failed to connect to '~ts': ~0p",
714+
[Peer, Node, Reason]]),
715+
Sleep = 1000,
716+
timer:sleep(Sleep),
717+
NewTimeout = Timeout - Sleep,
718+
case NewTimeout >= 0 of
719+
true -> erpc_call(Node, Mod, Fun, Args, FromNode, NewTimeout);
720+
false -> erlang:raise(error, Reason, Stacktrace)
721+
end
722+
end.
723+
687724
-spec sort_nodes_and_props(NodesAndProps) ->
688725
SortedNodesAndProps when
689726
NodesAndProps :: [node_and_props()],

0 commit comments

Comments
 (0)