Skip to content

Commit accd496

Browse files
committed
Bugfixes
1 parent f8ba64b commit accd496

File tree

6 files changed

+16
-75
lines changed

6 files changed

+16
-75
lines changed

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -825,7 +825,7 @@ delete_all_for_exchange_in_khepri(X = #exchange{name = XName}, OnlyDurable, Remo
825825

826826
delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
827827
Path = khepri_routes_path() ++ [VHost, Name],
828-
{ok, Bindings} = khepri_tx:get_many(Path ++ [rabbit_db:if_has_data_wildcard()]),
828+
{ok, Bindings} = khepri_tx:get_many(Path ++ [rabbit_khepri:if_has_data_wildcard()]),
829829
%% ok = khepri_tx:delete(Path),
830830
maps:fold(fun(P, Set, Acc) ->
831831
%% TODO projections are not updated if we just use the previous

deps/rabbit/test/metadata_store_phase1_SUITE.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ setup_khepri(Config) ->
199199
Config.
200200

201201
setup_code_mocking(Config) ->
202-
%% Bypass rabbit_misc:execute_mnesia_transaction/1 (no worker_pool
202+
%% Bypass rabbit_mnesia:execute_mnesia_transaction/1 (no worker_pool
203203
%% configured in particular) but keep the behavior of throwing the error.
204204
meck:new(rabbit_misc, [passthrough, no_link]),
205205
meck:expect(
@@ -2656,15 +2656,15 @@ list_vhost_records(khepri) ->
26562656
lists:sort(rabbit_vhost:all_in_khepri()).
26572657

26582658
update_vhost(mnesia, VHostName, Fun) ->
2659-
rabbit_misc:execute_mnesia_transaction(
2659+
rabbit_mnesia:execute_mnesia_transaction(
26602660
fun() ->
26612661
rabbit_vhost:update_in_mnesia(VHostName, Fun)
26622662
end);
26632663
update_vhost(khepri, VHostName, Fun) ->
26642664
rabbit_vhost:update_in_khepri(VHostName, Fun).
26652665

26662666
update_vhost(mnesia, VHostName, Description, Tags) ->
2667-
rabbit_misc:execute_mnesia_transaction(
2667+
rabbit_mnesia:execute_mnesia_transaction(
26682668
fun() ->
26692669
rabbit_vhost:update_in_mnesia(VHostName, Description, Tags)
26702670
end);
@@ -2677,7 +2677,7 @@ vhost_info(khepri, VHostName) ->
26772677
rabbit_vhost:info_in_khepri(VHostName).
26782678

26792679
delete_vhost(mnesia, VHostName) ->
2680-
rabbit_misc:execute_mnesia_transaction(
2680+
rabbit_mnesia:execute_mnesia_transaction(
26812681
fun() ->
26822682
Fun = rabbit_vhost:with_in_mnesia(
26832683
VHostName,

deps/rabbitmq_consistent_hash_exchange/src/rabbit_db_ch_exchange.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ create(X) ->
5858
}).
5959

6060
create_in_mnesia(X) ->
61-
rabbit_misc:execute_mnesia_transaction(
61+
rabbit_mnesia:execute_mnesia_transaction(
6262
fun() -> create_in_mnesia_tx(X) end).
6363

6464
create_in_mnesia_tx(X) ->
@@ -90,7 +90,7 @@ create_binding(Src, Dst, Weight, UpdateFun) ->
9090
}).
9191

9292
create_binding_in_mnesia(Src, Dst, Weight, UpdateFun) ->
93-
rabbit_misc:execute_mnesia_transaction(
93+
rabbit_mnesia:execute_mnesia_transaction(
9494
fun() ->
9595
create_binding_in_mnesia_tx(Src, Dst, Weight, UpdateFun)
9696
end).
@@ -171,7 +171,7 @@ delete(XName) ->
171171
}).
172172

173173
delete_in_mnesia(XName) ->
174-
rabbit_misc:execute_mnesia_transaction(
174+
rabbit_mnesia:execute_mnesia_transaction(
175175
fun() ->
176176
mnesia:write_lock_table(?HASH_RING_STATE_TABLE),
177177
mnesia:delete({?HASH_RING_STATE_TABLE, XName})
@@ -187,7 +187,7 @@ delete_bindings(Bindings, DeleteFun) ->
187187
}).
188188

189189
delete_bindings_in_mnesia(Bindings, DeleteFun) ->
190-
rabbit_misc:execute_mnesia_transaction(
190+
rabbit_mnesia:execute_mnesia_transaction(
191191
fun() ->
192192
[delete_binding_in_mnesia(Binding, DeleteFun) || Binding <- Bindings]
193193
end).

deps/rabbitmq_consistent_hash_exchange/src/rabbit_exchange_type_consistent_hash.erl

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,7 @@
4848

4949
init() ->
5050
rabbit_db_ch_exchange:setup_schema(),
51-
<<<<<<< HEAD
5251
_ = recover(),
53-
=======
54-
recover(),
55-
>>>>>>> 7b61c2e402 (Khepri: The One Commit)
5652
ok.
5753

5854
info(_X) -> [].
@@ -173,7 +169,6 @@ add_binding(_Serial, _X, #binding{source = S, destination = D, key = K}) ->
173169
rabbit_log:debug("Consistent hashing exchange: adding binding from "
174170
"exchange ~s to destination ~s with routing key '~s'",
175171
[rabbit_misc:rs(S), rabbit_misc:rs(D), K])
176-
<<<<<<< HEAD
177172
end.
178173

179174
chx_hash_ring_update_fun(#chx_hash_ring{bucket_map = BM0,
@@ -216,59 +211,6 @@ ch_hash_ring_delete_fun(#chx_hash_ring{bucket_map = BM0,
216211
BucketsDownTheRing = maps:filter(fun (K, _) -> K > LastBucket end, BM0),
217212
UnchangedBuckets = maps:filter(fun (K, _) -> K < FirstBucket end, BM0),
218213

219-
%% final state with "down the ring" buckets updated
220-
NewBucketsDownTheRing = maps:fold(
221-
fun(K0, V, Acc) ->
222-
maps:put(K0 - N, V, Acc)
223-
end, #{}, BucketsDownTheRing),
224-
BM1 = maps:merge(UnchangedBuckets, NewBucketsDownTheRing),
225-
NextN = NexN0 - N,
226-
Chx0#chx_hash_ring{bucket_map = BM1,
227-
next_bucket_number = NextN}
228-
=======
229-
>>>>>>> 7b61c2e402 (Khepri: The One Commit)
230-
end.
231-
232-
chx_hash_ring_update_fun(#chx_hash_ring{bucket_map = BM0,
233-
next_bucket_number = NexN0} = Chx0,
234-
Dst, Weight) ->
235-
case map_has_value(BM0, Dst) of
236-
true ->
237-
already_exists;
238-
false ->
239-
NextN = NexN0 + Weight,
240-
%% hi/lo bucket counters are 0-based but weight is 1-based
241-
Range = lists:seq(NexN0, (NextN - 1)),
242-
BM = lists:foldl(fun(Key, Acc) ->
243-
maps:put(Key, Dst, Acc)
244-
end, BM0, Range),
245-
Chx0#chx_hash_ring{bucket_map = BM,
246-
next_bucket_number = NextN}
247-
end.
248-
249-
remove_bindings(_Serial, _X, Bindings) ->
250-
Ret = rabbit_db_ch_exchange:delete_bindings(Bindings, fun ch_hash_ring_delete_fun/2),
251-
[rabbit_log:warning("Can't remove binding: hash ring state for exchange ~s wasn't found",
252-
[rabbit_misc:rs(X)]) || {not_found, X} <- Ret],
253-
ok.
254-
255-
ch_hash_ring_delete_fun(#chx_hash_ring{bucket_map = BM0,
256-
next_bucket_number = NexN0} = Chx0,
257-
Dst) ->
258-
%% Buckets with lower numbers stay as is; buckets that
259-
%% belong to this binding are removed; buckets with
260-
%% greater numbers are updated (their numbers are adjusted downwards)
261-
BucketsOfThisBinding = maps:filter(fun (_K, V) -> V =:= Dst end, BM0),
262-
case maps:size(BucketsOfThisBinding) of
263-
0 ->
264-
not_found;
265-
N when N >= 1 ->
266-
KeysOfThisBinding = lists:usort(maps:keys(BucketsOfThisBinding)),
267-
LastBucket = lists:last(KeysOfThisBinding),
268-
FirstBucket = hd(KeysOfThisBinding),
269-
BucketsDownTheRing = maps:filter(fun (K, _) -> K > LastBucket end, BM0),
270-
UnchangedBuckets = maps:filter(fun (K, _) -> K < FirstBucket end, BM0),
271-
272214
%% final state with "down the ring" buckets updated
273215
NewBucketsDownTheRing = maps:fold(
274216
fun(K0, V, Acc) ->

deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ create_or_update(XName, BindingKeyAndFun, ErrorFun) ->
6969
}).
7070

7171
create_or_update_in_mnesia(XName, BindingKeyAndFun, ErrorFun) ->
72-
rabbit_misc:execute_mnesia_transaction(
72+
rabbit_mnesia:execute_mnesia_transaction(
7373
fun() ->
7474
#?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} =
7575
read_state_in_mnesia(XName, ErrorFun),
@@ -105,7 +105,7 @@ insert(XName, BFuns) ->
105105
}).
106106

107107
insert_in_mnesia(XName, BFuns) ->
108-
rabbit_misc:execute_mnesia_transaction(
108+
rabbit_mnesia:execute_mnesia_transaction(
109109
fun() ->
110110
write_state_fun_in_mnesia(XName, BFuns)
111111
end).
@@ -155,7 +155,7 @@ delete(XName) ->
155155
}).
156156

157157
delete_in_mnesia(XName) ->
158-
rabbit_misc:execute_mnesia_transaction(
158+
rabbit_mnesia:execute_mnesia_transaction(
159159
fun() -> mnesia:delete(?JMS_TOPIC_TABLE, XName, write) end).
160160

161161
delete_in_khepri(XName) ->
@@ -170,7 +170,7 @@ delete(XName, BindingKeys, ErrorFun) ->
170170
}).
171171

172172
delete_in_mnesia(XName, BindingKeys, ErrorFun) ->
173-
rabbit_misc:execute_mnesia_transaction(
173+
rabbit_mnesia:execute_mnesia_transaction(
174174
fun() ->
175175
#?JMS_TOPIC_RECORD{x_selector_funs = BindingFuns} =
176176
read_state_in_mnesia(XName, ErrorFun),

deps/rabbitmq_recent_history_exchange/src/rabbit_db_rh_exchange.erl

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ get(XName) ->
6868
}).
6969

7070
get_in_mnesia(XName) ->
71-
rabbit_misc:execute_mnesia_transaction(
71+
rabbit_mnesia:execute_mnesia_transaction(
7272
fun() -> get_in_mnesia_tx(XName) end).
7373

7474
get_in_khepri(XName) ->
@@ -99,7 +99,7 @@ insert(XName, Message, Length) ->
9999
}).
100100

101101
insert_in_mnesia(XName, Message, Length) ->
102-
rabbit_misc:execute_mnesia_transaction(
102+
rabbit_mnesia:execute_mnesia_transaction(
103103
fun () ->
104104
Cached = get_in_mnesia_tx(XName),
105105
insert_in_mnesia(XName, Cached, Message, Length)
@@ -167,7 +167,7 @@ delete(XName) ->
167167
}).
168168

169169
delete_in_mnesia(XName) ->
170-
rabbit_misc:execute_mnesia_transaction(
170+
rabbit_mnesia:execute_mnesia_transaction(
171171
fun() ->
172172
mnesia:delete(?RH_TABLE, XName, write)
173173
end).
@@ -225,4 +225,3 @@ khepri_recent_history_path() ->
225225

226226
khepri_recent_history_path(#resource{virtual_host = VHost, name = Name}) ->
227227
[?MODULE, recent_history_exchange, VHost, Name].
228-
>>>>>>> 7b61c2e402 (Khepri: The One Commit)

0 commit comments

Comments
 (0)