26
26
normalize /1 ,
27
27
append_node_prefix /1 ,
28
28
node_prefix /0 ]).
29
- -export ([do_query_node_props /1 ,
30
- group_leader_proxy /2 ]).
29
+ -export ([do_query_node_props /2 ]).
31
30
32
31
-ifdef (TEST ).
33
32
-export ([query_node_props /1 ,
@@ -378,7 +377,8 @@ check_discovered_nodes_list_validity(DiscoveredNodes, BadNodeType)
378
377
% % @private
379
378
380
379
query_node_props (Nodes ) when Nodes =/= [] ->
381
- {Prefix , Suffix } = rabbit_nodes_common :parts (node ()),
380
+ ThisNode = node (),
381
+ {Prefix , Suffix } = rabbit_nodes_common :parts (ThisNode ),
382
382
PeerName = peer :random_name (Prefix ),
383
383
% % We go through a temporary hidden node to query all other discovered
384
384
% % peers' properties, instead of querying them directly.
@@ -440,7 +440,7 @@ query_node_props(Nodes) when Nodes =/= [] ->
440
440
[Peer ],
441
441
#{domain => ? RMQLOG_DOMAIN_PEER_DISC }),
442
442
try
443
- peer :call (Pid , ? MODULE , do_query_node_props , [Nodes ], 180000 )
443
+ peer :call (Pid , ? MODULE , do_query_node_props , [Nodes , ThisNode ], 180000 )
444
444
after
445
445
peer :stop (Pid )
446
446
end ;
@@ -563,80 +563,25 @@ maybe_add_tls_arguments(VMArgs) ->
563
563
end ,
564
564
VMArgs2 .
565
565
566
- do_query_node_props (Nodes ) when Nodes =/= [] ->
566
+ do_query_node_props (Nodes , ThisNode ) when Nodes =/= [] ->
567
567
% % Make sure all log messages are forwarded from this temporary hidden
568
568
% % node to the upstream node, regardless of their level.
569
569
_ = logger :set_primary_config (level , debug ),
570
570
571
- % % The group leader for all processes on this temporary hidden node is the
572
- % % calling process' group leader on the upstream node.
573
- % %
574
- % % When we use `erpc:call/4' (or the multicall equivalent) to execute code
575
- % % on one of the `Nodes', the remotely executed code will also use the
576
- % % calling process' group leader by default.
577
- % %
578
- % % We use this temporary hidden node to ensure the downstream node will
579
- % % not connect to the upstream node. Therefore, we must change the group
580
- % % leader as well, otherwise any I/O from the downstream node will send a
581
- % % message to the upstream node's group leader and thus open a connection.
582
- % % This would defeat the entire purpose of this temporary hidden node.
583
- % %
584
- % % To avoid this, we start a proxy process which we will use as a group
585
- % % leader. This process will send all messages it receives to the group
586
- % % leader on the upstream node.
587
- % %
588
- % % There is one caveat: the logger (local to the temporary hidden node)
589
- % % forwards log messages to the upstream logger (on the upstream node)
590
- % % only if the group leader of that message is a remote PID. Because we
591
- % % set a local PID, it stops forwarding log messages originating from that
592
- % % temporary hidden node. That's why we use `with_group_leader_proxy/2' to
593
- % % set the group leader to our proxy only around the use of `erpc'.
594
- % %
595
- % % That's a lot just to keep logging working while not revealing the upstream
596
- % % node to the downstream node...
597
- Parent = self (),
598
- UpstreamGroupLeader = erlang :group_leader (),
599
- ProxyGroupLeader = spawn_link (
600
- ? MODULE , group_leader_proxy ,
601
- [Parent , UpstreamGroupLeader ]),
602
-
603
571
% % TODO: Replace with `rabbit_nodes:list_members/0' when the oldest
604
572
% % supported version has it.
605
- MembersPerNode = with_group_leader_proxy (
606
- ProxyGroupLeader ,
607
- fun () ->
608
- erpc :multicall (Nodes , rabbit_nodes , all , [])
609
- end ),
610
- query_node_props1 (Nodes , MembersPerNode , [], ProxyGroupLeader ).
611
-
612
- with_group_leader_proxy (ProxyGroupLeader , Fun ) ->
613
- UpstreamGroupLeader = erlang :group_leader (),
614
- try
615
- true = erlang :group_leader (ProxyGroupLeader , self ()),
616
- Fun ()
617
- after
618
- true = erlang :group_leader (UpstreamGroupLeader , self ())
619
- end .
620
-
621
- group_leader_proxy (Parent , UpstreamGroupLeader ) ->
622
- receive
623
- stop_proxy ->
624
- erlang :unlink (Parent ),
625
- Parent ! proxy_stopped ;
626
- Message ->
627
- UpstreamGroupLeader ! Message ,
628
- group_leader_proxy (Parent , UpstreamGroupLeader )
629
- end .
573
+ MembersPerNode = erpc :multicall (Nodes , rabbit_nodes , all , []),
574
+ query_node_props1 (Nodes , MembersPerNode , [], ThisNode ).
630
575
631
576
query_node_props1 (
632
577
[Node | Nodes ], [{ok , Members } | MembersPerNode ], NodesAndProps ,
633
- ProxyGroupLeader ) ->
578
+ ThisNode ) ->
634
579
NodeAndProps = {Node , Members },
635
580
NodesAndProps1 = [NodeAndProps | NodesAndProps ],
636
- query_node_props1 (Nodes , MembersPerNode , NodesAndProps1 , ProxyGroupLeader );
581
+ query_node_props1 (Nodes , MembersPerNode , NodesAndProps1 , ThisNode );
637
582
query_node_props1 (
638
583
[Node | Nodes ], [{error , _ } = Error | MembersPerNode ], NodesAndProps ,
639
- ProxyGroupLeader ) ->
584
+ ThisNode ) ->
640
585
% % We consider that an error means the remote node is unreachable or not
641
586
% % ready. Therefore, we exclude it from the list of discovered nodes as we
642
587
% % won't be able to join it anyway.
@@ -645,22 +590,22 @@ query_node_props1(
645
590
" Peer discovery: node '~ts ' excluded from the discovered nodes" ,
646
591
[Node , Error , Node ],
647
592
#{domain => ? RMQLOG_DOMAIN_PEER_DISC }),
648
- query_node_props1 (Nodes , MembersPerNode , NodesAndProps , ProxyGroupLeader );
649
- query_node_props1 ([], [], NodesAndProps , ProxyGroupLeader ) ->
593
+ query_node_props1 (Nodes , MembersPerNode , NodesAndProps , ThisNode );
594
+ query_node_props1 ([], [], NodesAndProps , ThisNode ) ->
650
595
NodesAndProps1 = lists :reverse (NodesAndProps ),
651
- query_node_props2 (NodesAndProps1 , [], ProxyGroupLeader ).
596
+ query_node_props2 (NodesAndProps1 , [], ThisNode ).
652
597
653
- query_node_props2 ([{Node , Members } | Rest ], NodesAndProps , ProxyGroupLeader ) ->
598
+ query_node_props2 ([{Node , Members } | Rest ], NodesAndProps , ThisNode ) ->
654
599
try
655
600
erpc :call (
656
601
Node , logger , debug ,
657
602
[" Peer discovery: temporary hidden node '~ts ' queries properties "
658
603
" from node '~ts '" , [node (), Node ]]),
659
- StartTime = get_node_start_time (Node , microsecond , ProxyGroupLeader ),
660
- IsReady = is_node_db_ready (Node , ProxyGroupLeader ),
604
+ StartTime = get_node_start_time (Node , microsecond ),
605
+ IsReady = is_node_db_ready (Node , ThisNode ),
661
606
NodeAndProps = {Node , Members , StartTime , IsReady },
662
607
NodesAndProps1 = [NodeAndProps | NodesAndProps ],
663
- query_node_props2 (Rest , NodesAndProps1 , ProxyGroupLeader )
608
+ query_node_props2 (Rest , NodesAndProps1 , ThisNode )
664
609
catch
665
610
_ :Error :_ ->
666
611
% % If one of the erpc calls we use to get the start time fails,
@@ -673,27 +618,18 @@ query_node_props2([{Node, Members} | Rest], NodesAndProps, ProxyGroupLeader) ->
673
618
" Peer discovery: node '~ts ' excluded from the discovered nodes" ,
674
619
[Node , Error , Node ],
675
620
#{domain => ? RMQLOG_DOMAIN_PEER_DISC }),
676
- query_node_props2 (Rest , NodesAndProps , ProxyGroupLeader )
621
+ query_node_props2 (Rest , NodesAndProps , ThisNode )
677
622
end ;
678
- query_node_props2 ([], NodesAndProps , ProxyGroupLeader ) ->
623
+ query_node_props2 ([], NodesAndProps , _ThisNode ) ->
679
624
NodesAndProps1 = lists :reverse (NodesAndProps ),
680
625
NodesAndProps2 = sort_nodes_and_props (NodesAndProps1 ),
681
- % % Wait for the proxy group leader to flush its inbox.
682
- ProxyGroupLeader ! stop_proxy ,
683
- receive
684
- proxy_stopped ->
685
- ok
686
- after 120_000 ->
687
- ok
688
- end ,
689
626
? assertEqual ([], nodes ()),
690
627
? assert (length (NodesAndProps2 ) =< length (nodes (hidden ))),
691
628
NodesAndProps2 .
692
629
693
- -spec get_node_start_time (Node , Unit , ProxyGroupLeader ) -> StartTime when
630
+ -spec get_node_start_time (Node , Unit ) -> StartTime when
694
631
Node :: node (),
695
632
Unit :: erlang :time_unit (),
696
- ProxyGroupLeader :: pid (),
697
633
StartTime :: non_neg_integer ().
698
634
% % @doc Returns the start time of the given `Node' in `Unit'.
699
635
% %
@@ -713,52 +649,35 @@ query_node_props2([], NodesAndProps, ProxyGroupLeader) ->
713
649
% %
714
650
% % @private
715
651
716
- get_node_start_time (Node , Unit , ProxyGroupLeader ) ->
717
- with_group_leader_proxy (
718
- ProxyGroupLeader ,
719
- fun () ->
720
- NativeStartTime = erpc :call (
721
- Node , erlang , system_info , [start_time ]),
722
- TimeOffset = erpc :call (Node , erlang , time_offset , []),
723
- SystemStartTime = NativeStartTime + TimeOffset ,
724
- StartTime = erpc :call (
725
- Node , erlang , convert_time_unit ,
726
- [SystemStartTime , native , Unit ]),
727
- StartTime
728
- end ).
729
-
730
- -spec is_node_db_ready (Node , ProxyGroupLeader ) -> IsReady when
652
+ get_node_start_time (Node , Unit ) ->
653
+ NativeStartTime = erpc :call (Node , erlang , system_info , [start_time ]),
654
+ TimeOffset = erpc :call (Node , erlang , time_offset , []),
655
+ SystemStartTime = NativeStartTime + TimeOffset ,
656
+ StartTime = erpc :call (
657
+ Node , erlang , convert_time_unit ,
658
+ [SystemStartTime , native , Unit ]),
659
+ StartTime .
660
+
661
+ -spec is_node_db_ready (Node , ThisNode ) -> IsReady when
731
662
Node :: node (),
732
- ProxyGroupLeader :: pid (),
663
+ ThisNode :: node (),
733
664
IsReady :: boolean () | undefined .
734
665
% % @doc Returns whether the node's DB layer is ready or not.
735
666
% %
736
667
% % @private
737
668
738
- is_node_db_ready (Node , ProxyGroupLeader ) ->
739
- % % This code is running from a temporary hidden node. We derive the real
740
- % % node interested in the properties from the group leader.
741
- UpstreamGroupLeader = erlang :group_leader (),
742
- ThisNode = node (UpstreamGroupLeader ),
743
- case Node of
744
- ThisNode ->
745
- % % The current node is running peer discovery, thus way before we
746
- % % mark the DB layer as ready. Consider it ready in this case,
747
- % % otherwise if the current node is selected, it will loop forever
748
- % % waiting for itself to be ready.
749
- true ;
750
- _ ->
751
- with_group_leader_proxy (
752
- ProxyGroupLeader ,
753
- fun () ->
754
- try
755
- erpc :call (Node , rabbit_db , is_init_finished , [])
756
- catch
757
- _ :{exception , undef ,
758
- [{rabbit_db , is_init_finished , _ , _ } | _ ]} ->
759
- undefined
760
- end
761
- end )
669
+ is_node_db_ready (ThisNode , ThisNode ) ->
670
+ % % The current node is running peer discovery, thus way before we mark the
671
+ % % DB layer as ready. Consider it ready in this case, otherwise if the
672
+ % % current node is selected, it will loop forever waiting for itself to be
673
+ % % ready.
674
+ true ;
675
+ is_node_db_ready (Node , _ThisNode ) ->
676
+ try
677
+ erpc :call (Node , rabbit_db , is_init_finished , [])
678
+ catch
679
+ _ :{exception , undef , [{rabbit_db , is_init_finished , _ , _ } | _ ]} ->
680
+ undefined
762
681
end .
763
682
764
683
-spec sort_nodes_and_props (NodesAndProps ) ->
0 commit comments