21
21
22
22
-define (SERVER , ? MODULE ).
23
23
-define (DEFAULT_INTERVAL , 60_000 * 60 ).
24
- -define (SHORT_INTERVAL , 10_000 ).
24
+ -define (DEFAULT_TRIGGER_INTERVAL , 10_000 ).
25
25
-define (QUEUE_COUNT_START_RANDOM_SELECTION , 1_000 ).
26
26
27
27
-define (EVAL_MSG , membership_reconciliation ).
28
28
29
29
-record (state , {timer_ref :: reference () | undefined ,
30
- default_interval :: non_neg_integer (),
31
- short_interval :: non_neg_integer (),
30
+ interval :: non_neg_integer (),
31
+ trigger_interval :: non_neg_integer (),
32
32
target_group_size :: non_neg_integer () | undefined ,
33
- enabled :: boolean ()}).
33
+ enabled :: boolean (),
34
+ auto_remove :: boolean ()}).
34
35
35
36
% %----------------------------------------------------------------------------
36
37
% % Start
@@ -62,19 +63,22 @@ policy_set() ->
62
63
init ([]) ->
63
64
Enabled = rabbit_misc :get_env (rabbit , quorum_membership_reconciliation_enabled ,
64
65
false ),
65
- DefaultInterval = rabbit_misc :get_env (rabbit , quorum_membership_reconciliation_default_timeout ,
66
+ AutoRemove = rabbit_misc :get_env (rabbit , quorum_membership_reconciliation_auto_remove ,
67
+ false ),
68
+ Interval = rabbit_misc :get_env (rabbit , quorum_membership_reconciliation_interval ,
66
69
? DEFAULT_INTERVAL ),
67
- ShortInterval = rabbit_misc :get_env (rabbit , quorum_membership_reconciliation_short_timeout ,
68
- ? SHORT_INTERVAL ),
70
+ TriggerInterval = rabbit_misc :get_env (rabbit , quorum_membership_reconciliation_trigger_interval ,
71
+ ? DEFAULT_TRIGGER_INTERVAL ),
69
72
TargetGroupSize = rabbit_misc :get_env (rabbit , quorum_membership_reconciliation_target_group_size ,
70
73
undefined ),
71
- State = # state {default_interval = DefaultInterval ,
72
- short_interval = ShortInterval ,
74
+ State = # state {interval = Interval ,
75
+ trigger_interval = TriggerInterval ,
73
76
target_group_size = TargetGroupSize ,
74
- enabled = Enabled },
77
+ enabled = Enabled ,
78
+ auto_remove = AutoRemove },
75
79
case Enabled of
76
80
true ->
77
- Ref = erlang :send_after (DefaultInterval , self (), ? EVAL_MSG ),
81
+ Ref = erlang :send_after (Interval , self (), ? EVAL_MSG ),
78
82
{ok , State # state {timer_ref = Ref }};
79
83
false ->
80
84
{ok , State , hibernate }
@@ -86,7 +90,7 @@ handle_call(_Request, _From, State) ->
86
90
handle_cast ({membership_reconciliation_trigger , _Reason }, # state {enabled = false } = State ) ->
87
91
{noreply , State , hibernate };
88
92
handle_cast ({membership_reconciliation_trigger , Reason }, # state {timer_ref = OldRef ,
89
- short_interval = Time } = State ) ->
93
+ trigger_interval = Time } = State ) ->
90
94
rabbit_log :debug (" Quorum Queue membership reconciliation triggered: ~p " ,
91
95
[Reason ]),
92
96
_ = erlang :cancel_timer (OldRef ),
@@ -95,15 +99,14 @@ handle_cast({membership_reconciliation_trigger, Reason}, #state{timer_ref = OldR
95
99
handle_cast (_Msg , State ) ->
96
100
{noreply , State }.
97
101
98
- handle_info (? EVAL_MSG , # state {default_interval = DefaultInterval ,
99
- short_interval = ShortInterval ,
100
- target_group_size = TargetSize } = State ) ->
101
- Res = reconclitiate_quorum_queue_membership (TargetSize ),
102
+ handle_info (? EVAL_MSG , # state {interval = Interval ,
103
+ trigger_interval = TriggerInterval } = State ) ->
104
+ Res = reconclitiate_quorum_queue_membership (State ),
102
105
NewTimeout = case Res of
103
106
noop ->
104
- DefaultInterval ;
107
+ Interval ;
105
108
_ ->
106
- ShortInterval
109
+ TriggerInterval
107
110
end ,
108
111
Ref = erlang :send_after (NewTimeout , self (), ? EVAL_MSG ),
109
112
{noreply , State # state {timer_ref = Ref }};
@@ -122,15 +125,17 @@ code_change(_OldVsn, State, _Extra) ->
122
125
% % Internal functions
123
126
% %----------------------------------------------------------------------------
124
127
125
- reconclitiate_quorum_queue_membership (TargetSize ) ->
128
+ reconclitiate_quorum_queue_membership (State ) ->
126
129
LocalLeaders = rabbit_amqqueue :list_local_leaders (),
127
130
ExpectedNodes = rabbit_nodes :list_members (),
128
131
Running = rabbit_nodes :list_running (),
129
- reconclitiate_quorum_members (ExpectedNodes , Running , LocalLeaders , TargetSize , noop ).
132
+ reconclitiate_quorum_members (ExpectedNodes , Running , LocalLeaders , State , noop ).
130
133
131
- reconclitiate_quorum_members (_ExpectedNodes , _Running , [], _TargetSize , Result ) ->
134
+ reconclitiate_quorum_members (_ExpectedNodes , _Running , [], _State , Result ) ->
132
135
Result ;
133
- reconclitiate_quorum_members (ExpectedNodes , Running , [Q | LocalLeaders ], TargetSize , OldResult ) ->
136
+ reconclitiate_quorum_members (ExpectedNodes , Running , [Q | LocalLeaders ],
137
+ # state {target_group_size = TargetSize } = State ,
138
+ OldResult ) ->
134
139
Result =
135
140
maybe
136
141
{ok , Members , {_ , LeaderNode }} = ra :members (amqqueue :get_pid (Q ), 500 ),
@@ -139,12 +144,12 @@ reconclitiate_quorum_members(ExpectedNodes, Running, [Q | LocalLeaders], TargetS
139
144
% % And that this not is not in maintenance mode
140
145
true ?= not rabbit_maintenance :is_being_drained_local_read (node ()),
141
146
MemberNodes = [Node || {_ , Node } <- Members ],
142
- Remove = MemberNodes -- ExpectedNodes ,
143
- case Remove of
144
- [] ->
147
+ DanglingNodes = MemberNodes -- ExpectedNodes ,
148
+ case maybe_remove ( DanglingNodes , State ) of
149
+ false ->
145
150
maybe_add_member (Q , Running , MemberNodes , get_target_size (Q , TargetSize ));
146
- _ ->
147
- remove_members (Q , Remove )
151
+ true ->
152
+ remove_members (Q , DanglingNodes )
148
153
end
149
154
else
150
155
{timeout , Reason } ->
@@ -153,9 +158,16 @@ reconclitiate_quorum_members(ExpectedNodes, Running, [Q | LocalLeaders], TargetS
153
158
_ ->
154
159
noop
155
160
end ,
156
- reconclitiate_quorum_members (ExpectedNodes , Running , LocalLeaders , TargetSize ,
161
+ reconclitiate_quorum_members (ExpectedNodes , Running , LocalLeaders , State ,
157
162
update_result (OldResult , Result )).
158
163
164
+ maybe_remove (_ , # state {auto_remove = false }) ->
165
+ false ;
166
+ maybe_remove ([], # state {auto_remove = true }) ->
167
+ false ;
168
+ maybe_remove (_Nodes , # state {auto_remove = true }) ->
169
+ true .
170
+
159
171
maybe_add_member (Q , Running , MemberNodes , TargetSize ) ->
160
172
% % Filter out any new nodes under maintenance
161
173
New = rabbit_maintenance :filter_out_drained_nodes_local_read (Running -- MemberNodes ),
0 commit comments