Skip to content

Commit 93b2b43

Browse files
committed
rabbit_khepri: Remove the ?wait() feature
It is handled by Khepri now.
1 parent 01000ea commit 93b2b43

File tree

1 file changed

+16
-46
lines changed

1 file changed

+16
-46
lines changed

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 16 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -96,36 +96,6 @@ setup(_) ->
9696
exit(Error)
9797
end.
9898

99-
%% TODO: Move this logic to Khepri itself, it could be useful to any user,
100-
%% right?
101-
-define(wait(Code), wait_for_leader(fun() -> Code end, 20000)).
102-
103-
wait_for_leader(Fun, Timeout) ->
104-
T0 = erlang:monotonic_time(),
105-
case Fun() of
106-
{ok, _} = Ret->
107-
Ret;
108-
{error, ra_leader_unknown} when Timeout >= 0 ->
109-
?LOG_INFO(
110-
"Waiting for " ?RA_FRIENDLY_NAME " leader to be elected "
111-
"for ~b milliseconds before retrying Khepri call",
112-
[Timeout],
113-
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
114-
timer:sleep(500),
115-
T1 = erlang:monotonic_time(),
116-
TDiff = erlang:convert_time_unit(T1 - T0, native, millisecond),
117-
TimeLeft = Timeout - TDiff,
118-
wait_for_leader(Fun, TimeLeft);
119-
{error, ra_leader_unknown} = Error ->
120-
?LOG_ERROR(
121-
"Khepri call imeout while waiting for " ?RA_FRIENDLY_NAME " "
122-
"leader to be elected",
123-
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
124-
Error;
125-
Error ->
126-
Error
127-
end.
128-
12999
add_member(JoiningNode, JoinedNode)
130100
when JoiningNode =:= node() andalso is_atom(JoinedNode) ->
131101
Ret = do_join(JoinedNode),
@@ -327,13 +297,13 @@ dir() ->
327297
%% They are some additional functions too, because they are useful in
328298
%% RabbitMQ. They might be moved to Khepri in the future.
329299

330-
create(Path, Data) -> ?wait(khepri:create(?STORE_ID, Path, Data)).
331-
update(Path, Data) -> ?wait(khepri:update(?STORE_ID, Path, Data)).
300+
create(Path, Data) -> khepri:create(?STORE_ID, Path, Data).
301+
update(Path, Data) -> khepri:update(?STORE_ID, Path, Data).
332302
cas(Path, Pattern, Data) ->
333-
?wait(khepri:compare_and_swap(?STORE_ID, Path, Pattern, Data)).
303+
khepri:compare_and_swap(?STORE_ID, Path, Pattern, Data).
334304

335305
get(Path) ->
336-
case ?wait(khepri:get(?STORE_ID, Path, #{expect_specific_node => true})) of
306+
case khepri:get(?STORE_ID, Path, #{expect_specific_node => true}) of
337307
{ok, Result} ->
338308
[PropsAndData] = maps:values(Result),
339309
{ok, PropsAndData};
@@ -348,7 +318,7 @@ get_data(Path) ->
348318
Error -> Error
349319
end.
350320

351-
match(Path) -> ?wait(khepri:get(?STORE_ID, Path)).
321+
match(Path) -> khepri:get(?STORE_ID, Path).
352322

353323
match_and_get_data(Path) ->
354324
Ret = match(Path),
@@ -358,15 +328,15 @@ tx_match_and_get_data(Path) ->
358328
Ret = khepri_tx:get(Path),
359329
keep_data_only_in_result(Ret).
360330

361-
exists(Path) -> ?wait(khepri:exists(?STORE_ID, Path)).
362-
find(Path, Condition) -> ?wait(khepri:find(?STORE_ID, Path, Condition)).
331+
exists(Path) -> khepri:exists(?STORE_ID, Path).
332+
find(Path, Condition) -> khepri:find(?STORE_ID, Path, Condition).
363333

364-
list(Path) -> ?wait(khepri:list(?STORE_ID, Path)).
334+
list(Path) -> khepri:list(?STORE_ID, Path).
365335

366336
list_child_nodes(Path) ->
367337
Options = #{expect_specific_node => true,
368338
include_child_names => true},
369-
case ?wait(khepri:get(?STORE_ID, Path, Options)) of
339+
case khepri:get(?STORE_ID, Path, Options) of
370340
{ok, Result} ->
371341
[#{child_names := ChildNames}] = maps:values(Result),
372342
{ok, ChildNames};
@@ -391,8 +361,8 @@ keep_data_only_in_result({ok, Result}) ->
391361
keep_data_only_in_result(Error) ->
392362
Error.
393363

394-
clear_payload(Path) -> ?wait(khepri:clear_payload(?STORE_ID, Path)).
395-
delete(Path) -> ?wait(khepri:delete(?STORE_ID, Path)).
364+
clear_payload(Path) -> khepri:clear_payload(?STORE_ID, Path).
365+
delete(Path) -> khepri:delete(?STORE_ID, Path).
396366

397367
delete_or_fail(Path) ->
398368
case khepri:delete(?STORE_ID, Path) of
@@ -417,13 +387,13 @@ transaction(Fun) ->
417387
transaction(Fun, auto).
418388

419389
transaction(Fun, ReadWrite) ->
420-
?wait(case khepri:transaction(?STORE_ID, Fun, ReadWrite) of
421-
{atomic, Result} -> Result;
422-
{aborted, Reason} -> throw({error, Reason})
423-
end).
390+
case khepri:transaction(?STORE_ID, Fun, ReadWrite) of
391+
{atomic, Result} -> Result;
392+
{aborted, Reason} -> throw({error, Reason})
393+
end.
424394

425395
clear_store() ->
426-
?wait(khepri:clear_store(?STORE_ID)).
396+
khepri:clear_store(?STORE_ID).
427397

428398
info() ->
429399
ok = setup(),

0 commit comments

Comments
 (0)