Skip to content

Commit b704d8d

Browse files
the-mikedavisdcorbacho
authored andcommitted
Restore 'rabbit_khepri:wait_for_leader/0'
This is almost the same as the prior 'wait_for_leader/0' function except that we use 'khepri:exists/3' with the 'compromise' favor rather than 'ra_leaderboard:lookup_leader/1' to check if the cluster has a leader. The compromise favor either performs a leader or consistent query (which also goes through the leader) depending on whether a leader has been cached recently, so the query will block until a leader can be elected and works through any pending consistent queries. Elections run asynchronously after cluster start and may be slower than the call to 'ra_leaderboard', so for large values of 'khepri_leader_wait_retry_timeout' the server have take a noticeable pause on boot while waiting to poll 'ra_leaderboard'. Queries return soon after the election which minimizes this pause.
1 parent 6a58d4a commit b704d8d

File tree

2 files changed

+38
-1
lines changed

2 files changed

+38
-1
lines changed

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ setup(_) ->
126126
friendly_name => ?RA_FRIENDLY_NAME},
127127
case khepri:start(?RA_SYSTEM, RaServerConfig) of
128128
{ok, ?STORE_ID} ->
129+
wait_for_leader(),
129130
register_projections(),
130131
?LOG_DEBUG(
131132
"Khepri-based " ?RA_FRIENDLY_NAME " ready",
@@ -135,6 +136,38 @@ setup(_) ->
135136
exit(Error)
136137
end.
137138

139+
wait_for_leader() ->
140+
wait_for_leader(retry_timeout(), retry_limit()).
141+
142+
retry_timeout() ->
143+
case application:get_env(rabbit, khepri_leader_wait_retry_timeout) of
144+
{ok, T} -> T;
145+
undefined -> 30000
146+
end.
147+
148+
retry_limit() ->
149+
case application:get_env(rabbit, khepri_leader_wait_retry_limit) of
150+
{ok, T} -> T;
151+
undefined -> 10
152+
end.
153+
154+
wait_for_leader(_Timeout, 0) ->
155+
exit(timeout_waiting_for_leader);
156+
wait_for_leader(Timeout, Retries) ->
157+
rabbit_log:info("Waiting for Khepri leader for ~tp ms, ~tp retries left",
158+
[Timeout, Retries - 1]),
159+
Options = #{timeout => Timeout,
160+
favor => compromise},
161+
case khepri:exists(?STORE_ID, [], Options) of
162+
Exists when is_boolean(Exists) ->
163+
rabbit_log:info("Khepri leader elected"),
164+
ok;
165+
{error, {timeout, _ServerId}} ->
166+
wait_for_leader(Timeout, Retries -1);
167+
{error, Reason} ->
168+
throw(Reason)
169+
end.
170+
138171
add_member(JoiningNode, JoinedNode)
139172
when JoiningNode =:= node() andalso is_atom(JoinedNode) ->
140173
Ret = do_join(JoinedNode),

deps/rabbit/test/clustering_management_SUITE.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -617,8 +617,12 @@ reset_in_minority(Config) ->
617617

618618
rabbit_ct_broker_helpers:stop_node(Config, Hare),
619619

620+
ok = rpc:call(Rabbit, application, set_env,
621+
[rabbit, khepri_leader_wait_retry_timeout, 1000]),
622+
ok = rpc:call(Rabbit, application, set_env,
623+
[rabbit, khepri_leader_wait_retry_limit, 3]),
620624
stop_app(Rabbit),
621-
?assertMatch({error, 75, _}, reset(Rabbit)),
625+
?assertMatch({error, 69, _}, reset(Rabbit)),
622626

623627
ok.
624628

0 commit comments

Comments
 (0)