Skip to content

Commit d5d8689

Browse files
FHC: use safe versions of ets update functions
Otherwise, if the FHC runs into an exception because of an ETS write failure, most code paths begin failing. This may help with the condition in #8784.
1 parent c4e459e commit d5d8689

File tree

4 files changed

+241
-28
lines changed

4 files changed

+241
-28
lines changed

deps/rabbit/src/rabbit_msg_store.erl

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1231,16 +1231,10 @@ contains_message(MsgId, From, State) ->
12311231
gen_server2:reply(From, MsgLocation =/= not_found),
12321232
State.
12331233

1234-
safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
1235-
try
1236-
SuccessFun(ets:update_counter(Tab, Key, UpdateOp))
1237-
catch error:badarg -> FailThunk()
1238-
end.
1239-
12401234
update_msg_cache(CacheEts, MsgId, Msg) ->
12411235
case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of
12421236
true -> ok;
1243-
false -> safe_ets_update_counter(
1237+
false -> rabbit_misc:safe_ets_update_counter(
12441238
CacheEts, MsgId, {3, +1}, fun (_) -> ok end,
12451239
fun () -> update_msg_cache(CacheEts, MsgId, Msg) end)
12461240
end.

deps/rabbit_common/src/file_handle_cache.erl

Lines changed: 69 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,8 @@
147147
-export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2,
148148
handle_info/2, terminate/2, code_change/3, prioritise_cast/3]).
149149

150+
-export([clear_metrics_of/1, list_elders/0, list_clients/0, get_client_state/1]).
151+
150152
-define(SERVER, ?MODULE).
151153
%% Reserve 3 handles for ra usage: wal, segment writer and a dets table
152154
-define(RESERVED_FOR_OTHERS, 100 + 3).
@@ -158,6 +160,9 @@
158160
-define(CLIENT_ETS_TABLE, file_handle_cache_client).
159161
-define(ELDERS_ETS_TABLE, file_handle_cache_elders).
160162

163+
-import(rabbit_misc, [safe_ets_update_counter/3, safe_ets_update_counter/4,
164+
safe_ets_update_element/3, safe_ets_update_element/4]).
165+
161166
%%----------------------------------------------------------------------------
162167

163168
-record(file,
@@ -622,6 +627,34 @@ clear_process_read_cache() ->
622627
size(Handle#handle.read_buffer) > 0
623628
].
624629

630+
%% Only used for testing
631+
clear_metrics_of(Pid) ->
632+
case whereis(?SERVER) of
633+
undefined -> ok;
634+
_ -> gen_server2:cast(?SERVER, {clear_metrics_of, Pid})
635+
end.
636+
637+
%% Only used for testing
638+
list_elders() ->
639+
case whereis(?SERVER) of
640+
undefined -> ok;
641+
_ -> gen_server2:call(?SERVER, list_elders)
642+
end.
643+
644+
%% Only used for testing
645+
list_clients() ->
646+
case whereis(?SERVER) of
647+
undefined -> ok;
648+
_ -> gen_server2:call(?SERVER, list_clients)
649+
end.
650+
651+
%% Only used for testing
652+
get_client_state(Pid) ->
653+
case whereis(?SERVER) of
654+
undefined -> ok;
655+
_ -> gen_server2:call(?SERVER, {get_client_state, Pid})
656+
end.
657+
625658
%%----------------------------------------------------------------------------
626659
%% Internal functions
627660
%%----------------------------------------------------------------------------
@@ -1125,13 +1158,13 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From,
11251158
case needs_reduce(State #fhc_state { open_count = Count + Requested }) of
11261159
true -> case ets:lookup(Clients, Pid) of
11271160
[#cstate { opened = 0 }] ->
1128-
true = ets:update_element(
1161+
safe_ets_update_element(
11291162
Clients, Pid, {#cstate.blocked, true}),
11301163
{noreply,
11311164
reduce(State #fhc_state {
11321165
open_pending = pending_in(Item, Pending) })};
11331166
[#cstate { opened = Opened }] ->
1134-
true = ets:update_element(
1167+
_ = safe_ets_update_element(
11351168
Clients, Pid,
11361169
{#cstate.pending_closes, Opened}),
11371170
{reply, close, State}
@@ -1147,7 +1180,7 @@ handle_call({obtain, N, Type, Pid}, From,
11471180
Item = #pending { kind = {obtain, Type}, pid = Pid,
11481181
requested = N, from = From },
11491182
Enqueue = fun () ->
1150-
true = ets:update_element(Clients, Pid,
1183+
_ = safe_ets_update_element(Clients, Pid,
11511184
{#cstate.blocked, true}),
11521185
set_obtain_state(Type, pending,
11531186
pending_in(Item, Pending), State)
@@ -1175,12 +1208,21 @@ handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) ->
11751208
{reply, Limit, State};
11761209

11771210
handle_call({info, Items}, _From, State) ->
1178-
{reply, infos(Items, State), State}.
1211+
{reply, infos(Items, State), State};
1212+
1213+
handle_call(list_elders, _From, State = #fhc_state { elders = Elders }) ->
1214+
{reply, ets:tab2list(Elders), State};
1215+
1216+
handle_call(list_clients, _From, State = #fhc_state { clients = Clients }) ->
1217+
{reply, ets:tab2list(Clients), State};
1218+
1219+
handle_call({get_client_state, ID}, _From, State = #fhc_state { clients = Clients }) ->
1220+
{reply, ets:lookup(Clients, ID), State}.
11791221

11801222
handle_cast({register_callback, Pid, MFA},
11811223
State = #fhc_state { clients = Clients }) ->
11821224
ok = track_client(Pid, Clients),
1183-
true = ets:update_element(Clients, Pid, {#cstate.callback, MFA}),
1225+
_ = safe_ets_update_element(Clients, Pid, {#cstate.callback, MFA}),
11841226
{noreply, State};
11851227

11861228
handle_cast({update, Pid, EldestUnusedSince},
@@ -1201,7 +1243,7 @@ handle_cast({close, Pid, EldestUnusedSince},
12011243
undefined -> ets:delete(Elders, Pid);
12021244
_ -> ets:insert(Elders, {Pid, EldestUnusedSince})
12031245
end,
1204-
ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
1246+
safe_ets_update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
12051247
{noreply, adjust_alarm(State, process_pending(
12061248
update_counts(open, Pid, -1, State)))};
12071249

@@ -1227,7 +1269,15 @@ handle_cast({set_reservation, N, Type, Pid},
12271269
{noreply, case needs_reduce(NewState) of
12281270
true -> reduce(NewState);
12291271
false -> adjust_alarm(State, NewState)
1230-
end}.
1272+
end};
1273+
1274+
handle_cast({clear_metrics_of, Pid},
1275+
State = #fhc_state { elders = Elders, clients = Clients }) ->
1276+
ets:delete(Elders, Pid),
1277+
ets:delete(Clients, Pid),
1278+
safe_ets_update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
1279+
{noreply, adjust_alarm(State, process_pending(
1280+
update_counts(open, Pid, -1, State)))}.
12311281

12321282
handle_info(check_counts, State) ->
12331283
{noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
@@ -1400,37 +1450,42 @@ run_pending_item(#pending { kind = Kind,
14001450
from = From },
14011451
State = #fhc_state { clients = Clients }) ->
14021452
gen_server2:reply(From, ok),
1403-
true = ets:update_element(Clients, Pid, {#cstate.blocked, false}),
1453+
safe_ets_update_element(Clients, Pid, {#cstate.blocked, false}),
14041454
update_counts(Kind, Pid, Requested, State).
14051455

14061456
update_counts(open, Pid, Delta,
14071457
State = #fhc_state { open_count = OpenCount,
14081458
clients = Clients }) ->
1409-
ets:update_counter(Clients, Pid, {#cstate.opened, Delta}),
1459+
safe_ets_update_counter(Clients, Pid, {#cstate.opened, Delta},
1460+
fun() -> rabbit_log:warning("FHC: failed to update counter 'opened', client pid: ~p", [Pid]) end),
14101461
State #fhc_state { open_count = OpenCount + Delta};
14111462
update_counts({obtain, file}, Pid, Delta,
14121463
State = #fhc_state {obtain_count_file = ObtainCountF,
14131464
clients = Clients }) ->
1414-
ets:update_counter(Clients, Pid, {#cstate.obtained_file, Delta}),
1465+
safe_ets_update_counter(Clients, Pid, {#cstate.obtained_file, Delta},
1466+
fun() -> rabbit_log:warning("FHC: failed to update counter 'obtained_file', client pid: ~p", [Pid]) end),
14151467
State #fhc_state { obtain_count_file = ObtainCountF + Delta};
14161468
update_counts({obtain, socket}, Pid, Delta,
14171469
State = #fhc_state {obtain_count_socket = ObtainCountS,
14181470
clients = Clients }) ->
1419-
ets:update_counter(Clients, Pid, {#cstate.obtained_socket, Delta}),
1471+
safe_ets_update_counter(Clients, Pid, {#cstate.obtained_socket, Delta},
1472+
fun() -> rabbit_log:warning("FHC: failed to update counter 'obtained_socket', client pid: ~p", [Pid]) end),
14201473
State #fhc_state { obtain_count_socket = ObtainCountS + Delta};
14211474
update_counts({reserve, file}, Pid, NewReservation,
14221475
State = #fhc_state {reserve_count_file = ReserveCountF,
14231476
clients = Clients }) ->
14241477
[#cstate{reserved_file = R}] = ets:lookup(Clients, Pid),
14251478
Delta = NewReservation - R,
1426-
ets:update_counter(Clients, Pid, {#cstate.reserved_file, Delta}),
1479+
safe_ets_update_counter(Clients, Pid, {#cstate.reserved_file, Delta},
1480+
fun() -> rabbit_log:warning("FHC: failed to update counter 'reserved_file', client pid: ~p", [Pid]) end),
14271481
State #fhc_state { reserve_count_file = ReserveCountF + Delta};
14281482
update_counts({reserve, socket}, Pid, NewReservation,
14291483
State = #fhc_state {reserve_count_socket = ReserveCountS,
14301484
clients = Clients }) ->
14311485
[#cstate{reserved_file = R}] = ets:lookup(Clients, Pid),
14321486
Delta = NewReservation - R,
1433-
ets:update_counter(Clients, Pid, {#cstate.reserved_socket, Delta}),
1487+
safe_ets_update_counter(Clients, Pid, {#cstate.reserved_socket, Delta},
1488+
fun() -> rabbit_log:warning("FHC: failed to update counter 'reserved_socket', client pid: ~p", [Pid]) end),
14341489
State #fhc_state { reserve_count_socket = ReserveCountS + Delta}.
14351490

14361491
maybe_reduce(State) ->
@@ -1521,7 +1576,7 @@ notify(Clients, Required, [#cstate{ pid = Pid,
15211576
callback = {M, F, A},
15221577
opened = Opened } | Notifications]) ->
15231578
apply(M, F, A ++ [0]),
1524-
ets:update_element(Clients, Pid, {#cstate.pending_closes, Opened}),
1579+
safe_ets_update_element(Clients, Pid, {#cstate.pending_closes, Opened}),
15251580
notify(Clients, Required - Opened, Notifications).
15261581

15271582
track_client(Pid, Clients) ->

deps/rabbit_common/src/file_handle_cache_stats.erl

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
-define(COUNT_TIME, [io_sync, io_seek]).
2121
-define(COUNT_TIME_BYTES, [io_read, io_write]).
2222

23+
-import(rabbit_misc, [safe_ets_update_counter/3, safe_ets_update_counter/4]).
24+
2325
init() ->
2426
_ = ets:new(?TABLE, [public, named_table, {write_concurrency,true}]),
2527
[ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT_TIME_BYTES,
@@ -31,23 +33,23 @@ init() ->
3133

3234
update(Op, Bytes, Thunk) ->
3335
{Time, Res} = timer_tc(Thunk),
34-
_ = ets:update_counter(?TABLE, {Op, count}, 1),
35-
_ = ets:update_counter(?TABLE, {Op, bytes}, Bytes),
36-
_ = ets:update_counter(?TABLE, {Op, time}, Time),
36+
_ = safe_ets_update_counter(?TABLE, {Op, count}, 1),
37+
_ = safe_ets_update_counter(?TABLE, {Op, bytes}, Bytes),
38+
_ = safe_ets_update_counter(?TABLE, {Op, time}, Time),
3739
Res.
3840

3941
update(Op, Thunk) ->
4042
{Time, Res} = timer_tc(Thunk),
41-
_ = ets:update_counter(?TABLE, {Op, count}, 1),
42-
_ = ets:update_counter(?TABLE, {Op, time}, Time),
43+
_ = safe_ets_update_counter(?TABLE, {Op, count}, 1),
44+
_ = safe_ets_update_counter(?TABLE, {Op, time}, Time),
4345
Res.
4446

4547
update(Op) ->
46-
ets:update_counter(?TABLE, {Op, count}, 1),
48+
_ = safe_ets_update_counter(?TABLE, {Op, count}, 1),
4749
ok.
4850

4951
inc(Op, Count) ->
50-
_ = ets:update_counter(?TABLE, {Op, count}, Count),
52+
_ = safe_ets_update_counter(?TABLE, {Op, count}, Count),
5153
ok.
5254

5355
get() ->

0 commit comments

Comments
 (0)