Skip to content

Make pg_local:member_died/2 more resilient #1700

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 45 additions & 27 deletions src/pg_local.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
-module(pg_local).

-export([join/2, leave/2, get_members/1, in_group/2]).
-export([sync/0]). %% intended for testing only; not part of official API
%% intended for testing only; not part of official API
-export([sync/0, clear/0]).
-export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2]).

Expand All @@ -54,7 +55,7 @@

%%----------------------------------------------------------------------------

%%% As of R13B03 monitors are used instead of links.
-define(TABLE, pg_local_table).

%%%
%%% Exported functions
Expand Down Expand Up @@ -92,19 +93,27 @@ sync() ->
_ = ensure_started(),
gen_server:call(?MODULE, sync, infinity).

clear() ->
_ = ensure_started(),
gen_server:call(?MODULE, clear, infinity).

%%%
%%% Callback functions from gen_server
%%%

-record(state, {}).

init([]) ->
pg_local_table = ets:new(pg_local_table, [ordered_set, protected, named_table]),
?TABLE = ets:new(?TABLE, [ordered_set, protected, named_table]),
{ok, #state{}}.

handle_call(sync, _From, S) ->
{reply, ok, S};

handle_call(clear, _From, S) ->
ets:delete_all_objects(?TABLE),
{reply, ok, S};

handle_call(Request, From, S) ->
error_logger:warning_msg("The pg_local server received an unexpected message:\n"
"handle_call(~p, ~p, _)\n",
Expand All @@ -120,14 +129,14 @@ handle_cast({leave, Name, Pid}, S) ->
handle_cast(_, S) ->
{noreply, S}.

handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) ->
member_died(MonitorRef),
handle_info({'DOWN', MonitorRef, process, Pid, _Info}, S) ->
member_died(MonitorRef, Pid),
{noreply, S};
handle_info(_, S) ->
{noreply, S}.

terminate(_Reason, _S) ->
true = ets:delete(pg_local_table),
true = ets:delete(?TABLE),
ok.

%%%
Expand All @@ -148,46 +157,55 @@ terminate(_Reason, _S) ->
%%% {{pid, Pid, Name}}
%%% Pid is a member of group Name.

member_died(Ref) ->
[{{ref, Ref}, Pid}] = ets:lookup(pg_local_table, {ref, Ref}),
member_died(Ref, Pid) ->
case ets:lookup(?TABLE, {ref, Ref}) of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this lookup if we always call leave_all_groups with the same Pid? The three case clauses do exactly the same. Or am I missing something?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is many years old so I can only guess the intent here. I think it tries to assert that the ref/pid combination is known. I'd leave it as is for now, this is not a hot code path event.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The clauses simply demonstrate what 3 scenarios are possible. We can combine the latter two.

[{{ref, Ref}, Pid}] ->
leave_all_groups(Pid);
%% in case the key has already been removed
%% we can clean up using the value from the DOWN message
_ ->
leave_all_groups(Pid)
end,
ok.

leave_all_groups(Pid) ->
Names = member_groups(Pid),
_ = [leave_group(Name, P) ||
Name <- Names,
P <- member_in_group(Pid, Name)],
ok.
P <- member_in_group(Pid, Name)].

join_group(Name, Pid) ->
Ref_Pid = {ref, Pid},
try _ = ets:update_counter(pg_local_table, Ref_Pid, {3, +1})
try _ = ets:update_counter(?TABLE, Ref_Pid, {3, +1})
catch _:_ ->
Ref = erlang:monitor(process, Pid),
true = ets:insert(pg_local_table, {Ref_Pid, Ref, 1}),
true = ets:insert(pg_local_table, {{ref, Ref}, Pid})
true = ets:insert(?TABLE, {Ref_Pid, Ref, 1}),
true = ets:insert(?TABLE, {{ref, Ref}, Pid})
end,
Member_Name_Pid = {member, Name, Pid},
try _ = ets:update_counter(pg_local_table, Member_Name_Pid, {2, +1})
try _ = ets:update_counter(?TABLE, Member_Name_Pid, {2, +1})
catch _:_ ->
true = ets:insert(pg_local_table, {Member_Name_Pid, 1}),
true = ets:insert(pg_local_table, {{pid, Pid, Name}})
true = ets:insert(?TABLE, {Member_Name_Pid, 1}),
true = ets:insert(?TABLE, {{pid, Pid, Name}})
end.

leave_group(Name, Pid) ->
Member_Name_Pid = {member, Name, Pid},
try ets:update_counter(pg_local_table, Member_Name_Pid, {2, -1}) of
try ets:update_counter(?TABLE, Member_Name_Pid, {2, -1}) of
N ->
if
N =:= 0 ->
true = ets:delete(pg_local_table, {pid, Pid, Name}),
true = ets:delete(pg_local_table, Member_Name_Pid);
true = ets:delete(?TABLE, {pid, Pid, Name}),
true = ets:delete(?TABLE, Member_Name_Pid);
true ->
ok
end,
Ref_Pid = {ref, Pid},
case ets:update_counter(pg_local_table, Ref_Pid, {3, -1}) of
case ets:update_counter(?TABLE, Ref_Pid, {3, -1}) of
0 ->
[{Ref_Pid,Ref,0}] = ets:lookup(pg_local_table, Ref_Pid),
true = ets:delete(pg_local_table, {ref, Ref}),
true = ets:delete(pg_local_table, Ref_Pid),
[{Ref_Pid,Ref,0}] = ets:lookup(?TABLE, Ref_Pid),
true = ets:delete(?TABLE, {ref, Ref}),
true = ets:delete(?TABLE, Ref_Pid),
true = erlang:demonitor(Ref, [flush]),
ok;
_ ->
Expand All @@ -199,21 +217,21 @@ leave_group(Name, Pid) ->

group_members(Name) ->
[P ||
[P, N] <- ets:match(pg_local_table, {{member, Name, '$1'},'$2'}),
[P, N] <- ets:match(?TABLE, {{member, Name, '$1'},'$2'}),
_ <- lists:seq(1, N)].

member_in_group(Pid, Name) ->
[{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}),
[{{member, Name, Pid}, N}] = ets:lookup(?TABLE, {member, Name, Pid}),
lists:duplicate(N, Pid).

member_present(Name, Pid) ->
case ets:lookup(pg_local_table, {member, Name, Pid}) of
case ets:lookup(?TABLE, {member, Name, Pid}) of
[_] -> true;
[] -> false
end.

member_groups(Pid) ->
[Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})].
[Name || [Name] <- ets:match(?TABLE, {{pid, Pid, '$1'}})].

ensure_started() ->
case whereis(?MODULE) of
Expand Down
61 changes: 57 additions & 4 deletions test/unit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
-module(unit_SUITE).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").

Expand Down Expand Up @@ -44,7 +45,6 @@ groups() ->
decrypt_config,
listing_plugins_from_multiple_directories,
rabbitmqctl_encode,
pg_local,
pmerge,
plmerge,
priority_queue,
Expand All @@ -68,6 +68,9 @@ groups() ->
]}
]},
{sequential_tests, [], [
pg_local,
pg_local_with_unexpected_deaths1,
pg_local_with_unexpected_deaths2,
decrypt_start_app,
decrypt_start_app_file,
decrypt_start_app_undefined,
Expand Down Expand Up @@ -377,29 +380,79 @@ rabbit_direct_extract_extra_auth_props(_Config) ->
%% -------------------------------------------------------------------

pg_local(_Config) ->
[P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- [x, x]],
[P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)],
check_pg_local(ok, [], []),
%% P joins group a, then b, then a again
check_pg_local(pg_local:join(a, P), [P], []),
check_pg_local(pg_local:join(b, P), [P], [P]),
check_pg_local(pg_local:join(a, P), [P, P], [P]),
%% Q joins group a, then b, then b again
check_pg_local(pg_local:join(a, Q), [P, P, Q], [P]),
check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q]),
check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q, Q]),
%% P leaves groups a and a
check_pg_local(pg_local:leave(a, P), [P, Q], [P, Q, Q]),
check_pg_local(pg_local:leave(b, P), [P, Q], [Q, Q]),
%% leave/2 is idempotent
check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
%% clean up all processes
[begin X ! done,
Ref = erlang:monitor(process, X),
receive {'DOWN', Ref, process, X, _Info} -> ok end
end || X <- [P, Q]],
%% ensure the groups are empty
check_pg_local(ok, [], []),
passed.

pg_local_with_unexpected_deaths1(_Config) ->
[A, B] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)],
check_pg_local(ok, [], []),
%% A joins groups a and b
check_pg_local(pg_local:join(a, A), [A], []),
check_pg_local(pg_local:join(b, A), [A], [A]),
%% B joins group b
check_pg_local(pg_local:join(b, B), [A], [A, B]),

[begin erlang:exit(X, sleep_now_in_a_fire),
Ref = erlang:monitor(process, X),
receive {'DOWN', Ref, process, X, _Info} -> ok end
end || X <- [A, B]],
%% ensure the groups are empty
check_pg_local(ok, [], []),
?assertNot(erlang:is_process_alive(A)),
?assertNot(erlang:is_process_alive(B)),

passed.

pg_local_with_unexpected_deaths2(_Config) ->
[A, B] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)],
check_pg_local(ok, [], []),
%% A joins groups a and b
check_pg_local(pg_local:join(a, A), [A], []),
check_pg_local(pg_local:join(b, A), [A], [A]),
%% B joins group b
check_pg_local(pg_local:join(b, B), [A], [A, B]),

%% something else yanks a record (or all of them) from the pg_local
%% bookkeeping table
ok = pg_local:clear(),

[begin erlang:exit(X, sleep_now_in_a_fire),
Ref = erlang:monitor(process, X),
receive {'DOWN', Ref, process, X, _Info} -> ok end
end || X <- [A, B]],
%% ensure the groups are empty
check_pg_local(ok, [], []),
?assertNot(erlang:is_process_alive(A)),
?assertNot(erlang:is_process_alive(B)),

passed.

check_pg_local(ok, APids, BPids) ->
ok = pg_local:sync(),
[true, true] = [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) ||
{Key, Pids} <- [{a, APids}, {b, BPids}]].
?assertEqual([true, true], [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) ||
{Key, Pids} <- [{a, APids}, {b, BPids}]]).

%% -------------------------------------------------------------------
%% priority_queue.
Expand Down