Skip to content

Commit 601e981

Browse files
Merge pull request #8790 from rabbitmq/rabbitmq-server-8784
FHC: use exception-safe versions of ets update functions
2 parents c4e459e + 9bd0697 commit 601e981

File tree

11 files changed

+349
-43
lines changed

11 files changed

+349
-43
lines changed

deps/rabbit/src/rabbit_msg_store.erl

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1231,16 +1231,10 @@ contains_message(MsgId, From, State) ->
12311231
gen_server2:reply(From, MsgLocation =/= not_found),
12321232
State.
12331233

1234-
safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
1235-
try
1236-
SuccessFun(ets:update_counter(Tab, Key, UpdateOp))
1237-
catch error:badarg -> FailThunk()
1238-
end.
1239-
12401234
update_msg_cache(CacheEts, MsgId, Msg) ->
12411235
case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of
12421236
true -> ok;
1243-
false -> safe_ets_update_counter(
1237+
false -> rabbit_misc:safe_ets_update_counter(
12441238
CacheEts, MsgId, {3, +1}, fun (_) -> ok end,
12451239
fun () -> update_msg_cache(CacheEts, MsgId, Msg) end)
12461240
end.

deps/rabbit_common/src/file_handle_cache.erl

Lines changed: 69 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@
147147
-export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2,
148148
handle_info/2, terminate/2, code_change/3, prioritise_cast/3]).
149149

150+
-export([clear_metrics_of/1, list_elders/0, list_clients/0, get_client_state/1]).
151+
150152
-define(SERVER, ?MODULE).
151153
%% Reserve 3 handles for ra usage: wal, segment writer and a dets table
152154
-define(RESERVED_FOR_OTHERS, 100 + 3).
@@ -158,6 +160,9 @@
158160
-define(CLIENT_ETS_TABLE, file_handle_cache_client).
159161
-define(ELDERS_ETS_TABLE, file_handle_cache_elders).
160162

163+
-import(rabbit_misc, [safe_ets_update_counter/3, safe_ets_update_counter/4,
164+
safe_ets_update_element/3, safe_ets_update_element/4]).
165+
161166
%%----------------------------------------------------------------------------
162167

163168
-record(file,
@@ -622,6 +627,34 @@ clear_process_read_cache() ->
622627
size(Handle#handle.read_buffer) > 0
623628
].
624629

630+
%% Only used for testing
631+
clear_metrics_of(Pid) ->
632+
case whereis(?SERVER) of
633+
undefined -> ok;
634+
_ -> gen_server2:cast(?SERVER, {clear_metrics_of, Pid})
635+
end.
636+
637+
%% Only used for testing
638+
list_elders() ->
639+
case whereis(?SERVER) of
640+
undefined -> ok;
641+
_ -> gen_server2:call(?SERVER, list_elders)
642+
end.
643+
644+
%% Only used for testing
645+
list_clients() ->
646+
case whereis(?SERVER) of
647+
undefined -> ok;
648+
_ -> gen_server2:call(?SERVER, list_clients)
649+
end.
650+
651+
%% Only used for testing
652+
get_client_state(Pid) ->
653+
case whereis(?SERVER) of
654+
undefined -> ok;
655+
_ -> gen_server2:call(?SERVER, {get_client_state, Pid})
656+
end.
657+
625658
%%----------------------------------------------------------------------------
626659
%% Internal functions
627660
%%----------------------------------------------------------------------------
@@ -1125,13 +1158,13 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From,
11251158
case needs_reduce(State #fhc_state { open_count = Count + Requested }) of
11261159
true -> case ets:lookup(Clients, Pid) of
11271160
[#cstate { opened = 0 }] ->
1128-
true = ets:update_element(
1161+
_ = safe_ets_update_element(
11291162
Clients, Pid, {#cstate.blocked, true}),
11301163
{noreply,
11311164
reduce(State #fhc_state {
11321165
open_pending = pending_in(Item, Pending) })};
11331166
[#cstate { opened = Opened }] ->
1134-
true = ets:update_element(
1167+
_ = safe_ets_update_element(
11351168
Clients, Pid,
11361169
{#cstate.pending_closes, Opened}),
11371170
{reply, close, State}
@@ -1147,7 +1180,7 @@ handle_call({obtain, N, Type, Pid}, From,
11471180
Item = #pending { kind = {obtain, Type}, pid = Pid,
11481181
requested = N, from = From },
11491182
Enqueue = fun () ->
1150-
true = ets:update_element(Clients, Pid,
1183+
_ = safe_ets_update_element(Clients, Pid,
11511184
{#cstate.blocked, true}),
11521185
set_obtain_state(Type, pending,
11531186
pending_in(Item, Pending), State)
@@ -1175,12 +1208,21 @@ handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) ->
11751208
{reply, Limit, State};
11761209

11771210
handle_call({info, Items}, _From, State) ->
1178-
{reply, infos(Items, State), State}.
1211+
{reply, infos(Items, State), State};
1212+
1213+
handle_call(list_elders, _From, State = #fhc_state { elders = Elders }) ->
1214+
{reply, ets:tab2list(Elders), State};
1215+
1216+
handle_call(list_clients, _From, State = #fhc_state { clients = Clients }) ->
1217+
{reply, ets:tab2list(Clients), State};
1218+
1219+
handle_call({get_client_state, ID}, _From, State = #fhc_state { clients = Clients }) ->
1220+
{reply, ets:lookup(Clients, ID), State}.
11791221

11801222
handle_cast({register_callback, Pid, MFA},
11811223
State = #fhc_state { clients = Clients }) ->
11821224
ok = track_client(Pid, Clients),
1183-
true = ets:update_element(Clients, Pid, {#cstate.callback, MFA}),
1225+
_ = safe_ets_update_element(Clients, Pid, {#cstate.callback, MFA}),
11841226
{noreply, State};
11851227

11861228
handle_cast({update, Pid, EldestUnusedSince},
@@ -1201,7 +1243,7 @@ handle_cast({close, Pid, EldestUnusedSince},
12011243
undefined -> ets:delete(Elders, Pid);
12021244
_ -> ets:insert(Elders, {Pid, EldestUnusedSince})
12031245
end,
1204-
ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
1246+
safe_ets_update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
12051247
{noreply, adjust_alarm(State, process_pending(
12061248
update_counts(open, Pid, -1, State)))};
12071249

@@ -1227,7 +1269,15 @@ handle_cast({set_reservation, N, Type, Pid},
12271269
{noreply, case needs_reduce(NewState) of
12281270
true -> reduce(NewState);
12291271
false -> adjust_alarm(State, NewState)
1230-
end}.
1272+
end};
1273+
1274+
handle_cast({clear_metrics_of, Pid},
1275+
State = #fhc_state { elders = Elders, clients = Clients }) ->
1276+
ets:delete(Elders, Pid),
1277+
ets:delete(Clients, Pid),
1278+
safe_ets_update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
1279+
{noreply, adjust_alarm(State, process_pending(
1280+
update_counts(open, Pid, -1, State)))}.
12311281

12321282
handle_info(check_counts, State) ->
12331283
{noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
@@ -1400,37 +1450,42 @@ run_pending_item(#pending { kind = Kind,
14001450
from = From },
14011451
State = #fhc_state { clients = Clients }) ->
14021452
gen_server2:reply(From, ok),
1403-
true = ets:update_element(Clients, Pid, {#cstate.blocked, false}),
1453+
safe_ets_update_element(Clients, Pid, {#cstate.blocked, false}),
14041454
update_counts(Kind, Pid, Requested, State).
14051455

14061456
update_counts(open, Pid, Delta,
14071457
State = #fhc_state { open_count = OpenCount,
14081458
clients = Clients }) ->
1409-
ets:update_counter(Clients, Pid, {#cstate.opened, Delta}),
1459+
safe_ets_update_counter(Clients, Pid, {#cstate.opened, Delta},
1460+
fun() -> rabbit_log:warning("FHC: failed to update counter 'opened', client pid: ~p", [Pid]) end),
14101461
State #fhc_state { open_count = OpenCount + Delta};
14111462
update_counts({obtain, file}, Pid, Delta,
14121463
State = #fhc_state {obtain_count_file = ObtainCountF,
14131464
clients = Clients }) ->
1414-
ets:update_counter(Clients, Pid, {#cstate.obtained_file, Delta}),
1465+
safe_ets_update_counter(Clients, Pid, {#cstate.obtained_file, Delta},
1466+
fun() -> rabbit_log:warning("FHC: failed to update counter 'obtained_file', client pid: ~p", [Pid]) end),
14151467
State #fhc_state { obtain_count_file = ObtainCountF + Delta};
14161468
update_counts({obtain, socket}, Pid, Delta,
14171469
State = #fhc_state {obtain_count_socket = ObtainCountS,
14181470
clients = Clients }) ->
1419-
ets:update_counter(Clients, Pid, {#cstate.obtained_socket, Delta}),
1471+
safe_ets_update_counter(Clients, Pid, {#cstate.obtained_socket, Delta},
1472+
fun() -> rabbit_log:warning("FHC: failed to update counter 'obtained_socket', client pid: ~p", [Pid]) end),
14201473
State #fhc_state { obtain_count_socket = ObtainCountS + Delta};
14211474
update_counts({reserve, file}, Pid, NewReservation,
14221475
State = #fhc_state {reserve_count_file = ReserveCountF,
14231476
clients = Clients }) ->
14241477
[#cstate{reserved_file = R}] = ets:lookup(Clients, Pid),
14251478
Delta = NewReservation - R,
1426-
ets:update_counter(Clients, Pid, {#cstate.reserved_file, Delta}),
1479+
safe_ets_update_counter(Clients, Pid, {#cstate.reserved_file, Delta},
1480+
fun() -> rabbit_log:warning("FHC: failed to update counter 'reserved_file', client pid: ~p", [Pid]) end),
14271481
State #fhc_state { reserve_count_file = ReserveCountF + Delta};
14281482
update_counts({reserve, socket}, Pid, NewReservation,
14291483
State = #fhc_state {reserve_count_socket = ReserveCountS,
14301484
clients = Clients }) ->
14311485
[#cstate{reserved_file = R}] = ets:lookup(Clients, Pid),
14321486
Delta = NewReservation - R,
1433-
ets:update_counter(Clients, Pid, {#cstate.reserved_socket, Delta}),
1487+
safe_ets_update_counter(Clients, Pid, {#cstate.reserved_socket, Delta},
1488+
fun() -> rabbit_log:warning("FHC: failed to update counter 'reserved_socket', client pid: ~p", [Pid]) end),
14341489
State #fhc_state { reserve_count_socket = ReserveCountS + Delta}.
14351490

14361491
maybe_reduce(State) ->
@@ -1521,7 +1576,7 @@ notify(Clients, Required, [#cstate{ pid = Pid,
15211576
callback = {M, F, A},
15221577
opened = Opened } | Notifications]) ->
15231578
apply(M, F, A ++ [0]),
1524-
ets:update_element(Clients, Pid, {#cstate.pending_closes, Opened}),
1579+
safe_ets_update_element(Clients, Pid, {#cstate.pending_closes, Opened}),
15251580
notify(Clients, Required - Opened, Notifications).
15261581

15271582
track_client(Pid, Clients) ->

deps/rabbit_common/src/file_handle_cache_stats.erl

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
-define(COUNT_TIME, [io_sync, io_seek]).
2121
-define(COUNT_TIME_BYTES, [io_read, io_write]).
2222

23+
-import(rabbit_misc, [safe_ets_update_counter/3, safe_ets_update_counter/4]).
24+
2325
init() ->
2426
_ = ets:new(?TABLE, [public, named_table, {write_concurrency,true}]),
2527
[ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT_TIME_BYTES,
@@ -31,23 +33,23 @@ init() ->
3133

3234
update(Op, Bytes, Thunk) ->
3335
{Time, Res} = timer_tc(Thunk),
34-
_ = ets:update_counter(?TABLE, {Op, count}, 1),
35-
_ = ets:update_counter(?TABLE, {Op, bytes}, Bytes),
36-
_ = ets:update_counter(?TABLE, {Op, time}, Time),
36+
_ = safe_ets_update_counter(?TABLE, {Op, count}, 1),
37+
_ = safe_ets_update_counter(?TABLE, {Op, bytes}, Bytes),
38+
_ = safe_ets_update_counter(?TABLE, {Op, time}, Time),
3739
Res.
3840

3941
update(Op, Thunk) ->
4042
{Time, Res} = timer_tc(Thunk),
41-
_ = ets:update_counter(?TABLE, {Op, count}, 1),
42-
_ = ets:update_counter(?TABLE, {Op, time}, Time),
43+
_ = safe_ets_update_counter(?TABLE, {Op, count}, 1),
44+
_ = safe_ets_update_counter(?TABLE, {Op, time}, Time),
4345
Res.
4446

4547
update(Op) ->
46-
ets:update_counter(?TABLE, {Op, count}, 1),
48+
_ = safe_ets_update_counter(?TABLE, {Op, count}, 1),
4749
ok.
4850

4951
inc(Op, Count) ->
50-
_ = ets:update_counter(?TABLE, {Op, count}, Count),
52+
_ = safe_ets_update_counter(?TABLE, {Op, count}, Count),
5153
ok.
5254

5355
get() ->

0 commit comments

Comments
 (0)